Browse Source

Move all session cleanup code to a single code path.

This should resolve any cases where sessions were not removed from the hub
after their connection closed.

Also moves buddy image management entirely into the session and
vastly improves locking around session data. Also allocates less
when sending client left messages and paves the way for session specific
message codecs.
pull/159/head
Lance Cooper 11 years ago
parent
commit
6bcd814c1f
  1. 95
      src/app/spreed-webrtc-server/channelling_api.go
  2. 18
      src/app/spreed-webrtc-server/channelling_api_test.go
  3. 6
      src/app/spreed-webrtc-server/client.go
  4. 12
      src/app/spreed-webrtc-server/connection.go
  5. 22
      src/app/spreed-webrtc-server/hub.go
  6. 4
      src/app/spreed-webrtc-server/main.go
  7. 29
      src/app/spreed-webrtc-server/room_manager.go
  8. 18
      src/app/spreed-webrtc-server/roomworker.go
  9. 212
      src/app/spreed-webrtc-server/session.go
  10. 34
      src/app/spreed-webrtc-server/session_manager.go
  11. 4
      src/app/spreed-webrtc-server/user.go
  12. 2
      src/app/spreed-webrtc-server/ws.go

95
src/app/spreed-webrtc-server/channelling_api.go

@ -23,7 +23,6 @@ package main
import ( import (
"log" "log"
"strings"
"time" "time"
) )
@ -34,7 +33,6 @@ const (
type ChannellingAPI interface { type ChannellingAPI interface {
OnConnect(Client, *Session) OnConnect(Client, *Session)
OnIncoming(ResponseSender, *Session, *DataIncoming) OnIncoming(ResponseSender, *Session, *DataIncoming)
OnDisconnect(*Session)
} }
type channellingAPI struct { type channellingAPI struct {
@ -46,11 +44,9 @@ type channellingAPI struct {
ContactManager ContactManager
TurnDataCreator TurnDataCreator
Unicaster Unicaster
Broadcaster
buddyImages ImageCache
} }
func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, broadcaster Broadcaster, buddyImages ImageCache) ChannellingAPI { func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster) ChannellingAPI {
return &channellingAPI{ return &channellingAPI{
config, config,
roomStatus, roomStatus,
@ -60,8 +56,6 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco
contactManager, contactManager,
turnDataCreator, turnDataCreator,
unicaster, unicaster,
broadcaster,
buddyImages,
} }
} }
@ -77,40 +71,31 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
case "Hello": case "Hello":
//log.Println("Hello", msg.Hello, c.Index()) //log.Println("Hello", msg.Hello, c.Index())
// TODO(longsleep): Filter room id and user agent. // TODO(longsleep): Filter room id and user agent.
api.UpdateSession(session, &SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua}) session.Update(&SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua})
if session.Hello && session.Roomid != msg.Hello.Id {
api.LeaveRoom(session)
api.Broadcast(session, session.DataSessionLeft("soft"))
}
room, err := session.JoinRoom(msg.Hello.Id, msg.Hello.Credentials, c)
// NOTE(lcooper): Iid filtered for compatibility's sake. // NOTE(lcooper): Iid filtered for compatibility's sake.
// Evaluate sending unconditionally when supported by all clients. // Evaluate sending unconditionally when supported by all clients.
if room, err := api.JoinRoom(msg.Hello.Id, msg.Hello.Credentials, session, c); err == nil { if msg.Iid != "" {
session.Hello = true if err == nil {
session.Roomid = msg.Hello.Id
if msg.Iid != "" {
c.Reply(msg.Iid, &DataWelcome{ c.Reply(msg.Iid, &DataWelcome{
Type: "Welcome", Type: "Welcome",
Room: room, Room: room,
Users: api.RoomUsers(session), Users: api.RoomUsers(session),
}) })
} } else {
api.Broadcast(session, session.DataSessionJoined())
} else {
session.Hello = false
if msg.Iid != "" {
c.Reply(msg.Iid, err) c.Reply(msg.Iid, err)
} }
} }
case "Offer": case "Offer":
// TODO(longsleep): Validate offer // TODO(longsleep): Validate offer
api.Unicast(session, msg.Offer.To, msg.Offer) session.Unicast(msg.Offer.To, msg.Offer)
case "Candidate": case "Candidate":
// TODO(longsleep): Validate candidate // TODO(longsleep): Validate candidate
api.Unicast(session, msg.Candidate.To, msg.Candidate) session.Unicast(msg.Candidate.To, msg.Candidate)
case "Answer": case "Answer":
// TODO(longsleep): Validate Answer // TODO(longsleep): Validate Answer
api.Unicast(session, msg.Answer.To, msg.Answer) session.Unicast(msg.Answer.To, msg.Answer)
case "Users": case "Users":
if session.Hello { if session.Hello {
sessions := &DataSessions{Type: "Users", Users: api.RoomUsers(session)} sessions := &DataSessions{Type: "Users", Users: api.RoomUsers(session)}
@ -125,27 +110,27 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
if err := api.Authenticate(session, st, ""); err == nil { if err := api.Authenticate(session, st, ""); err == nil {
log.Println("Authentication success", session.Userid) log.Println("Authentication success", session.Userid)
api.SendSelf(c, session) api.SendSelf(c, session)
api.BroadcastSessionStatus(session) session.BroadcastStatus()
} else { } else {
log.Println("Authentication failed", err, st.Userid, st.Nonce) log.Println("Authentication failed", err, st.Userid, st.Nonce)
} }
case "Bye": case "Bye":
api.Unicast(session, msg.Bye.To, msg.Bye) session.Unicast(msg.Bye.To, msg.Bye)
case "Status": case "Status":
//log.Println("Status", msg.Status) //log.Println("Status", msg.Status)
api.UpdateSession(session, &SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status}) session.Update(&SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status})
api.BroadcastSessionStatus(session) session.BroadcastStatus()
case "Chat": case "Chat":
// TODO(longsleep): Limit sent chat messages per incoming connection. // TODO(longsleep): Limit sent chat messages per incoming connection.
if !msg.Chat.Chat.NoEcho { if !msg.Chat.Chat.NoEcho {
api.Unicast(session, session.Id, msg.Chat) session.Unicast(session.Id, msg.Chat)
} }
msg.Chat.Chat.Time = time.Now().Format(time.RFC3339) msg.Chat.Chat.Time = time.Now().Format(time.RFC3339)
if msg.Chat.To == "" { if msg.Chat.To == "" {
// TODO(longsleep): Check if chat broadcast is allowed. // TODO(longsleep): Check if chat broadcast is allowed.
if session.Hello { if session.Hello {
api.CountBroadcastChat() api.CountBroadcastChat()
api.Broadcast(session, msg.Chat) session.Broadcast(msg.Chat)
} }
} else { } else {
if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil { if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil {
@ -159,10 +144,10 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
api.CountUnicastChat() api.CountUnicastChat()
} }
api.Unicast(session, msg.Chat.To, msg.Chat) session.Unicast(msg.Chat.To, msg.Chat)
if msg.Chat.Chat.Mid != "" { if msg.Chat.Chat.Mid != "" {
// Send out delivery confirmation status chat message. // Send out delivery confirmation status chat message.
api.Unicast(session, session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}}) session.Unicast(session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}})
} }
} }
case "Conference": case "Conference":
@ -173,7 +158,7 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
// Send conference update to anyone. // Send conference update to anyone.
for _, id := range msg.Conference.Conference { for _, id := range msg.Conference.Conference {
if id != session.Id { if id != session.Id {
api.Unicast(session, id, msg.Conference) session.Unicast(id, msg.Conference)
} }
} }
} }
@ -211,7 +196,7 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
} }
case "Room": case "Room":
if room, err := api.UpdateRoom(session, msg.Room); err == nil { if room, err := api.UpdateRoom(session, msg.Room); err == nil {
api.Broadcast(session, room) session.Broadcast(room)
c.Reply(msg.Iid, room) c.Reply(msg.Iid, room)
} else { } else {
c.Reply(msg.Iid, err) c.Reply(msg.Iid, err)
@ -221,25 +206,6 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
} }
} }
func (api *channellingAPI) OnDisconnect(session *Session) {
session.Disconnect()
api.LeaveRoom(session)
dsl := session.DataSessionLeft("hard")
if session.Hello {
api.Broadcast(session, dsl)
}
session.RunForAllSubscribers(func(session *Session) {
log.Println("Notifying subscriber that we are gone", session.Id)
api.Unicast(session, session.Id, dsl)
})
api.Unicaster.OnDisconnect(session)
api.buddyImages.Delete(session.Id)
}
func (api *channellingAPI) SendSelf(c Responder, session *Session) { func (api *channellingAPI) SendSelf(c Responder, session *Session) {
token, err := api.EncodeSessionToken(session) token, err := api.EncodeSessionToken(session)
if err != nil { if err != nil {
@ -261,26 +227,3 @@ func (api *channellingAPI) SendSelf(c Responder, session *Session) {
} }
c.Reply("", self) c.Reply("", self)
} }
func (api *channellingAPI) UpdateSession(session *Session, s *SessionUpdate) uint64 {
if s.Status != nil {
status, ok := s.Status.(map[string]interface{})
if ok && status["buddyPicture"] != nil {
pic := status["buddyPicture"].(string)
if strings.HasPrefix(pic, "data:") {
imageId := api.buddyImages.Update(session.Id, pic[5:])
if imageId != "" {
status["buddyPicture"] = "img:" + imageId
}
}
}
}
return session.Update(s)
}
func (api *channellingAPI) BroadcastSessionStatus(session *Session) {
if session.Hello {
api.Broadcast(session, session.DataSessionStatus())
}
}

18
src/app/spreed-webrtc-server/channelling_api_test.go

@ -62,12 +62,12 @@ func (fake *fakeRoomManager) JoinRoom(id string, _ *DataRoomCredentials, session
return &DataRoom{Name: id}, fake.joinError return &DataRoom{Name: id}, fake.joinError
} }
func (fake *fakeRoomManager) LeaveRoom(session *Session) { func (fake *fakeRoomManager) LeaveRoom(roomID, sessionID string) {
fake.leftID = session.Roomid fake.leftID = roomID
} }
func (fake *fakeRoomManager) Broadcast(_ *Session, msg interface{}) { func (fake *fakeRoomManager) Broadcast(_, _ string, outgoing *DataOutgoing) {
fake.broadcasts = append(fake.broadcasts, msg) fake.broadcasts = append(fake.broadcasts, outgoing.Data)
} }
func (fake *fakeRoomManager) UpdateRoom(_ *Session, _ *DataRoom) (*DataRoom, error) { func (fake *fakeRoomManager) UpdateRoom(_ *Session, _ *DataRoom) (*DataRoom, error) {
@ -98,8 +98,14 @@ func assertErrorReply(t *testing.T, client *fakeClient, iid, code string) {
} }
func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) { func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) {
client, roomManager, session := &fakeClient{}, &fakeRoomManager{}, &Session{} client, roomManager := &fakeClient{}, &fakeRoomManager{}
return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil, roomManager, nil), client, session, roomManager session := &Session{
attestations: sessionNonces,
Broadcaster: roomManager,
RoomStatusManager: roomManager,
}
session.attestation = &SessionAttestation{s: session}
return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil), client, session, roomManager
} }
func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) {

6
src/app/spreed-webrtc-server/client.go

@ -42,7 +42,7 @@ type Client interface {
ResponseSender ResponseSender
Session() *Session Session() *Session
Index() uint64 Index() uint64
Close(bool) Close()
} }
type client struct { type client struct {
@ -70,10 +70,6 @@ func (client *client) OnText(b Buffer) {
} }
} }
func (client *client) OnDisconnect() {
client.ChannellingAPI.OnDisconnect(client.session)
}
func (client *client) Reply(iid string, m interface{}) { func (client *client) Reply(iid string, m interface{}) {
outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m} outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m}
if b, err := client.EncodeOutgoing(outgoing); err == nil { if b, err := client.EncodeOutgoing(outgoing); err == nil {

12
src/app/spreed-webrtc-server/connection.go

@ -56,7 +56,7 @@ const (
type Connection interface { type Connection interface {
Index() uint64 Index() uint64
Send(Buffer) Send(Buffer)
Close(runCallbacks bool) Close()
readPump() readPump()
writePump() writePump()
} }
@ -65,7 +65,6 @@ type ConnectionHandler interface {
NewBuffer() Buffer NewBuffer() Buffer
OnConnect(Connection) OnConnect(Connection)
OnText(Buffer) OnText(Buffer)
OnDisconnect()
} }
type connection struct { type connection struct {
@ -98,15 +97,12 @@ func (c *connection) Index() uint64 {
return c.Idx return c.Idx
} }
func (c *connection) Close(runCallbacks bool) { func (c *connection) Close() {
c.mutex.Lock() c.mutex.Lock()
if c.isClosed { if c.isClosed {
c.mutex.Unlock() c.mutex.Unlock()
return return
} }
if runCallbacks {
c.handler.OnDisconnect()
}
c.ws.Close() c.ws.Close()
c.isClosed = true c.isClosed = true
for { for {
@ -170,7 +166,7 @@ func (c *connection) readPump() {
} }
} }
c.Close(true) c.Close()
} }
// Write message to outbound queue. // Write message to outbound queue.
@ -269,7 +265,7 @@ func (c *connection) writePump() {
cleanup: cleanup:
//fmt.Println("writePump done") //fmt.Println("writePump done")
timer.Stop() timer.Stop()
c.Close(true) c.Close()
} }
// Write ping message. // Write ping message.

22
src/app/spreed-webrtc-server/hub.go

@ -48,8 +48,8 @@ type SessionStore interface {
type Unicaster interface { type Unicaster interface {
SessionStore SessionStore
OnConnect(Client, *Session) OnConnect(Client, *Session)
Unicast(session *Session, to string, m interface{}) Unicast(to string, outgoing *DataOutgoing)
OnDisconnect(*Session) OnDisconnect(sessionID string)
} }
type ContactManager interface { type ContactManager interface {
@ -158,7 +158,7 @@ func (h *hub) OnConnect(client Client, session *Session) {
// Register connection or replace existing one. // Register connection or replace existing one.
if ec, ok := h.clients[session.Id]; ok { if ec, ok := h.clients[session.Id]; ok {
ec.Close(false) ec.Close()
//log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id) //log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id)
} }
h.clients[session.Id] = client h.clients[session.Id] = client
@ -167,26 +167,20 @@ func (h *hub) OnConnect(client Client, session *Session) {
//log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id) //log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id)
} }
func (h *hub) OnDisconnect(session *Session) { func (h *hub) OnDisconnect(sessionID string) {
h.mutex.Lock() h.mutex.Lock()
delete(h.clients, session.Id) delete(h.clients, sessionID)
h.mutex.Unlock() h.mutex.Unlock()
} }
func (h *hub) GetClient(id string) (client Client, ok bool) { func (h *hub) GetClient(sessionID string) (client Client, ok bool) {
h.mutex.RLock() h.mutex.RLock()
client, ok = h.clients[id] client, ok = h.clients[sessionID]
h.mutex.RUnlock() h.mutex.RUnlock()
return return
} }
func (h *hub) Unicast(session *Session, to string, m interface{}) { func (h *hub) Unicast(to string, outgoing *DataOutgoing) {
outgoing := &DataOutgoing{
From: session.Id,
To: to,
A: session.Attestation(),
Data: m,
}
if message, err := h.EncodeOutgoing(outgoing); err == nil { if message, err := h.EncodeOutgoing(outgoing); err == nil {
client, ok := h.GetClient(to) client, ok := h.GetClient(to)
if !ok { if !ok {

4
src/app/spreed-webrtc-server/main.go

@ -340,9 +340,9 @@ func runner(runtime phoenix.Runtime) error {
roomManager := NewRoomManager(config, codec) roomManager := NewRoomManager(config, codec)
hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec) hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec)
tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm) tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := NewSessionManager(config, tickets, sessionSecret) sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := NewStatsManager(hub, roomManager, sessionManager) statsManager := NewStatsManager(hub, roomManager, sessionManager)
channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, roomManager, buddyImages) channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub)
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour))) r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour)))
r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder)))) r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder))))

