Browse Source

Merge pull request #159 from deathwish/fix_ghost_sessions

Move all session cleanup code to a single code path.
pull/160/head
Simon Eisenmann 11 years ago
parent
commit
0572733def
  1. 91
      src/app/spreed-webrtc-server/channelling_api.go
  2. 18
      src/app/spreed-webrtc-server/channelling_api_test.go
  3. 6
      src/app/spreed-webrtc-server/client.go
  4. 12
      src/app/spreed-webrtc-server/connection.go
  5. 22
      src/app/spreed-webrtc-server/hub.go
  6. 4
      src/app/spreed-webrtc-server/main.go
  7. 29
      src/app/spreed-webrtc-server/room_manager.go
  8. 18
      src/app/spreed-webrtc-server/roomworker.go
  9. 198
      src/app/spreed-webrtc-server/session.go
  10. 34
      src/app/spreed-webrtc-server/session_manager.go
  11. 4
      src/app/spreed-webrtc-server/user.go
  12. 2
      src/app/spreed-webrtc-server/ws.go

91
src/app/spreed-webrtc-server/channelling_api.go

@ -23,7 +23,6 @@ package main
import ( import (
"log" "log"
"strings"
"time" "time"
) )
@ -34,7 +33,6 @@ const (
type ChannellingAPI interface { type ChannellingAPI interface {
OnConnect(Client, *Session) OnConnect(Client, *Session)
OnIncoming(ResponseSender, *Session, *DataIncoming) OnIncoming(ResponseSender, *Session, *DataIncoming)
OnDisconnect(*Session)
} }
type channellingAPI struct { type channellingAPI struct {
@ -46,11 +44,9 @@ type channellingAPI struct {
ContactManager ContactManager
TurnDataCreator TurnDataCreator
Unicaster Unicaster
Broadcaster
buddyImages ImageCache
} }
func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, broadcaster Broadcaster, buddyImages ImageCache) ChannellingAPI { func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster) ChannellingAPI {
return &channellingAPI{ return &channellingAPI{
config, config,
roomStatus, roomStatus,
@ -60,8 +56,6 @@ func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEnco
contactManager, contactManager,
turnDataCreator, turnDataCreator,
unicaster, unicaster,
broadcaster,
buddyImages,
} }
} }
@ -77,40 +71,31 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
case "Hello": case "Hello":
//log.Println("Hello", msg.Hello, c.Index()) //log.Println("Hello", msg.Hello, c.Index())
// TODO(longsleep): Filter room id and user agent. // TODO(longsleep): Filter room id and user agent.
api.UpdateSession(session, &SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua}) session.Update(&SessionUpdate{Types: []string{"Ua"}, Ua: msg.Hello.Ua})
if session.Hello && session.Roomid != msg.Hello.Id {
api.LeaveRoom(session)
api.Broadcast(session, session.DataSessionLeft("soft"))
}
room, err := session.JoinRoom(msg.Hello.Id, msg.Hello.Credentials, c)
// NOTE(lcooper): Iid filtered for compatibility's sake. // NOTE(lcooper): Iid filtered for compatibility's sake.
// Evaluate sending unconditionally when supported by all clients. // Evaluate sending unconditionally when supported by all clients.
if room, err := api.JoinRoom(msg.Hello.Id, msg.Hello.Credentials, session, c); err == nil {
session.Hello = true
session.Roomid = msg.Hello.Id
if msg.Iid != "" { if msg.Iid != "" {
if err == nil {
c.Reply(msg.Iid, &DataWelcome{ c.Reply(msg.Iid, &DataWelcome{
Type: "Welcome", Type: "Welcome",
Room: room, Room: room,
Users: api.RoomUsers(session), Users: api.RoomUsers(session),
}) })
}
api.Broadcast(session, session.DataSessionJoined())
} else { } else {
session.Hello = false
if msg.Iid != "" {
c.Reply(msg.Iid, err) c.Reply(msg.Iid, err)
} }
} }
case "Offer": case "Offer":
// TODO(longsleep): Validate offer // TODO(longsleep): Validate offer
api.Unicast(session, msg.Offer.To, msg.Offer) session.Unicast(msg.Offer.To, msg.Offer)
case "Candidate": case "Candidate":
// TODO(longsleep): Validate candidate // TODO(longsleep): Validate candidate
api.Unicast(session, msg.Candidate.To, msg.Candidate) session.Unicast(msg.Candidate.To, msg.Candidate)
case "Answer": case "Answer":
// TODO(longsleep): Validate Answer // TODO(longsleep): Validate Answer
api.Unicast(session, msg.Answer.To, msg.Answer) session.Unicast(msg.Answer.To, msg.Answer)
case "Users": case "Users":
if session.Hello { if session.Hello {
sessions := &DataSessions{Type: "Users", Users: api.RoomUsers(session)} sessions := &DataSessions{Type: "Users", Users: api.RoomUsers(session)}
@ -125,27 +110,27 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
if err := api.Authenticate(session, st, ""); err == nil { if err := api.Authenticate(session, st, ""); err == nil {
log.Println("Authentication success", session.Userid) log.Println("Authentication success", session.Userid)
api.SendSelf(c, session) api.SendSelf(c, session)
api.BroadcastSessionStatus(session) session.BroadcastStatus()
} else { } else {
log.Println("Authentication failed", err, st.Userid, st.Nonce) log.Println("Authentication failed", err, st.Userid, st.Nonce)
} }
case "Bye": case "Bye":
api.Unicast(session, msg.Bye.To, msg.Bye) session.Unicast(msg.Bye.To, msg.Bye)
case "Status": case "Status":
//log.Println("Status", msg.Status) //log.Println("Status", msg.Status)
api.UpdateSession(session, &SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status}) session.Update(&SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status})
api.BroadcastSessionStatus(session) session.BroadcastStatus()
case "Chat": case "Chat":
// TODO(longsleep): Limit sent chat messages per incoming connection. // TODO(longsleep): Limit sent chat messages per incoming connection.
if !msg.Chat.Chat.NoEcho { if !msg.Chat.Chat.NoEcho {
api.Unicast(session, session.Id, msg.Chat) session.Unicast(session.Id, msg.Chat)
} }
msg.Chat.Chat.Time = time.Now().Format(time.RFC3339) msg.Chat.Chat.Time = time.Now().Format(time.RFC3339)
if msg.Chat.To == "" { if msg.Chat.To == "" {
// TODO(longsleep): Check if chat broadcast is allowed. // TODO(longsleep): Check if chat broadcast is allowed.
if session.Hello { if session.Hello {
api.CountBroadcastChat() api.CountBroadcastChat()
api.Broadcast(session, msg.Chat) session.Broadcast(msg.Chat)
} }
} else { } else {
if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil { if msg.Chat.Chat.Status != nil && msg.Chat.Chat.Status.ContactRequest != nil {
@ -159,10 +144,10 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
api.CountUnicastChat() api.CountUnicastChat()
} }
api.Unicast(session, msg.Chat.To, msg.Chat) session.Unicast(msg.Chat.To, msg.Chat)
if msg.Chat.Chat.Mid != "" { if msg.Chat.Chat.Mid != "" {
// Send out delivery confirmation status chat message. // Send out delivery confirmation status chat message.
api.Unicast(session, session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}}) session.Unicast(session.Id, &DataChat{To: msg.Chat.To, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Chat.Chat.Mid, Status: &DataChatStatus{State: "sent"}}})
} }
} }
case "Conference": case "Conference":
@ -173,7 +158,7 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
// Send conference update to anyone. // Send conference update to anyone.
for _, id := range msg.Conference.Conference { for _, id := range msg.Conference.Conference {
if id != session.Id { if id != session.Id {
api.Unicast(session, id, msg.Conference) session.Unicast(id, msg.Conference)
} }
} }
} }
@ -211,7 +196,7 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
} }
case "Room": case "Room":
if room, err := api.UpdateRoom(session, msg.Room); err == nil { if room, err := api.UpdateRoom(session, msg.Room); err == nil {
api.Broadcast(session, room) session.Broadcast(room)
c.Reply(msg.Iid, room) c.Reply(msg.Iid, room)
} else { } else {
c.Reply(msg.Iid, err) c.Reply(msg.Iid, err)
@ -221,25 +206,6 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D
} }
} }
func (api *channellingAPI) OnDisconnect(session *Session) {
session.Disconnect()
api.LeaveRoom(session)
dsl := session.DataSessionLeft("hard")
if session.Hello {
api.Broadcast(session, dsl)
}
session.RunForAllSubscribers(func(session *Session) {
log.Println("Notifying subscriber that we are gone", session.Id)
api.Unicast(session, session.Id, dsl)
})
api.Unicaster.OnDisconnect(session)
api.buddyImages.Delete(session.Id)
}
func (api *channellingAPI) SendSelf(c Responder, session *Session) { func (api *channellingAPI) SendSelf(c Responder, session *Session) {
token, err := api.EncodeSessionToken(session) token, err := api.EncodeSessionToken(session)
if err != nil { if err != nil {
@ -261,26 +227,3 @@ func (api *channellingAPI) SendSelf(c Responder, session *Session) {
} }
c.Reply("", self) c.Reply("", self)
} }
func (api *channellingAPI) UpdateSession(session *Session, s *SessionUpdate) uint64 {
if s.Status != nil {
status, ok := s.Status.(map[string]interface{})
if ok && status["buddyPicture"] != nil {
pic := status["buddyPicture"].(string)
if strings.HasPrefix(pic, "data:") {
imageId := api.buddyImages.Update(session.Id, pic[5:])
if imageId != "" {
status["buddyPicture"] = "img:" + imageId
}
}
}
}
return session.Update(s)
}
func (api *channellingAPI) BroadcastSessionStatus(session *Session) {
if session.Hello {
api.Broadcast(session, session.DataSessionStatus())
}
}

18
src/app/spreed-webrtc-server/channelling_api_test.go

@ -62,12 +62,12 @@ func (fake *fakeRoomManager) JoinRoom(id string, _ *DataRoomCredentials, session
return &DataRoom{Name: id}, fake.joinError return &DataRoom{Name: id}, fake.joinError
} }
func (fake *fakeRoomManager) LeaveRoom(session *Session) { func (fake *fakeRoomManager) LeaveRoom(roomID, sessionID string) {
fake.leftID = session.Roomid fake.leftID = roomID
} }
func (fake *fakeRoomManager) Broadcast(_ *Session, msg interface{}) { func (fake *fakeRoomManager) Broadcast(_, _ string, outgoing *DataOutgoing) {
fake.broadcasts = append(fake.broadcasts, msg) fake.broadcasts = append(fake.broadcasts, outgoing.Data)
} }
func (fake *fakeRoomManager) UpdateRoom(_ *Session, _ *DataRoom) (*DataRoom, error) { func (fake *fakeRoomManager) UpdateRoom(_ *Session, _ *DataRoom) (*DataRoom, error) {
@ -98,8 +98,14 @@ func assertErrorReply(t *testing.T, client *fakeClient, iid, code string) {
} }
func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) { func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) {
client, roomManager, session := &fakeClient{}, &fakeRoomManager{}, &Session{} client, roomManager := &fakeClient{}, &fakeRoomManager{}
return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil, roomManager, nil), client, session, roomManager session := &Session{
attestations: sessionNonces,
Broadcaster: roomManager,
RoomStatusManager: roomManager,
}
session.attestation = &SessionAttestation{s: session}
return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil), client, session, roomManager
} }
func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) {

6
src/app/spreed-webrtc-server/client.go

@ -42,7 +42,7 @@ type Client interface {
ResponseSender ResponseSender
Session() *Session Session() *Session
Index() uint64 Index() uint64
Close(bool) Close()
} }
type client struct { type client struct {
@ -70,10 +70,6 @@ func (client *client) OnText(b Buffer) {
} }
} }
func (client *client) OnDisconnect() {
client.ChannellingAPI.OnDisconnect(client.session)
}
func (client *client) Reply(iid string, m interface{}) { func (client *client) Reply(iid string, m interface{}) {
outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m} outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m}
if b, err := client.EncodeOutgoing(outgoing); err == nil { if b, err := client.EncodeOutgoing(outgoing); err == nil {

12
src/app/spreed-webrtc-server/connection.go

@ -56,7 +56,7 @@ const (
type Connection interface { type Connection interface {
Index() uint64 Index() uint64
Send(Buffer) Send(Buffer)
Close(runCallbacks bool) Close()
readPump() readPump()
writePump() writePump()
} }
@ -65,7 +65,6 @@ type ConnectionHandler interface {
NewBuffer() Buffer NewBuffer() Buffer
OnConnect(Connection) OnConnect(Connection)
OnText(Buffer) OnText(Buffer)
OnDisconnect()
} }
type connection struct { type connection struct {
@ -98,15 +97,12 @@ func (c *connection) Index() uint64 {
return c.Idx return c.Idx
} }
func (c *connection) Close(runCallbacks bool) { func (c *connection) Close() {
c.mutex.Lock() c.mutex.Lock()
if c.isClosed { if c.isClosed {
c.mutex.Unlock() c.mutex.Unlock()
return return
} }
if runCallbacks {
c.handler.OnDisconnect()
}
c.ws.Close() c.ws.Close()
c.isClosed = true c.isClosed = true
for { for {
@ -170,7 +166,7 @@ func (c *connection) readPump() {
} }
} }
c.Close(true) c.Close()
} }
// Write message to outbound queue. // Write message to outbound queue.
@ -269,7 +265,7 @@ func (c *connection) writePump() {
cleanup: cleanup:
//fmt.Println("writePump done") //fmt.Println("writePump done")
timer.Stop() timer.Stop()
c.Close(true) c.Close()
} }
// Write ping message. // Write ping message.

22
src/app/spreed-webrtc-server/hub.go

@ -48,8 +48,8 @@ type SessionStore interface {
type Unicaster interface { type Unicaster interface {
SessionStore SessionStore
OnConnect(Client, *Session) OnConnect(Client, *Session)
Unicast(session *Session, to string, m interface{}) Unicast(to string, outgoing *DataOutgoing)
OnDisconnect(*Session) OnDisconnect(sessionID string)
} }
type ContactManager interface { type ContactManager interface {
@ -158,7 +158,7 @@ func (h *hub) OnConnect(client Client, session *Session) {
// Register connection or replace existing one. // Register connection or replace existing one.
if ec, ok := h.clients[session.Id]; ok { if ec, ok := h.clients[session.Id]; ok {
ec.Close(false) ec.Close()
//log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id) //log.Printf("Register (%d) from %s: %s (existing)\n", c.Idx, c.Id)
} }
h.clients[session.Id] = client h.clients[session.Id] = client
@ -167,26 +167,20 @@ func (h *hub) OnConnect(client Client, session *Session) {
//log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id) //log.Printf("Register (%d) from %s: %s\n", c.Idx, c.Id)
} }
func (h *hub) OnDisconnect(session *Session) { func (h *hub) OnDisconnect(sessionID string) {
h.mutex.Lock() h.mutex.Lock()
delete(h.clients, session.Id) delete(h.clients, sessionID)
h.mutex.Unlock() h.mutex.Unlock()
} }
func (h *hub) GetClient(id string) (client Client, ok bool) { func (h *hub) GetClient(sessionID string) (client Client, ok bool) {
h.mutex.RLock() h.mutex.RLock()
client, ok = h.clients[id] client, ok = h.clients[sessionID]
h.mutex.RUnlock() h.mutex.RUnlock()
return return
} }
func (h *hub) Unicast(session *Session, to string, m interface{}) { func (h *hub) Unicast(to string, outgoing *DataOutgoing) {
outgoing := &DataOutgoing{
From: session.Id,
To: to,
A: session.Attestation(),
Data: m,
}
if message, err := h.EncodeOutgoing(outgoing); err == nil { if message, err := h.EncodeOutgoing(outgoing); err == nil {
client, ok := h.GetClient(to) client, ok := h.GetClient(to)
if !ok { if !ok {

4
src/app/spreed-webrtc-server/main.go

@ -340,9 +340,9 @@ func runner(runtime phoenix.Runtime) error {
roomManager := NewRoomManager(config, codec) roomManager := NewRoomManager(config, codec)
hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec) hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec)
tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm) tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := NewSessionManager(config, tickets, sessionSecret) sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := NewStatsManager(hub, roomManager, sessionManager) statsManager := NewStatsManager(hub, roomManager, sessionManager)
channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, roomManager, buddyImages) channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub)
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour))) r.Handle("/static/img/buddy/{flags}/{imageid}/{idx:.*}", http.StripPrefix(config.B, makeImageHandler(buddyImages, time.Duration(24)*time.Hour)))
r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder)))) r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder))))

