diff --git a/src/app/spreed-webrtc-server/client.go b/src/app/spreed-webrtc-server/client.go index c9f3bb41..b858176b 100644 --- a/src/app/spreed-webrtc-server/client.go +++ b/src/app/spreed-webrtc-server/client.go @@ -43,7 +43,7 @@ type Client interface { Session() *Session Index() uint64 Close() - ReplaceAndClose() + ReplaceAndClose(Client) } type client struct { @@ -87,9 +87,15 @@ func (client *client) Session() *Session { return client.session } -func (client *client) ReplaceAndClose() { - client.session.Close() - if client.Connection != nil { - client.Connection.Close() - } +func (client *client) ReplaceAndClose(oldClient Client) { + oldSession := oldClient.Session() + client.session.Replace(oldSession) + go func() { + // Close old session and client in another go routine, + // to avoid blocking the new client if the old one hangs or + // whatever. + log.Printf("Closing obsolete client %d (replaced with %d) with id %s\n", oldClient.Index(), client.Index(), oldSession.Id) + oldSession.Close() + oldClient.Close() + }() } diff --git a/src/app/spreed-webrtc-server/hub.go b/src/app/spreed-webrtc-server/hub.go index 963c74b9..6a7dcd26 100644 --- a/src/app/spreed-webrtc-server/hub.go +++ b/src/app/spreed-webrtc-server/hub.go @@ -154,12 +154,8 @@ func (h *hub) OnConnect(client Client, session *Session) { log.Printf("Created client %d with id %s\n", client.Index(), session.Id) // Register connection or replace existing one. if ec, ok := h.clients[session.Id]; ok { - // Clean up old client at the end and make sure to run this in another go routine, - // to avoid blocking the new client if the old one hangs or whatever. - go func() { - log.Printf("Closing obsolete client %d (replaced with %d) with id %s\n", ec.Index(), client.Index(), session.Id) - ec.ReplaceAndClose() - }() + // Clean up old client at the end outside the hub lock. + defer client.ReplaceAndClose(ec) } h.clients[session.Id] = client h.mutex.Unlock() diff --git a/src/app/spreed-webrtc-server/session.go b/src/app/spreed-webrtc-server/session.go index 2adcc0e2..a60a9347 100644 --- a/src/app/spreed-webrtc-server/session.go +++ b/src/app/spreed-webrtc-server/session.go @@ -56,6 +56,7 @@ type Session struct { subscriptions map[string]*Session subscribers map[string]*Session disconnected bool + replaced bool } func NewSession(manager SessionManager, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, attestations *securecookie.SecureCookie, id, sid string) *Session { @@ -209,33 +210,38 @@ func (s *Session) Close() { return } - outgoing := &DataOutgoing{ - From: s.Id, - A: s.attestation.Token(), - Data: &DataSession{ - Type: "Left", - Id: s.Id, - Status: "hard", - }, - } + // TODO(longsleep): Verify that it is ok to not do all this when replaced is true. + if !s.replaced { - if s.Hello { - // NOTE(lcooper): If we don't check for Hello here, we could deadlock - // when implicitly creating a room while a user is reconnecting. - s.Broadcaster.Broadcast(s.Id, s.Roomid, outgoing) - s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id) - } + outgoing := &DataOutgoing{ + From: s.Id, + A: s.attestation.Token(), + Data: &DataSession{ + Type: "Left", + Id: s.Id, + Status: "hard", + }, + } - for _, session := range s.subscribers { - s.Unicaster.Unicast(session.Id, outgoing) - } + if s.Hello { + // NOTE(lcooper): If we don't check for Hello here, we could deadlock + // when implicitly creating a room while a user is reconnecting. + s.Broadcaster.Broadcast(s.Id, s.Roomid, outgoing) + s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id) + } - for _, session := range s.subscriptions { - session.RemoveSubscriber(s.Id) - } + for _, session := range s.subscribers { + s.Unicaster.Unicast(session.Id, outgoing) + } + + for _, session := range s.subscriptions { + session.RemoveSubscriber(s.Id) + } - s.SessionManager.DestroySession(s.Id, s.userid) - s.buddyImages.Delete(s.Id) + s.SessionManager.DestroySession(s.Id, s.userid) + s.buddyImages.Delete(s.Id) + + } s.subscriptions = make(map[string]*Session) s.subscribers = make(map[string]*Session) @@ -244,6 +250,27 @@ func (s *Session) Close() { s.mutex.Unlock() } +func (s *Session) Replace(oldSession *Session) { + + oldSession.mutex.Lock() + if oldSession.disconnected { + oldSession.mutex.Unlock() + return + } + + s.mutex.Lock() + + s.subscriptions = oldSession.subscriptions + s.subscribers = oldSession.subscribers + + s.mutex.Unlock() + + // Mark old session as replaced. + oldSession.replaced = true + oldSession.mutex.Unlock() + +} + func (s *Session) Update(update *SessionUpdate) uint64 { s.mutex.Lock() defer s.mutex.Unlock()