29
src/app/spreed-webrtc-server/room_manager.go

@ -29,12 +29,12 @@ import (
type RoomStatusManager interface { type RoomStatusManager interface {
RoomUsers(*Session) []*DataSession RoomUsers(*Session) []*DataSession
JoinRoom(string, *DataRoomCredentials, *Session, Sender) (*DataRoom, error) JoinRoom(string, *DataRoomCredentials, *Session, Sender) (*DataRoom, error)
LeaveRoom(*Session) LeaveRoom(roomID, sessionID string)
UpdateRoom(*Session, *DataRoom) (*DataRoom, error) UpdateRoom(*Session, *DataRoom) (*DataRoom, error)
} }
type Broadcaster interface { type Broadcaster interface {
Broadcast(*Session, interface{}) Broadcast(sessionID, roomID string, outgoing *DataOutgoing)
} }
type RoomStats interface { type RoomStats interface {
@ -84,9 +84,9 @@ func (rooms *roomManager) JoinRoom(id string, credentials *DataRoomCredentials,
return roomWorker.Join(credentials, session, sender) return roomWorker.Join(credentials, session, sender)
} }
func (rooms *roomManager) LeaveRoom(session *Session) { func (rooms *roomManager) LeaveRoom(roomID, sessionID string) {
if room, ok := rooms.Get(session.Roomid); ok { if room, ok := rooms.Get(roomID); ok {
room.Leave(session) room.Leave(sessionID)
} }
} }
@ -104,29 +104,22 @@ func (rooms *roomManager) UpdateRoom(session *Session, room *DataRoom) (*DataRoo
return room, nil return room, nil
} }
func (rooms *roomManager) Broadcast(session *Session, m interface{}) { func (rooms *roomManager) Broadcast(sessionID, roomID string, outgoing *DataOutgoing) {
outgoing := &DataOutgoing{
From: session.Id,
A: session.Attestation(),
Data: m,
}
message, err := rooms.EncodeOutgoing(outgoing) message, err := rooms.EncodeOutgoing(outgoing)
if err != nil { if err != nil {
return return
} }
id := session.Roomid if roomID != "" && roomID == rooms.globalRoomID {
if id != "" && id == rooms.globalRoomID {
rooms.RLock() rooms.RLock()
for _, room := range rooms.roomTable { for _, room := range rooms.roomTable {
room.Broadcast(session, message) room.Broadcast(sessionID, message)
} }
rooms.RUnlock() rooms.RUnlock()
} else if room, ok := rooms.Get(id); ok { } else if room, ok := rooms.Get(roomID); ok {
room.Broadcast(session, message) room.Broadcast(sessionID, message)
} else { } else {
log.Printf("No room named %s found for broadcast message %#v", id, m) log.Printf("No room named %s found for broadcast %#v", roomID, outgoing)
} }
message.Decref() message.Decref()
} }

18
src/app/spreed-webrtc-server/roomworker.go

@ -39,9 +39,9 @@ type RoomWorker interface {
Users() []*roomUser Users() []*roomUser
Update(*DataRoom) error Update(*DataRoom) error
GetUsers() []*DataSession GetUsers() []*DataSession
Broadcast(*Session, Buffer) Broadcast(sessionID string, buf Buffer)
Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error) Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error)
Leave(*Session) Leave(sessionID string)
} }
type roomWorker struct { type roomWorker struct {
@ -213,12 +213,12 @@ func (r *roomWorker) GetUsers() []*DataSession {
return <-out return <-out
} }
func (r *roomWorker) Broadcast(session *Session, message Buffer) { func (r *roomWorker) Broadcast(sessionID string, message Buffer) {
worker := func() { worker := func() {
r.mutex.RLock() r.mutex.RLock()
for id, user := range r.users { for id, user := range r.users {
if id == session.Id { if id == sessionID {
// Skip broadcast to self. // Skip broadcast to self.
continue continue
} }
@ -261,10 +261,6 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se
} }
} }
if session.IsDisconnected() {
log.Println("Refusing to join a disconnected session to room", session.Id)
return
}
r.users[session.Id] = &roomUser{session, sender} r.users[session.Id] = &roomUser{session, sender}
// NOTE(lcooper): Needs to be a copy, else we risk races with // NOTE(lcooper): Needs to be a copy, else we risk races with
// a subsequent modification of room properties. // a subsequent modification of room properties.
@ -277,11 +273,11 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se
return result.DataRoom, result.error return result.DataRoom, result.error
} }
func (r *roomWorker) Leave(session *Session) { func (r *roomWorker) Leave(sessionID string) {
worker := func() { worker := func() {
r.mutex.Lock() r.mutex.Lock()
if _, ok := r.users[session.Id]; ok { if _, ok := r.users[sessionID]; ok {
delete(r.users, session.Id) delete(r.users, sessionID)
} }
r.mutex.Unlock() r.mutex.Unlock()
} }

212
src/app/spreed-webrtc-server/session.go

@ -25,6 +25,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/gorilla/securecookie" "github.com/gorilla/securecookie"
"strings"
"sync" "sync"
"time" "time"
) )
@ -32,6 +33,11 @@ import (
var sessionNonces *securecookie.SecureCookie var sessionNonces *securecookie.SecureCookie
type Session struct { type Session struct {
SessionManager
Unicaster
Broadcaster
RoomStatusManager
buddyImages ImageCache
Id string Id string
Sid string Sid string
Ua string Ua string
@ -52,16 +58,21 @@ type Session struct {
disconnected bool disconnected bool
} }
func NewSession(attestations *securecookie.SecureCookie, id, sid string) *Session { func NewSession(manager SessionManager, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, attestations *securecookie.SecureCookie, id, sid string) *Session {
session := &Session{ session := &Session{
Id: id, SessionManager: manager,
Sid: sid, Unicaster: unicaster,
Prio: 100, Broadcaster: broadcaster,
stamp: time.Now().Unix(), RoomStatusManager: rooms,
attestations: attestations, buddyImages: buddyImages,
subscriptions: make(map[string]*Session), Id: id,
subscribers: make(map[string]*Session), Sid: sid,
Prio: 100,
stamp: time.Now().Unix(),
attestations: attestations,
subscriptions: make(map[string]*Session),
subscribers: make(map[string]*Session),
} }
session.NewAttestation() session.NewAttestation()
return session return session
@ -104,52 +115,144 @@ func (s *Session) RemoveSubscriber(id string) {
s.mutex.Unlock() s.mutex.Unlock()
} }
func (s *Session) RunForAllSubscribers(f func(session *Session)) { func (s *Session) JoinRoom(roomID string, credentials *DataRoomCredentials, sender Sender) (*DataRoom, error) {
s.mutex.Lock() s.mutex.Lock()
for _, session := range s.subscribers { defer s.mutex.Unlock()
s.mutex.Unlock()
f(session) if s.Hello && s.Roomid != roomID {
s.mutex.Lock() s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id)
s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{
From: s.Id,
A: s.attestation.Token(),
Data: &DataSession{
Type: "Left",
Id: s.Id,
Status: "soft",
},
})
} }
s.mutex.Unlock()
room, err := s.RoomStatusManager.JoinRoom(roomID, credentials, s, sender)
if err == nil {
s.Hello = true
s.Roomid = roomID
s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{
From: s.Id,
A: s.attestation.Token(),
Data: &DataSession{
Type: "Joined",
Id: s.Id,
Userid: s.userid,
Ua: s.Ua,
Prio: s.Prio,
},
})
} else {
s.Hello = false
}
return room, err
} }
func (s *Session) Disconnect() { func (s *Session) Broadcast(m interface{}) {
s.mutex.RLock()
s.mutex.Lock() if s.Hello {
s.disconnected = true s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{
s.mutex.Unlock() From: s.Id,
A: s.attestation.Token(),
Data: m,
})
}
s.mutex.RUnlock()
} }
func (s *Session) IsDisconnected() bool { func (s *Session) BroadcastStatus() {
s.mutex.RLock()
if s.Hello {
s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{
From: s.Id,
A: s.attestation.Token(),
Data: &DataSession{
Type: "Status",
Id: s.Id,
Userid: s.userid,
Status: s.Status,
Rev: s.UpdateRev,
Prio: s.Prio,
},
})
}
s.mutex.RUnlock()
}
func (s *Session) Unicast(to string, m interface{}) {
s.mutex.RLock() s.mutex.RLock()
defer s.mutex.RUnlock() outgoing := &DataOutgoing{
return s.disconnected From: s.Id,
To: to,
A: s.attestation.Token(),
Data: m,
}
s.mutex.RUnlock()
s.Unicaster.Unicast(to, outgoing)
} }
func (s *Session) Close() { func (s *Session) Close() {
s.mutex.Lock() s.mutex.Lock()
// Remove foreign references.
outgoing := &DataOutgoing{
From: s.Id,
A: s.attestation.Token(),
Data: &DataSession{
Type: "Left",
Id: s.Id,
Status: "hard",
},
}
if s.Hello {
// NOTE(lcooper): If we don't check for Hello here, we could deadlock
// when implicitly creating a room while a user is reconnecting.
s.Broadcaster.Broadcast(s.Id, s.Roomid, outgoing)
s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id)
}
for _, session := range s.subscribers {
s.Unicaster.Unicast(session.Id, outgoing)
}
for _, session := range s.subscriptions { for _, session := range s.subscriptions {
session.RemoveSubscriber(s.Id) session.RemoveSubscriber(s.Id)
} }
// Remove session cross references.
s.Unicaster.OnDisconnect(s.Id)
s.SessionManager.DestroySession(s.Id, s.userid)
s.buddyImages.Delete(s.Id)
s.subscriptions = make(map[string]*Session) s.subscriptions = make(map[string]*Session)
s.subscribers = make(map[string]*Session) s.subscribers = make(map[string]*Session)
s.mutex.Unlock() s.disconnected = true
s.mutex.Unlock()
} }
func (s *Session) Update(update *SessionUpdate) uint64 { func (s *Session) Update(update *SessionUpdate) uint64 {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
if update.Status != nil {
status, ok := update.Status.(map[string]interface{})
if ok && status["buddyPicture"] != nil {
pic := status["buddyPicture"].(string)
if strings.HasPrefix(pic, "data:") {
imageId := s.buddyImages.Update(s.Id, pic[5:])
if imageId != "" {
status["buddyPicture"] = "img:" + imageId
}
}
}
}
for _, key := range update.Types { for _, key := range update.Types {
//fmt.Println("type update", key) //fmt.Println("type update", key)
@ -268,50 +371,6 @@ func (s *Session) SetUseridFake(userid string) {
} }
func (s *Session) DataSessionLeft(state string) *DataSession {
s.mutex.RLock()
defer s.mutex.RUnlock()
return &DataSession{
Type: "Left",
Id: s.Id,
Status: state,
}
}
func (s *Session) DataSessionJoined() *DataSession {
s.mutex.RLock()
defer s.mutex.RUnlock()
return &DataSession{
Type: "Joined",
Id: s.Id,
Userid: s.userid,
Ua: s.Ua,
Prio: s.Prio,
}
}
func (s *Session) DataSessionStatus() *DataSession {
s.mutex.RLock()
defer s.mutex.RUnlock()
return &DataSession{
Type: "Status",
Id: s.Id,
Userid: s.userid,
Status: s.Status,
Rev: s.UpdateRev,
Prio: s.Prio,
}
}
func (s *Session) NewAttestation() { func (s *Session) NewAttestation() {
s.attestation = &SessionAttestation{ s.attestation = &SessionAttestation{
s: s, s: s,
@ -319,13 +378,6 @@ func (s *Session) NewAttestation() {
s.attestation.Update() s.attestation.Update()
} }
func (s *Session) Attestation() (attestation string) {
s.mutex.RLock()
attestation = s.attestation.Token()
s.mutex.RUnlock()
return
}
func (s *Session) UpdateAttestation() { func (s *Session) UpdateAttestation() {
s.mutex.Lock() s.mutex.Lock()
s.attestation.Update() s.attestation.Update()

34
src/app/spreed-webrtc-server/session_manager.go

@ -37,14 +37,18 @@ type SessionManager interface {
UserStats UserStats
RetrieveUsersWith(func(*http.Request) (string, error)) RetrieveUsersWith(func(*http.Request) (string, error))
CreateSession(*http.Request) *Session CreateSession(*http.Request) *Session
DestroySession(*Session) DestroySession(sessionID, userID string)
Authenticate(*Session, *SessionToken, string) error Authenticate(*Session, *SessionToken, string) error
GetUserSessions(session *Session, id string) []*DataSession GetUserSessions(session *Session, id string) []*DataSession
} }
type sessionManager struct { type sessionManager struct {
Tickets
sync.RWMutex sync.RWMutex
Tickets
Unicaster
Broadcaster
RoomStatusManager
buddyImages ImageCache
config *Config config *Config
userTable map[string]*User userTable map[string]*User
fakesessionTable map[string]*Session fakesessionTable map[string]*Session
@ -52,10 +56,14 @@ type sessionManager struct {
attestations *securecookie.SecureCookie attestations *securecookie.SecureCookie
} }
func NewSessionManager(config *Config, tickets Tickets, sessionSecret []byte) SessionManager { func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, sessionSecret []byte) SessionManager {
sessionManager := &sessionManager{ sessionManager := &sessionManager{
tickets,
sync.RWMutex{}, sync.RWMutex{},
tickets,
unicaster,
broadcaster,
rooms,
buddyImages,
config, config,
make(map[string]*User), make(map[string]*User),
make(map[string]*Session), make(map[string]*Session),
@ -103,7 +111,7 @@ func (sessionManager *sessionManager) CreateSession(request *http.Request) *Sess
} }
} }
session := NewSession(sessionManager.attestations, st.Id, st.Sid) session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
if userid != "" { if userid != "" {
// XXX(lcooper): Should errors be handled here? // XXX(lcooper): Should errors be handled here?
@ -113,15 +121,15 @@ func (sessionManager *sessionManager) CreateSession(request *http.Request) *Sess
return session return session
} }
func (sessionManager *sessionManager) DestroySession(session *Session) { func (sessionManager *sessionManager) DestroySession(sessionID, userID string) {
session.Close() if userID == "" {
return
}
sessionManager.Lock() sessionManager.Lock()
if suserid := session.Userid(); suserid != "" { user, ok := sessionManager.userTable[userID]
user, ok := sessionManager.userTable[suserid] if ok && user.RemoveSession(sessionID) {
if ok && user.RemoveSession(session) { delete(sessionManager.userTable, userID)
delete(sessionManager.userTable, suserid)
}
} }
sessionManager.Unlock() sessionManager.Unlock()
} }
@ -158,7 +166,7 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s
session, ok := sessionManager.fakesessionTable[userid] session, ok := sessionManager.fakesessionTable[userid]
if !ok { if !ok {
st := sessionManager.FakeSessionToken(userid) st := sessionManager.FakeSessionToken(userid)
session = NewSession(sessionManager.attestations, st.Id, st.Sid) session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
session.SetUseridFake(st.Userid) session.SetUseridFake(st.Userid)
sessionManager.fakesessionTable[userid] = session sessionManager.fakesessionTable[userid] = session
} }

4
src/app/spreed-webrtc-server/user.go

@ -57,10 +57,10 @@ func (u *User) AddSession(s *Session) bool {
} }
// Return true if no session left. // Return true if no session left.
func (u *User) RemoveSession(s *Session) bool { func (u *User) RemoveSession(sessionID string) bool {
last := false last := false
u.mutex.Lock() u.mutex.Lock()
delete(u.sessionTable, s.Id) delete(u.sessionTable, sessionID)
if len(u.sessionTable) == 0 { if len(u.sessionTable) == 0 {
log.Println("Last session unregistered for user", u.Id) log.Println("Last session unregistered for user", u.Id)
last = true last = true

2
src/app/spreed-webrtc-server/ws.go

@ -69,7 +69,7 @@ func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionMa
// Create a new connection instance. // Create a new connection instance.
session := sessionManager.CreateSession(r) session := sessionManager.CreateSession(r)
defer sessionManager.DestroySession(session) defer session.Close()
client := NewClient(codec, channellingAPI, session) client := NewClient(codec, channellingAPI, session)
conn := NewConnection(connectionCounter.CountConnection(), ws, client) conn := NewConnection(connectionCounter.CountConnection(), ws, client)

Loading…
Cancel
Save