29
src/app/spreed-webrtc-server/room_manager.go

@ -29,12 +29,12 @@ import (
type RoomStatusManager interface { type RoomStatusManager interface {
RoomUsers(*Session) []*DataSession RoomUsers(*Session) []*DataSession
JoinRoom(string, *DataRoomCredentials, *Session, Sender) (*DataRoom, error) JoinRoom(string, *DataRoomCredentials, *Session, Sender) (*DataRoom, error)
LeaveRoom(*Session) LeaveRoom(roomID, sessionID string)
UpdateRoom(*Session, *DataRoom) (*DataRoom, error) UpdateRoom(*Session, *DataRoom) (*DataRoom, error)
} }
type Broadcaster interface { type Broadcaster interface {
Broadcast(*Session, interface{}) Broadcast(sessionID, roomID string, outgoing *DataOutgoing)
} }
type RoomStats interface { type RoomStats interface {
@ -84,9 +84,9 @@ func (rooms *roomManager) JoinRoom(id string, credentials *DataRoomCredentials,
return roomWorker.Join(credentials, session, sender) return roomWorker.Join(credentials, session, sender)
} }
func (rooms *roomManager) LeaveRoom(session *Session) { func (rooms *roomManager) LeaveRoom(roomID, sessionID string) {
if room, ok := rooms.Get(session.Roomid); ok { if room, ok := rooms.Get(roomID); ok {
room.Leave(session) room.Leave(sessionID)
} }
} }
@ -104,29 +104,22 @@ func (rooms *roomManager) UpdateRoom(session *Session, room *DataRoom) (*DataRoo
return room, nil return room, nil
} }
func (rooms *roomManager) Broadcast(session *Session, m interface{}) { func (rooms *roomManager) Broadcast(sessionID, roomID string, outgoing *DataOutgoing) {
outgoing := &DataOutgoing{
From: session.Id,
A: session.Attestation(),
Data: m,
}
message, err := rooms.EncodeOutgoing(outgoing) message, err := rooms.EncodeOutgoing(outgoing)
if err != nil { if err != nil {
return return
} }
id := session.Roomid if roomID != "" && roomID == rooms.globalRoomID {
if id != "" && id == rooms.globalRoomID {
rooms.RLock() rooms.RLock()
for _, room := range rooms.roomTable { for _, room := range rooms.roomTable {
room.Broadcast(session, message) room.Broadcast(sessionID, message)
} }
rooms.RUnlock() rooms.RUnlock()
} else if room, ok := rooms.Get(id); ok { } else if room, ok := rooms.Get(roomID); ok {
room.Broadcast(session, message) room.Broadcast(sessionID, message)
} else { } else {
log.Printf("No room named %s found for broadcast message %#v", id, m) log.Printf("No room named %s found for broadcast %#v", roomID, outgoing)
} }
message.Decref() message.Decref()
} }

18
src/app/spreed-webrtc-server/roomworker.go

@ -39,9 +39,9 @@ type RoomWorker interface {
Users() []*roomUser Users() []*roomUser
Update(*DataRoom) error Update(*DataRoom) error
GetUsers() []*DataSession GetUsers() []*DataSession
Broadcast(*Session, Buffer) Broadcast(sessionID string, buf Buffer)
Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error) Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error)
Leave(*Session) Leave(sessionID string)
} }
type roomWorker struct { type roomWorker struct {
@ -213,12 +213,12 @@ func (r *roomWorker) GetUsers() []*DataSession {
return <-out return <-out
} }
func (r *roomWorker) Broadcast(session *Session, message Buffer) { func (r *roomWorker) Broadcast(sessionID string, message Buffer) {
worker := func() { worker := func() {
r.mutex.RLock() r.mutex.RLock()
for id, user := range r.users { for id, user := range r.users {
if id == session.Id { if id == sessionID {
// Skip broadcast to self. // Skip broadcast to self.
continue continue
} }
@ -261,10 +261,6 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se
} }
} }
if session.IsDisconnected() {
log.Println("Refusing to join a disconnected session to room", session.Id)
return
}
r.users[session.Id] = &roomUser{session, sender} r.users[session.Id] = &roomUser{session, sender}
// NOTE(lcooper): Needs to be a copy, else we risk races with // NOTE(lcooper): Needs to be a copy, else we risk races with
// a subsequent modification of room properties. // a subsequent modification of room properties.
@ -277,11 +273,11 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se
return result.DataRoom, result.error return result.DataRoom, result.error
} }
func (r *roomWorker) Leave(session *Session) { func (r *roomWorker) Leave(sessionID string) {
worker := func() { worker := func() {
r.mutex.Lock() r.mutex.Lock()
if _, ok := r.users[session.Id]; ok { if _, ok := r.users[sessionID]; ok {
delete(r.users, session.Id) delete(r.users, sessionID)
} }
r.mutex.Unlock() r.mutex.Unlock()
} }

198
src/app/spreed-webrtc-server/session.go

@ -25,6 +25,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"github.com/gorilla/securecookie" "github.com/gorilla/securecookie"
"strings"
"sync" "sync"
"time" "time"
) )
@ -32,6 +33,11 @@ import (
var sessionNonces *securecookie.SecureCookie var sessionNonces *securecookie.SecureCookie
type Session struct { type Session struct {
SessionManager
Unicaster
Broadcaster
RoomStatusManager
buddyImages ImageCache
Id string Id string
Sid string Sid string
Ua string Ua string
@ -52,9 +58,14 @@ type Session struct {
disconnected bool disconnected bool
} }
func NewSession(attestations *securecookie.SecureCookie, id, sid string) *Session { func NewSession(manager SessionManager, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, attestations *securecookie.SecureCookie, id, sid string) *Session {
session := &Session{ session := &Session{
SessionManager: manager,
Unicaster: unicaster,
Broadcaster: broadcaster,
RoomStatusManager: rooms,
buddyImages: buddyImages,
Id: id, Id: id,
Sid: sid, Sid: sid,
Prio: 100, Prio: 100,
@ -104,52 +115,144 @@ func (s *Session) RemoveSubscriber(id string) {
s.mutex.Unlock() s.mutex.Unlock()
} }
func (s *Session) RunForAllSubscribers(f func(session *Session)) { func (s *Session) JoinRoom(roomID string, credentials *DataRoomCredentials, sender Sender) (*DataRoom, error) {
s.mutex.Lock()
for _, session := range s.subscribers {
s.mutex.Unlock()
f(session)
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock()
if s.Hello && s.Roomid != roomID {
s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id)
s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{
From: s.Id,
A: s.attestation.Token(),
Data: &DataSession{
Type: "Left",
Id: s.Id,
Status: "soft",
},
})
} }
s.mutex.Unlock()
room, err := s.RoomStatusManager.JoinRoom(roomID, credentials, s, sender)
if err == nil {
s.Hello = true
s.Roomid = roomID
s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{
From: s.Id,
A: s.attestation.Token(),
Data: &DataSession{
Type: "Joined",
Id: s.Id,
Userid: s.userid,
Ua: s.Ua,
Prio: s.Prio,
},
})
} else {
s.Hello = false
}
return room, err
} }
func (s *Session) Disconnect() { func (s *Session) Broadcast(m interface{}) {
s.mutex.RLock()
s.mutex.Lock() if s.Hello {
s.disconnected = true s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{
s.mutex.Unlock() From: s.Id,
A: s.attestation.Token(),
Data: m,
})
}
s.mutex.RUnlock()
} }
func (s *Session) IsDisconnected() bool { func (s *Session) BroadcastStatus() {
s.mutex.RLock()
if s.Hello {
s.Broadcaster.Broadcast(s.Id, s.Roomid, &DataOutgoing{
From: s.Id,
A: s.attestation.Token(),
Data: &DataSession{
Type: "Status",
Id: s.Id,
Userid: s.userid,
Status: s.Status,
Rev: s.UpdateRev,
Prio: s.Prio,
},
})
}
s.mutex.RUnlock()
}
func (s *Session) Unicast(to string, m interface{}) {
s.mutex.RLock() s.mutex.RLock()
defer s.mutex.RUnlock() outgoing := &DataOutgoing{
return s.disconnected From: s.Id,
To: to,
A: s.attestation.Token(),
Data: m,
}
s.mutex.RUnlock()
s.Unicaster.Unicast(to, outgoing)
} }
func (s *Session) Close() { func (s *Session) Close() {
s.mutex.Lock() s.mutex.Lock()
// Remove foreign references.
outgoing := &DataOutgoing{
From: s.Id,
A: s.attestation.Token(),
Data: &DataSession{
Type: "Left",
Id: s.Id,
Status: "hard",
},
}
if s.Hello {
// NOTE(lcooper): If we don't check for Hello here, we could deadlock
// when implicitly creating a room while a user is reconnecting.
s.Broadcaster.Broadcast(s.Id, s.Roomid, outgoing)
s.RoomStatusManager.LeaveRoom(s.Roomid, s.Id)
}
for _, session := range s.subscribers {
s.Unicaster.Unicast(session.Id, outgoing)
}
for _, session := range s.subscriptions { for _, session := range s.subscriptions {
session.RemoveSubscriber(s.Id) session.RemoveSubscriber(s.Id)
} }
// Remove session cross references.
s.Unicaster.OnDisconnect(s.Id)
s.SessionManager.DestroySession(s.Id, s.userid)
s.buddyImages.Delete(s.Id)
s.subscriptions = make(map[string]*Session) s.subscriptions = make(map[string]*Session)
s.subscribers = make(map[string]*Session) s.subscribers = make(map[string]*Session)
s.mutex.Unlock() s.disconnected = true
s.mutex.Unlock()
} }
func (s *Session) Update(update *SessionUpdate) uint64 { func (s *Session) Update(update *SessionUpdate) uint64 {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
if update.Status != nil {
status, ok := update.Status.(map[string]interface{})
if ok && status["buddyPicture"] != nil {
pic := status["buddyPicture"].(string)
if strings.HasPrefix(pic, "data:") {
imageId := s.buddyImages.Update(s.Id, pic[5:])
if imageId != "" {
status["buddyPicture"] = "img:" + imageId
}
}
}
}
for _, key := range update.Types { for _, key := range update.Types {
//fmt.Println("type update", key) //fmt.Println("type update", key)
@ -268,50 +371,6 @@ func (s *Session) SetUseridFake(userid string) {
} }
func (s *Session) DataSessionLeft(state string) *DataSession {
s.mutex.RLock()
defer s.mutex.RUnlock()
return &DataSession{
Type: "Left",
Id: s.Id,
Status: state,
}
}
func (s *Session) DataSessionJoined() *DataSession {
s.mutex.RLock()
defer s.mutex.RUnlock()
return &DataSession{
Type: "Joined",
Id: s.Id,
Userid: s.userid,
Ua: s.Ua,
Prio: s.Prio,
}
}
func (s *Session) DataSessionStatus() *DataSession {
s.mutex.RLock()
defer s.mutex.RUnlock()
return &DataSession{
Type: "Status",
Id: s.Id,
Userid: s.userid,
Status: s.Status,
Rev: s.UpdateRev,
Prio: s.Prio,
}
}
func (s *Session) NewAttestation() { func (s *Session) NewAttestation() {
s.attestation = &SessionAttestation{ s.attestation = &SessionAttestation{
s: s, s: s,
@ -319,13 +378,6 @@ func (s *Session) NewAttestation() {
s.attestation.Update() s.attestation.Update()
} }
func (s *Session) Attestation() (attestation string) {
s.mutex.RLock()
attestation = s.attestation.Token()
s.mutex.RUnlock()
return
}
func (s *Session) UpdateAttestation() { func (s *Session) UpdateAttestation() {
s.mutex.Lock() s.mutex.Lock()
s.attestation.Update() s.attestation.Update()

34
src/app/spreed-webrtc-server/session_manager.go

@ -37,14 +37,18 @@ type SessionManager interface {
UserStats UserStats
RetrieveUsersWith(func(*http.Request) (string, error)) RetrieveUsersWith(func(*http.Request) (string, error))
CreateSession(*http.Request) *Session CreateSession(*http.Request) *Session
DestroySession(*Session) DestroySession(sessionID, userID string)
Authenticate(*Session, *SessionToken, string) error Authenticate(*Session, *SessionToken, string) error
GetUserSessions(session *Session, id string) []*DataSession GetUserSessions(session *Session, id string) []*DataSession
} }
type sessionManager struct { type sessionManager struct {
Tickets
sync.RWMutex sync.RWMutex
Tickets
Unicaster
Broadcaster
RoomStatusManager
buddyImages ImageCache
config *Config config *Config
userTable map[string]*User userTable map[string]*User
fakesessionTable map[string]*Session fakesessionTable map[string]*Session
@ -52,10 +56,14 @@ type sessionManager struct {
attestations *securecookie.SecureCookie attestations *securecookie.SecureCookie
} }
func NewSessionManager(config *Config, tickets Tickets, sessionSecret []byte) SessionManager { func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, sessionSecret []byte) SessionManager {
sessionManager := &sessionManager{ sessionManager := &sessionManager{
tickets,
sync.RWMutex{}, sync.RWMutex{},
tickets,
unicaster,
broadcaster,
rooms,
buddyImages,
config, config,
make(map[string]*User), make(map[string]*User),
make(map[string]*Session), make(map[string]*Session),
@ -103,7 +111,7 @@ func (sessionManager *sessionManager) CreateSession(request *http.Request) *Sess
} }
} }
session := NewSession(sessionManager.attestations, st.Id, st.Sid) session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
if userid != "" { if userid != "" {
// XXX(lcooper): Should errors be handled here? // XXX(lcooper): Should errors be handled here?
@ -113,15 +121,15 @@ func (sessionManager *sessionManager) CreateSession(request *http.Request) *Sess
return session return session
} }
func (sessionManager *sessionManager) DestroySession(session *Session) { func (sessionManager *sessionManager) DestroySession(sessionID, userID string) {
session.Close() if userID == "" {
return
}
sessionManager.Lock() sessionManager.Lock()
if suserid := session.Userid(); suserid != "" { user, ok := sessionManager.userTable[userID]
user, ok := sessionManager.userTable[suserid] if ok && user.RemoveSession(sessionID) {
if ok && user.RemoveSession(session) { delete(sessionManager.userTable, userID)
delete(sessionManager.userTable, suserid)
}
} }
sessionManager.Unlock() sessionManager.Unlock()
} }
@ -158,7 +166,7 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s
session, ok := sessionManager.fakesessionTable[userid] session, ok := sessionManager.fakesessionTable[userid]
if !ok { if !ok {
st := sessionManager.FakeSessionToken(userid) st := sessionManager.FakeSessionToken(userid)
session = NewSession(sessionManager.attestations, st.Id, st.Sid) session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
session.SetUseridFake(st.Userid) session.SetUseridFake(st.Userid)
sessionManager.fakesessionTable[userid] = session sessionManager.fakesessionTable[userid] = session
} }

4
src/app/spreed-webrtc-server/user.go

@ -57,10 +57,10 @@ func (u *User) AddSession(s *Session) bool {
} }
// Return true if no session left. // Return true if no session left.
func (u *User) RemoveSession(s *Session) bool { func (u *User) RemoveSession(sessionID string) bool {
last := false last := false
u.mutex.Lock() u.mutex.Lock()
delete(u.sessionTable, s.Id) delete(u.sessionTable, sessionID)
if len(u.sessionTable) == 0 { if len(u.sessionTable) == 0 {
log.Println("Last session unregistered for user", u.Id) log.Println("Last session unregistered for user", u.Id)
last = true last = true

2
src/app/spreed-webrtc-server/ws.go

@ -69,7 +69,7 @@ func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionMa
// Create a new connection instance. // Create a new connection instance.
session := sessionManager.CreateSession(r) session := sessionManager.CreateSession(r)
defer sessionManager.DestroySession(session) defer session.Close()
client := NewClient(codec, channellingAPI, session) client := NewClient(codec, channellingAPI, session)
conn := NewConnection(connectionCounter.CountConnection(), ws, client) conn := NewConnection(connectionCounter.CountConnection(), ws, client)

Loading…
Cancel
Save