Browse Source

Fixed a problem where replaced sessions got cleaned up i a previous connection was still closing.

pull/168/head
Simon Eisenmann 11 years ago
parent
commit
02a6ae2a2b
  1. 5
      src/app/spreed-webrtc-server/channelling_api.go
  2. 23
      src/app/spreed-webrtc-server/client.go
  3. 2
      src/app/spreed-webrtc-server/connection.go
  4. 13
      src/app/spreed-webrtc-server/hub.go
  5. 5
      src/app/spreed-webrtc-server/session.go
  6. 1
      src/app/spreed-webrtc-server/ws.go

5
src/app/spreed-webrtc-server/channelling_api.go

@ -32,6 +32,7 @@ const (
type ChannellingAPI interface { type ChannellingAPI interface {
OnConnect(Client, *Session) OnConnect(Client, *Session)
OnDisconnect(string)
OnIncoming(ResponseSender, *Session, *DataIncoming) OnIncoming(ResponseSender, *Session, *DataIncoming)
} }
@ -64,6 +65,10 @@ func (api *channellingAPI) OnConnect(client Client, session *Session) {
api.SendSelf(client, session) api.SendSelf(client, session)
} }
func (api *channellingAPI) OnDisconnect(sessionID string) {
api.Unicaster.OnDisconnect(sessionID)
}
func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *DataIncoming) { func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *DataIncoming) {
switch msg.Type { switch msg.Type {
case "Self": case "Self":

23
src/app/spreed-webrtc-server/client.go

@ -43,17 +43,19 @@ type Client interface {
Session() *Session Session() *Session
Index() uint64 Index() uint64
Close() Close()
ReplaceAndClose()
} }
type client struct { type client struct {
Codec Codec
ChannellingAPI ChannellingAPI
Connection Connection
session *Session session *Session
replaced bool
} }
func NewClient(codec Codec, api ChannellingAPI, session *Session) *client { func NewClient(codec Codec, api ChannellingAPI, session *Session) *client {
return &client{codec, api, nil, session} return &client{codec, api, nil, session, false}
} }
func (client *client) OnConnect(conn Connection) { func (client *client) OnConnect(conn Connection) {
@ -61,6 +63,15 @@ func (client *client) OnConnect(conn Connection) {
client.ChannellingAPI.OnConnect(client, client.session) client.ChannellingAPI.OnConnect(client, client.session)
} }
func (client *client) OnDisconnect() {
client.session.Close()
if client.replaced {
log.Printf("Not cleaning up session %s as client %d was replaced\n", client.session.Id, client.Index())
return
}
client.ChannellingAPI.OnDisconnect(client.session.Id)
}
func (client *client) OnText(b Buffer) { func (client *client) OnText(b Buffer) {
if incoming, err := client.DecodeIncoming(b); err == nil { if incoming, err := client.DecodeIncoming(b); err == nil {
client.OnIncoming(client, client.session, incoming) client.OnIncoming(client, client.session, incoming)
@ -81,3 +92,11 @@ func (client *client) Reply(iid string, m interface{}) {
func (client *client) Session() *Session { func (client *client) Session() *Session {
return client.session return client.session
} }
func (client *client) ReplaceAndClose() {
client.replaced = true
client.session.Close()
if client.Connection != nil {
client.Connection.Close()
}
}

2
src/app/spreed-webrtc-server/connection.go

@ -64,6 +64,7 @@ type Connection interface {
type ConnectionHandler interface { type ConnectionHandler interface {
NewBuffer() Buffer NewBuffer() Buffer
OnConnect(Connection) OnConnect(Connection)
OnDisconnect()
OnText(Buffer) OnText(Buffer)
} }
@ -167,6 +168,7 @@ func (c *connection) readPump() {
} }
c.Close() c.Close()
c.handler.OnDisconnect()
} }
// Write message to outbound queue. // Write message to outbound queue.

13
src/app/spreed-webrtc-server/hub.go

@ -150,24 +150,19 @@ func (h *hub) GetSession(id string) (session *Session, ok bool) {
} }
func (h *hub) OnConnect(client Client, session *Session) { func (h *hub) OnConnect(client Client, session *Session) {
// Set flags.
h.mutex.Lock() h.mutex.Lock()
log.Printf("Created client %d with id %s\n", client.Index(), session.Id)
log.Printf("Created client with id %s", session.Id)
// Register connection or replace existing one. // Register connection or replace existing one.
if ec, ok := h.clients[session.Id]; ok { if ec, ok := h.clients[session.Id]; ok {
ec.Close() log.Printf("Closing obsolete client %d with id %s\n", ec.Index(), session.Id)
//log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id) ec.ReplaceAndClose()
} }
h.clients[session.Id] = client h.clients[session.Id] = client
//fmt.Println("registered", c.Id)
h.mutex.Unlock() h.mutex.Unlock()
//log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id)
} }
func (h *hub) OnDisconnect(sessionID string) { func (h *hub) OnDisconnect(sessionID string) {
log.Printf("Cleaning up session id %s\n", sessionID)
h.mutex.Lock() h.mutex.Lock()
delete(h.clients, sessionID) delete(h.clients, sessionID)
h.mutex.Unlock() h.mutex.Unlock()

5
src/app/spreed-webrtc-server/session.go

@ -204,6 +204,10 @@ func (s *Session) Unicast(to string, m interface{}) {
func (s *Session) Close() { func (s *Session) Close() {
s.mutex.Lock() s.mutex.Lock()
if s.disconnected {
s.mutex.Unlock()
return
}
outgoing := &DataOutgoing{ outgoing := &DataOutgoing{
From: s.Id, From: s.Id,
@ -230,7 +234,6 @@ func (s *Session) Close() {
session.RemoveSubscriber(s.Id) session.RemoveSubscriber(s.Id)
} }
s.Unicaster.OnDisconnect(s.Id)
s.SessionManager.DestroySession(s.Id, s.userid) s.SessionManager.DestroySession(s.Id, s.userid)
s.buddyImages.Delete(s.Id) s.buddyImages.Delete(s.Id)

1
src/app/spreed-webrtc-server/ws.go

@ -69,7 +69,6 @@ func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionMa
// Create a new connection instance. // Create a new connection instance.
session := sessionManager.CreateSession(r) session := sessionManager.CreateSession(r)
defer session.Close()
client := NewClient(codec, channellingAPI, session) client := NewClient(codec, channellingAPI, session)
conn := NewConnection(connectionCounter.CountConnection(), ws, client) conn := NewConnection(connectionCounter.CountConnection(), ws, client)

Loading…
Cancel
Save