diff --git a/src/app/spreed-webrtc-server/channelling_api.go b/src/app/spreed-webrtc-server/channelling_api.go index 1c2af420..96ea1db8 100644 --- a/src/app/spreed-webrtc-server/channelling_api.go +++ b/src/app/spreed-webrtc-server/channelling_api.go @@ -23,7 +23,6 @@ package main import ( "log" - "strings" "time" ) @@ -34,7 +33,6 @@ const ( type ChannellingAPI interface { OnConnect(Client, *Session) OnIncoming(ResponseSender, *Session, *DataIncoming) - OnDisconnect(*Session) } type channellingAPI struct { @@ -46,11 +44,9 @@ type channellingAPI struct { ContactManager TurnDataCreator Unicaster - Broadcaster - buddyImages ImageCache } -func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, broadcaster Broadcaster, buddyImages ImageCache) ChannellingAPI { +func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster) ChannellingAPI { return &channellingAPI{ config, roomStatus, @@ -60,8 +56,6 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco contactManager, turnDataCreator, unicaster, - broadcaster, - buddyImages, } } @@ -77,40 +71,31 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D case "Hello": //log.Println("Hello", msg.Hello, c.Index()) // TODO(longsleep): Filter room id and user agent. - api.UpdateSession(session, &SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua}) - if session.Hello && session.Roomid != msg.Hello.Id { - api.LeaveRoom(session) - api.Broadcast(session, session.DataSessionLeft("soft")) - } + session.Update(&SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua}) + room, err := session.JoinRoom(msg.Hello.Id, msg.Hello.Credentials, c) // NOTE(lcooper): Iid filtered for compatibility's sake. // Evaluate sending unconditionally when supported by all clients. - if room, err := api.JoinRoom(msg.Hello.Id, msg.Hello.Credentials, session, c); err == nil { - session.Hello = true - session.Roomid = msg.Hello.Id - if msg.Iid != "" { + if msg.Iid != "" { + if err == nil { c.Reply(msg.Iid, &DataWelcome{ Type: "Welcome", Room: room, Users: api.RoomUsers(session), }) - } - api.Broadcast(session, session.DataSessionJoined()) - } else { - session.Hello = false - if msg.Iid != "" { + } else { c.Reply(msg.Iid, err) } } case "Offer": // TODO(longsleep): Validate offer - api.Unicast(session, msg.Offer.To, msg.Offer) + session.Unicast(msg.Offer.To, msg.Offer) case "Candidate": // TODO(longsleep): Validate candidate - api.Unicast(session, msg.Candidate.To, msg.Candidate) + session.Unicast(msg.Candidate.To, msg.Candidate) case "Answer": // TODO(longsleep): Validate Answer - api.Unicast(session, msg.Answer.To, msg.Answer) + session.Unicast(msg.Answer.To, msg.Answer) case "Users": if session.Hello { sessions := &DataSessions{Type: "Users", Users: api.RoomUsers(session)} @@ -125,27 +110,27 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D if err := api.Authenticate(session, st, ""); err == nil { log.Println("Authentication success", session.Userid) api.SendSelf(c, session) - api.BroadcastSessionStatus(session) + session.BroadcastStatus() } else { log.Println("Authentication failed", err, st.Userid, st.Nonce) } case "Bye": - api.Unicast(session, msg.Bye.To, msg.Bye) + session.Unicast(msg.Bye.To, msg.Bye) case "Status": //log.Println("Status", msg.Status) - api.UpdateSession(session, &SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status}) - api.BroadcastSessionStatus(session) + session.Update(&SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status}) + session.BroadcastStatus() case "Chat": // TODO(longsleep): Limit sent chat messages per incoming connection. if !msg.Chat.Chat.NoEcho { - api.Unicast(session, session.Id, msg.Chat) + session.Unicast(session.Id, msg.Chat) } msg.Chat.Chat.Time = time.Now().Format(time.RFC3339) if msg.Chat.To == "" { // TODO(longsleep): Check if chat broadcast is allowed. if session.Hello { api.CountBroadcastChat() - api.Broadcast(session, msg.Chat) + session.Broadcast(msg.Chat) } } else { if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil { @@ -159,10 +144,10 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D api.CountUnicastChat() } - api.Unicast(session, msg.Chat.To, msg.Chat) + session.Unicast(msg.Chat.To, msg.Chat) if msg.Chat.Chat.Mid != "" { // Send out delivery confirmation status chat message. - api.Unicast(session, session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}}) + session.Unicast(session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}}) } } case "Conference": @@ -173,7 +158,7 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D // Send conference update to anyone. for _, id := range msg.Conference.Conference { if id != session.Id { - api.Unicast(session, id, msg.Conference) + session.Unicast(id, msg.Conference) } } } @@ -211,7 +196,7 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D } case "Room": if room, err := api.UpdateRoom(session, msg.Room); err == nil { - api.Broadcast(session, room) + session.Broadcast(room) c.Reply(msg.Iid, room) } else { c.Reply(msg.Iid, err) @@ -221,25 +206,6 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D } } -func (api *channellingAPI) OnDisconnect(session *Session) { - session.Disconnect() - api.LeaveRoom(session) - - dsl := session.DataSessionLeft("hard") - if session.Hello { - api.Broadcast(session, dsl) - } - - session.RunForAllSubscribers(func(session *Session) { - log.Println("Notifying subscriber that we are gone", session.Id) - api.Unicast(session, session.Id, dsl) - }) - - api.Unicaster.OnDisconnect(session) - - api.buddyImages.Delete(session.Id) -} - func (api *channellingAPI) SendSelf(c Responder, session *Session) { token, err := api.EncodeSessionToken(session) if err != nil { @@ -261,26 +227,3 @@ func (api *channellingAPI) SendSelf(c Responder, session *Session) { } c.Reply("", self) } - -func (api *channellingAPI) UpdateSession(session *Session, s *SessionUpdate) uint64 { - if s.Status != nil { - status, ok := s.Status.(map[string]interface{}) - if ok && status["buddyPicture"] != nil { - pic := status["buddyPicture"].(string) - if strings.HasPrefix(pic, "data:") { - imageId := api.buddyImages.Update(session.Id, pic[5:]) - if imageId != "" { - status["buddyPicture"] = "img:" + imageId - } - } - } - } - - return session.Update(s) -} - -func (api *channellingAPI) BroadcastSessionStatus(session *Session) { - if session.Hello { - api.Broadcast(session, session.DataSessionStatus()) - } -} diff --git a/src/app/spreed-webrtc-server/channelling_api_test.go b/src/app/spreed-webrtc-server/channelling_api_test.go index 0ee186c1..af9b0f9f 100644 --- a/src/app/spreed-webrtc-server/channelling_api_test.go +++ b/src/app/spreed-webrtc-server/channelling_api_test.go @@ -62,12 +62,12 @@ func (fake *fakeRoomManager) JoinRoom(id string, _ *DataRoomCredentials, session return &DataRoom{Name: id}, fake.joinError } -func (fake *fakeRoomManager) LeaveRoom(session *Session) { - fake.leftID = session.Roomid +func (fake *fakeRoomManager) LeaveRoom(roomID, sessionID string) { + fake.leftID = roomID } -func (fake *fakeRoomManager) Broadcast(_ *Session, msg interface{}) { - fake.broadcasts = append(fake.broadcasts, msg) +func (fake *fakeRoomManager) Broadcast(_, _ string, outgoing *DataOutgoing) { + fake.broadcasts = append(fake.broadcasts, outgoing.Data) } func (fake *fakeRoomManager) UpdateRoom(_ *Session, _ *DataRoom) (*DataRoom, error) { @@ -98,8 +98,14 @@ func assertErrorReply(t *testing.T, client *fakeClient, iid, code string) { } func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) { - client, roomManager, session := &fakeClient{}, &fakeRoomManager{}, &Session{} - return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil, roomManager, nil), client, session, roomManager + client, roomManager := &fakeClient{}, &fakeRoomManager{} + session := &Session{ + attestations: sessionNonces, + Broadcaster: roomManager, + RoomStatusManager: roomManager, + } + session.attestation = &SessionAttestation{s: session} + return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil), client, session, roomManager } func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { diff --git a/src/app/spreed-webrtc-server/client.go b/src/app/spreed-webrtc-server/client.go index 46a57694..1a437605 100644 --- a/src/app/spreed-webrtc-server/client.go +++ b/src/app/spreed-webrtc-server/client.go @@ -42,7 +42,7 @@ type Client interface { ResponseSender Session() *Session Index() uint64 - Close(bool) + Close() } type client struct { @@ -70,10 +70,6 @@ func (client *client) OnText(b Buffer) { } } -func (client *client) OnDisconnect() { - client.ChannellingAPI.OnDisconnect(client.session) -} - func (client *client) Reply(iid string, m interface{}) { outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m} if b, err := client.EncodeOutgoing(outgoing); err == nil { diff --git a/src/app/spreed-webrtc-server/connection.go b/src/app/spreed-webrtc-server/connection.go index e1d343fc..f99e405e 100644 --- a/src/app/spreed-webrtc-server/connection.go +++ b/src/app/spreed-webrtc-server/connection.go @@ -56,7 +56,7 @@ const ( type Connection interface { Index() uint64 Send(Buffer) - Close(runCallbacks bool) + Close() readPump() writePump() } @@ -65,7 +65,6 @@ type ConnectionHandler interface { NewBuffer() Buffer OnConnect(Connection) OnText(Buffer) - OnDisconnect() } type connection struct { @@ -98,15 +97,12 @@ func (c *connection) Index() uint64 { return c.Idx } -func (c *connection) Close(runCallbacks bool) { +func (c *connection) Close() { c.mutex.Lock() if c.isClosed { c.mutex.Unlock() return } - if runCallbacks { - c.handler.OnDisconnect() - } c.ws.Close() c.isClosed = true for { @@ -170,7 +166,7 @@ func (c *connection) readPump() { } } - c.Close(true) + c.Close() } // Write message to outbound queue. @@ -269,7 +265,7 @@ func (c *connection) writePump() { cleanup: //fmt.Println("writePump done") timer.Stop() - c.Close(true) + c.Close() } // Write ping message. diff --git a/src/app/spreed-webrtc-server/hub.go b/src/app/spreed-webrtc-server/hub.go index 9c1d1742..9a899f12 100644 --- a/src/app/spreed-webrtc-server/hub.go +++ b/src/app/spreed-webrtc-server/hub.go @@ -48,8 +48,8 @@ type SessionStore interface { type Unicaster interface { SessionStore OnConnect(Client, *Session) - Unicast(session *Session, to string, m interface{}) - OnDisconnect(*Session) + Unicast(to string, outgoing *DataOutgoing) + OnDisconnect(sessionID string) } type ContactManager interface { @@ -158,7 +158,7 @@ func (h *hub) OnConnect(client Client, session *Session) { // Register connection or replace existing one. if ec, ok := h.clients[session.Id]; ok { - ec.Close(false) + ec.Close() //log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id) } h.clients[session.Id] = client @@ -167,26 +167,20 @@ func (h *hub) OnConnect(client Client, session *Session) { //log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id) } -func (h *hub) OnDisconnect(session *Session) { +func (h *hub) OnDisconnect(sessionID string) { h.mutex.Lock() - delete(h.clients, session.Id) + delete(h.clients, sessionID) h.mutex.Unlock() } -func (h *hub) GetClient(id string) (client Client, ok bool) { +func (h *hub) GetClient(sessionID string) (client Client, ok bool) { h.mutex.RLock() - client, ok = h.clients[id] + client, ok = h.clients[sessionID] h.mutex.RUnlock() return } -func (h *hub) Unicast(session *Session, to string, m interface{}) { - outgoing := &DataOutgoing{ - From: session.Id, - To: to, - A: session.Attestation(), - Data: m, - } +func (h *hub) Unicast(to string, outgoing *DataOutgoing) { if message, err := h.EncodeOutgoing(outgoing); err == nil { client, ok := h.GetClient(to) if !ok { diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index d05af33f..2bc4e643 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -340,9 +340,9 @@ func runner(runtime phoenix.Runtime) error { roomManager := NewRoomManager(config, codec) hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec) tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm) - sessionManager := NewSessionManager(config, tickets, sessionSecret) + sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := NewStatsManager(hub, roomManager, sessionManager) - channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, roomManager, buddyImages) + channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour))) r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder)))) diff --git a/src/app/spreed-webrtc-server/room_manager.go b/src/app/spreed-webrtc-server/room_manager.go index fdcd27ad..3de0528f 100644 --- a/src/app/spreed-webrtc-server/room_manager.go +++ b/src/app/spreed-webrtc-server/room_manager.go @@ -29,12 +29,12 @@ import ( type RoomStatusManager interface { RoomUsers(*Session) []*DataSession JoinRoom(string, *DataRoomCredentials, *Session, Sender) (*DataRoom, error) - LeaveRoom(*Session) + LeaveRoom(roomID, sessionID string) UpdateRoom(*Session, *DataRoom) (*DataRoom, error) } type Broadcaster interface { - Broadcast(*Session, interface{}) + Broadcast(sessionID, roomID string, outgoing *DataOutgoing) } type RoomStats interface { @@ -84,9 +84,9 @@ func (rooms *roomManager) JoinRoom(id string, credentials *DataRoomCredentials, return roomWorker.Join(credentials, session, sender) } -func (rooms *roomManager) LeaveRoom(session *Session) { - if room, ok := rooms.Get(session.Roomid); ok { - room.Leave(session) +func (rooms *roomManager) LeaveRoom(roomID, sessionID string) { + if room, ok := rooms.Get(roomID); ok { + room.Leave(sessionID) } } @@ -104,29 +104,22 @@ func (rooms *roomManager) UpdateRoom(session *Session, room *DataRoom) (*DataRoo return room, nil } -func (rooms *roomManager) Broadcast(session *Session, m interface{}) { - outgoing := &DataOutgoing{ - From: session.Id, - A: session.Attestation(), - Data: m, - } - +func (rooms *roomManager) Broadcast(sessionID, roomID string, outgoing *DataOutgoing) { message, err := rooms.EncodeOutgoing(outgoing) if err != nil { return } - id := session.Roomid - if id != "" && id == rooms.globalRoomID { + if roomID != "" && roomID == rooms.globalRoomID { rooms.RLock() for _, room := range rooms.roomTable { - room.Broadcast(session, message) + room.Broadcast(sessionID, message) } rooms.RUnlock() - } else if room, ok := rooms.Get(id); ok { - room.Broadcast(session, message) + } else if room, ok := rooms.Get(roomID); ok { + room.Broadcast(sessionID, message) } else { - log.Printf("No room named %s found for broadcast message %#v", id, m) + log.Printf("No room named %s found for broadcast %#v", roomID, outgoing) } message.Decref() } diff --git a/src/app/spreed-webrtc-server/roomworker.go b/src/app/spreed-webrtc-server/roomworker.go index 95ce744d..f0dea820 100644 --- a/src/app/spreed-webrtc-server/roomworker.go +++ b/src/app/spreed-webrtc-server/roomworker.go @@ -39,9 +39,9 @@ type RoomWorker interface { Users() []*roomUser Update(*DataRoom) error GetUsers() []*DataSession - Broadcast(*Session, Buffer) + Broadcast(sessionID string, buf Buffer) Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error) - Leave(*Session) + Leave(sessionID string) } type roomWorker struct { @@ -213,12 +213,12 @@ func (r *roomWorker) GetUsers() []*DataSession { return <-out } -func (r *roomWorker) Broadcast(session *Session, message Buffer) { +func (r *roomWorker) Broadcast(sessionID string, message Buffer) { worker := func() { r.mutex.RLock() for id, user := range r.users { - if id == session.Id { + if id == sessionID { // Skip broadcast to self. continue } @@ -261,10 +261,6 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se } } - if session.IsDisconnected() { - log.Println("Refusing to join a disconnected session to room", session.Id) - return - } r.users[session.Id] = &roomUser{session, sender} // NOTE(lcooper): Needs to be a copy, else we risk races with // a subsequent modification of room properties. @@ -277,11 +273,11 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se return result.DataRoom, result.error } -func (r *roomWorker) Leave(session *Session) { +func (r *roomWorker) Leave(sessionID string) { worker := func() { r.mutex.Lock() - if _, ok := r.users[session.Id]; ok { - delete(r.users, session.Id) + if _, ok := r.users[sessionID]; ok { + delete(r.users, sessionID) } r.mutex.Unlock() } diff --git a/src/app/spreed-webrtc-server/session.go b/src/app/spreed-webrtc-server/session.go index 44e30742..fece3c28 100644 --- a/src/app/spreed-webrtc-server/session.go +++ b/src/app/spreed-webrtc-server/session.go @@ -25,6 +25,7 @@ import ( "errors" "fmt" "github.com/gorilla/securecookie" + "strings" "sync" "time" ) @@ -32,6 +33,11 @@ import ( var sessionNonces *securecookie.SecureCookie type Session struct { + SessionManager + Unicaster + Broadcaster + RoomStatusManager + buddyImages ImageCache Id string Sid string Ua string @@ -52,16 +58,21 @@ type Session struct { disconnected bool } -func NewSession(attestations *securecookie.SecureCookie, id, sid string) *Session { +func NewSession(manager SessionManager, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, attestations *securecookie.SecureCookie, id, sid string) *Session { session := &Session{ - Id: id, - Sid: sid, - Prio: 100, - stamp: time.Now().Unix(), - attestations: attestations, - subscriptions: make(map[string]*Session), - subscribers: make(map[string]*Session), + SessionManager: manager, + Unicaster: unicaster, + Broadcaster: broadcaster, + RoomStatusManager: rooms, + buddyImages: buddyImages, + Id: id, + Sid: sid, + Prio: 100, + stamp: time.Now().Unix(), + attestations: attestations, + subscriptions: make(map[string]*Session), + subscribers: make(map[string]*Session), } session.NewAttestation() return session @@ -104,52 +115,144 @@ func (s *Session) RemoveSubscriber(id string) { s.mutex.Unlock() } -func (s *Session) RunForAllSubscribers(f func(session *Session)) { - +func (s *Session) JoinRoom(roomID string, credentials *DataRoomCredentials, sender Sender) (*DataRoom, error) { s.mutex.Lock() - for _, session := range s.subscribers { - s.mutex.Unlock() - f(session) - s.mutex.Lock() + defer s.mutex.Unlock() + + if s.Hello && s.Roomid != roomID { + s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id) + s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Left", + Id: s.Id, + Status: "soft", + }, + }) } - s.mutex.Unlock() + room, err := s.RoomStatusManager.JoinRoom(roomID, credentials, s, sender) + if err == nil { + s.Hello = true + s.Roomid = roomID + s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Joined", + Id: s.Id, + Userid: s.userid, + Ua: s.Ua, + Prio: s.Prio, + }, + }) + } else { + s.Hello = false + } + return room, err } -func (s *Session) Disconnect() { - - s.mutex.Lock() - s.disconnected = true - s.mutex.Unlock() - +func (s *Session) Broadcast(m interface{}) { + s.mutex.RLock() + if s.Hello { + s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: m, + }) + } + s.mutex.RUnlock() } -func (s *Session) IsDisconnected() bool { +func (s *Session) BroadcastStatus() { + s.mutex.RLock() + if s.Hello { + s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Status", + Id: s.Id, + Userid: s.userid, + Status: s.Status, + Rev: s.UpdateRev, + Prio: s.Prio, + }, + }) + } + s.mutex.RUnlock() +} +func (s *Session) Unicast(to string, m interface{}) { s.mutex.RLock() - defer s.mutex.RUnlock() - return s.disconnected + outgoing := &DataOutgoing{ + From: s.Id, + To: to, + A: s.attestation.Token(), + Data: m, + } + s.mutex.RUnlock() + + s.Unicaster.Unicast(to, outgoing) } func (s *Session) Close() { - s.mutex.Lock() - // Remove foreign references. + + outgoing := &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Left", + Id: s.Id, + Status: "hard", + }, + } + + if s.Hello { + // NOTE(lcooper): If we don't check for Hello here, we could deadlock + // when implicitly creating a room while a user is reconnecting. + s.Broadcaster.Broadcast(s.Id, s.Roomid, outgoing) + s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id) + } + + for _, session := range s.subscribers { + s.Unicaster.Unicast(session.Id, outgoing) + } + for _, session := range s.subscriptions { session.RemoveSubscriber(s.Id) } - // Remove session cross references. + + s.Unicaster.OnDisconnect(s.Id) + s.SessionManager.DestroySession(s.Id, s.userid) + s.buddyImages.Delete(s.Id) + s.subscriptions = make(map[string]*Session) s.subscribers = make(map[string]*Session) - s.mutex.Unlock() + s.disconnected = true + s.mutex.Unlock() } func (s *Session) Update(update *SessionUpdate) uint64 { - s.mutex.Lock() defer s.mutex.Unlock() + if update.Status != nil { + status, ok := update.Status.(map[string]interface{}) + if ok && status["buddyPicture"] != nil { + pic := status["buddyPicture"].(string) + if strings.HasPrefix(pic, "data:") { + imageId := s.buddyImages.Update(s.Id, pic[5:]) + if imageId != "" { + status["buddyPicture"] = "img:" + imageId + } + } + } + } + for _, key := range update.Types { //fmt.Println("type update", key) @@ -268,50 +371,6 @@ func (s *Session) SetUseridFake(userid string) { } -func (s *Session) DataSessionLeft(state string) *DataSession { - - s.mutex.RLock() - defer s.mutex.RUnlock() - - return &DataSession{ - Type: "Left", - Id: s.Id, - Status: state, - } - -} - -func (s *Session) DataSessionJoined() *DataSession { - - s.mutex.RLock() - defer s.mutex.RUnlock() - - return &DataSession{ - Type: "Joined", - Id: s.Id, - Userid: s.userid, - Ua: s.Ua, - Prio: s.Prio, - } - -} - -func (s *Session) DataSessionStatus() *DataSession { - - s.mutex.RLock() - defer s.mutex.RUnlock() - - return &DataSession{ - Type: "Status", - Id: s.Id, - Userid: s.userid, - Status: s.Status, - Rev: s.UpdateRev, - Prio: s.Prio, - } - -} - func (s *Session) NewAttestation() { s.attestation = &SessionAttestation{ s: s, @@ -319,13 +378,6 @@ func (s *Session) NewAttestation() { s.attestation.Update() } -func (s *Session) Attestation() (attestation string) { - s.mutex.RLock() - attestation = s.attestation.Token() - s.mutex.RUnlock() - return -} - func (s *Session) UpdateAttestation() { s.mutex.Lock() s.attestation.Update() diff --git a/src/app/spreed-webrtc-server/session_manager.go b/src/app/spreed-webrtc-server/session_manager.go index 28019253..43360f7d 100644 --- a/src/app/spreed-webrtc-server/session_manager.go +++ b/src/app/spreed-webrtc-server/session_manager.go @@ -37,14 +37,18 @@ type SessionManager interface { UserStats RetrieveUsersWith(func(*http.Request) (string, error)) CreateSession(*http.Request) *Session - DestroySession(*Session) + DestroySession(sessionID, userID string) Authenticate(*Session, *SessionToken, string) error GetUserSessions(session *Session, id string) []*DataSession } type sessionManager struct { - Tickets sync.RWMutex + Tickets + Unicaster + Broadcaster + RoomStatusManager + buddyImages ImageCache config *Config userTable map[string]*User fakesessionTable map[string]*Session @@ -52,10 +56,14 @@ type sessionManager struct { attestations *securecookie.SecureCookie } -func NewSessionManager(config *Config, tickets Tickets, sessionSecret []byte) SessionManager { +func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, sessionSecret []byte) SessionManager { sessionManager := &sessionManager{ - tickets, sync.RWMutex{}, + tickets, + unicaster, + broadcaster, + rooms, + buddyImages, config, make(map[string]*User), make(map[string]*Session), @@ -103,7 +111,7 @@ func (sessionManager *sessionManager) CreateSession(request *http.Request) *Sess } } - session := NewSession(sessionManager.attestations, st.Id, st.Sid) + session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) if userid != "" { // XXX(lcooper): Should errors be handled here? @@ -113,15 +121,15 @@ func (sessionManager *sessionManager) CreateSession(request *http.Request) *Sess return session } -func (sessionManager *sessionManager) DestroySession(session *Session) { - session.Close() +func (sessionManager *sessionManager) DestroySession(sessionID, userID string) { + if userID == "" { + return + } sessionManager.Lock() - if suserid := session.Userid(); suserid != "" { - user, ok := sessionManager.userTable[suserid] - if ok && user.RemoveSession(session) { - delete(sessionManager.userTable, suserid) - } + user, ok := sessionManager.userTable[userID] + if ok && user.RemoveSession(sessionID) { + delete(sessionManager.userTable, userID) } sessionManager.Unlock() } @@ -158,7 +166,7 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s session, ok := sessionManager.fakesessionTable[userid] if !ok { st := sessionManager.FakeSessionToken(userid) - session = NewSession(sessionManager.attestations, st.Id, st.Sid) + session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) session.SetUseridFake(st.Userid) sessionManager.fakesessionTable[userid] = session } diff --git a/src/app/spreed-webrtc-server/user.go b/src/app/spreed-webrtc-server/user.go index 49c651b6..fa838f05 100644 --- a/src/app/spreed-webrtc-server/user.go +++ b/src/app/spreed-webrtc-server/user.go @@ -57,10 +57,10 @@ func (u *User) AddSession(s *Session) bool { } // Return true if no session left. -func (u *User) RemoveSession(s *Session) bool { +func (u *User) RemoveSession(sessionID string) bool { last := false u.mutex.Lock() - delete(u.sessionTable, s.Id) + delete(u.sessionTable, sessionID) if len(u.sessionTable) == 0 { log.Println("Last session unregistered for user", u.Id) last = true diff --git a/src/app/spreed-webrtc-server/ws.go b/src/app/spreed-webrtc-server/ws.go index 9bba4bfc..3cd7fb1e 100644 --- a/src/app/spreed-webrtc-server/ws.go +++ b/src/app/spreed-webrtc-server/ws.go @@ -69,7 +69,7 @@ func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionMa // Create a new connection instance. session := sessionManager.CreateSession(r) - defer sessionManager.DestroySession(session) + defer session.Close() client := NewClient(codec, channellingAPI, session) conn := NewConnection(connectionCounter.CountConnection(), ws, client)