From 02a6ae2a2bfd057d5fe04a5a9bd5e5fb17c90942 Mon Sep 17 00:00:00 2001 From: Simon Eisenmann Date: Tue, 10 Feb 2015 18:57:34 +0100 Subject: [PATCH] Fixed a problem where replaced sessions got cleaned up i a previous connection was still closing. --- .../spreed-webrtc-server/channelling_api.go | 5 ++++ src/app/spreed-webrtc-server/client.go | 23 +++++++++++++++++-- src/app/spreed-webrtc-server/connection.go | 2 ++ src/app/spreed-webrtc-server/hub.go | 13 ++++------- src/app/spreed-webrtc-server/session.go | 5 +++- src/app/spreed-webrtc-server/ws.go | 1 - 6 files changed, 36 insertions(+), 13 deletions(-) diff --git a/src/app/spreed-webrtc-server/channelling_api.go b/src/app/spreed-webrtc-server/channelling_api.go index 96ea1db8..400c5d14 100644 --- a/src/app/spreed-webrtc-server/channelling_api.go +++ b/src/app/spreed-webrtc-server/channelling_api.go @@ -32,6 +32,7 @@ const ( type ChannellingAPI interface { OnConnect(Client, *Session) + OnDisconnect(string) OnIncoming(ResponseSender, *Session, *DataIncoming) } @@ -64,6 +65,10 @@ func (api *channellingAPI) OnConnect(client Client, session *Session) { api.SendSelf(client, session) } +func (api *channellingAPI) OnDisconnect(sessionID string) { + api.Unicaster.OnDisconnect(sessionID) +} + func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *DataIncoming) { switch msg.Type { case "Self": diff --git a/src/app/spreed-webrtc-server/client.go b/src/app/spreed-webrtc-server/client.go index 1a437605..f069dc90 100644 --- a/src/app/spreed-webrtc-server/client.go +++ b/src/app/spreed-webrtc-server/client.go @@ -43,17 +43,19 @@ type Client interface { Session() *Session Index() uint64 Close() + ReplaceAndClose() } type client struct { Codec ChannellingAPI Connection - session *Session + session *Session + replaced bool } func NewClient(codec Codec, api ChannellingAPI, session *Session) *client { - return &client{codec, api, nil, session} + return &client{codec, api, nil, session, false} } func (client *client) OnConnect(conn Connection) { @@ -61,6 +63,15 @@ func (client *client) OnConnect(conn Connection) { client.ChannellingAPI.OnConnect(client, client.session) } +func (client *client) OnDisconnect() { + client.session.Close() + if client.replaced { + log.Printf("Not cleaning up session %s as client %d was replaced\n", client.session.Id, client.Index()) + return + } + client.ChannellingAPI.OnDisconnect(client.session.Id) +} + func (client *client) OnText(b Buffer) { if incoming, err := client.DecodeIncoming(b); err == nil { client.OnIncoming(client, client.session, incoming) @@ -81,3 +92,11 @@ func (client *client) Reply(iid string, m interface{}) { func (client *client) Session() *Session { return client.session } + +func (client *client) ReplaceAndClose() { + client.replaced = true + client.session.Close() + if client.Connection != nil { + client.Connection.Close() + } +} diff --git a/src/app/spreed-webrtc-server/connection.go b/src/app/spreed-webrtc-server/connection.go index f99e405e..6ba7b456 100644 --- a/src/app/spreed-webrtc-server/connection.go +++ b/src/app/spreed-webrtc-server/connection.go @@ -64,6 +64,7 @@ type Connection interface { type ConnectionHandler interface { NewBuffer() Buffer OnConnect(Connection) + OnDisconnect() OnText(Buffer) } @@ -167,6 +168,7 @@ func (c *connection) readPump() { } c.Close() + c.handler.OnDisconnect() } // Write message to outbound queue. diff --git a/src/app/spreed-webrtc-server/hub.go b/src/app/spreed-webrtc-server/hub.go index 9a899f12..74d04860 100644 --- a/src/app/spreed-webrtc-server/hub.go +++ b/src/app/spreed-webrtc-server/hub.go @@ -150,24 +150,19 @@ func (h *hub) GetSession(id string) (session *Session, ok bool) { } func (h *hub) OnConnect(client Client, session *Session) { - // Set flags. - h.mutex.Lock() - - log.Printf("Created client with id %s", session.Id) - + log.Printf("Created client %d with id %s\n", client.Index(), session.Id) // Register connection or replace existing one. if ec, ok := h.clients[session.Id]; ok { - ec.Close() - //log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id) + log.Printf("Closing obsolete client %d with id %s\n", ec.Index(), session.Id) + ec.ReplaceAndClose() } h.clients[session.Id] = client - //fmt.Println("registered", c.Id) h.mutex.Unlock() - //log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id) } func (h *hub) OnDisconnect(sessionID string) { + log.Printf("Cleaning up session id %s\n", sessionID) h.mutex.Lock() delete(h.clients, sessionID) h.mutex.Unlock() diff --git a/src/app/spreed-webrtc-server/session.go b/src/app/spreed-webrtc-server/session.go index 2a040511..2adcc0e2 100644 --- a/src/app/spreed-webrtc-server/session.go +++ b/src/app/spreed-webrtc-server/session.go @@ -204,6 +204,10 @@ func (s *Session) Unicast(to string, m interface{}) { func (s *Session) Close() { s.mutex.Lock() + if s.disconnected { + s.mutex.Unlock() + return + } outgoing := &DataOutgoing{ From: s.Id, @@ -230,7 +234,6 @@ func (s *Session) Close() { session.RemoveSubscriber(s.Id) } - s.Unicaster.OnDisconnect(s.Id) s.SessionManager.DestroySession(s.Id, s.userid) s.buddyImages.Delete(s.Id) diff --git a/src/app/spreed-webrtc-server/ws.go b/src/app/spreed-webrtc-server/ws.go index 3cd7fb1e..70ad83fd 100644 --- a/src/app/spreed-webrtc-server/ws.go +++ b/src/app/spreed-webrtc-server/ws.go @@ -69,7 +69,6 @@ func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionMa // Create a new connection instance. session := sessionManager.CreateSession(r) - defer session.Close() client := NewClient(codec, channellingAPI, session) conn := NewConnection(connectionCounter.CountConnection(), ws, client)