Browse Source

Implemented session subscription.

pull/68/head
Simon Eisenmann 11 years ago
parent
commit
0424635e15
  1. 3
      src/app/spreed-webrtc-server/connection.go
  2. 13
      src/app/spreed-webrtc-server/hub.go
  3. 12
      src/app/spreed-webrtc-server/server.go
  4. 100
      src/app/spreed-webrtc-server/session.go
  5. 4
      src/app/spreed-webrtc-server/user.go
  6. 2
      static/js/mediastream/api.js
  7. 31
      static/js/services/buddylist.js
  8. 6
      static/js/services/buddysession.js

3
src/app/spreed-webrtc-server/connection.go

@ -65,6 +65,7 @@ type Connection struct { @@ -65,6 +65,7 @@ type Connection struct {
queue list.List
mutex sync.Mutex
isClosed bool
isClosing bool
// Metadata.
Id string
@ -93,6 +94,7 @@ func (c *Connection) close() { @@ -93,6 +94,7 @@ func (c *Connection) close() {
if !c.isClosed {
c.ws.Close()
c.Session.Close()
c.mutex.Lock()
c.Session = nil
c.isClosed = true
@ -132,6 +134,7 @@ func (c *Connection) reregister(token string) error { @@ -132,6 +134,7 @@ func (c *Connection) reregister(token string) error {
}
func (c *Connection) unregister() {
c.isClosing = true
c.h.unregisterHandler(c)
}

13
src/app/spreed-webrtc-server/hub.go

@ -376,23 +376,20 @@ func (h *Hub) unregisterHandler(c *Connection) { @@ -376,23 +376,20 @@ func (h *Hub) unregisterHandler(c *Connection) {
h.mutex.Unlock()
return
}
session := c.Session
suserid := session.Userid()
suserid := c.Session.Userid()
delete(h.connectionTable, c.Id)
delete(h.sessionTable, c.Id)
if session != nil && suserid != "" {
if suserid != "" {
user, ok := h.userTable[suserid]
if ok {
empty := user.RemoveSession(session)
empty := user.RemoveSession(c.Session)
if empty {
delete(h.userTable, suserid)
}
}
}
h.mutex.Unlock()
if session != nil {
h.buddyImages.Delete(session.Id)
}
h.buddyImages.Delete(c.Id)
//log.Printf("Unregister (%d) from %s: %s\n", c.Idx, c.RemoteAddr, c.Id)
h.server.OnUnregister(c)
c.close()
@ -459,7 +456,7 @@ func (h *Hub) sessionsHandler(c *Connection, srq *DataSessionsRequest, iid strin @@ -459,7 +456,7 @@ func (h *Hub) sessionsHandler(c *Connection, srq *DataSessionsRequest, iid strin
return
}
// Add sessions for forein user.
users = user.SessionsData()
users = user.SubscribeSessions(c.Session)
case "session":
id, err := c.Session.attestation.Decode(srq.Token)
if err != nil {

12
src/app/spreed-webrtc-server/server.go

@ -58,12 +58,15 @@ func (s *Server) OnRegister(c *Connection) { @@ -58,12 +58,15 @@ func (s *Server) OnRegister(c *Connection) {
func (s *Server) OnUnregister(c *Connection) {
//log.Println("OnUnregister", c.id)
dsl := c.Session.DataSessionLeft("hard")
if c.Hello {
s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid})
s.Broadcast(c, c.Session.DataSessionLeft("hard"))
} else {
//log.Println("Ingoring OnUnregister because of no Hello", c.Idx)
s.Broadcast(c, dsl)
}
c.Session.RunForAllSubscribers(func(session *Session) {
log.Println("Notifying subscriber that we are gone", c.Id, session.Id)
s.Unicast(c, session.Id, dsl)
})
}
func (s *Server) OnText(c *Connection, b Buffer) {
@ -179,7 +182,7 @@ func (s *Server) OnText(c *Connection, b Buffer) { @@ -179,7 +182,7 @@ func (s *Server) OnText(c *Connection, b Buffer) {
func (s *Server) Unicast(c *Connection, to string, m interface{}) {
outgoing := &DataOutgoing{From: c.Id, To: to, Data: m}
if c.Id != to {
if !c.isClosing && c.Id != to {
outgoing.A = c.Session.Attestation()
}
b := c.h.buffers.New()
@ -190,6 +193,7 @@ func (s *Server) Unicast(c *Connection, to string, m interface{}) { @@ -190,6 +193,7 @@ func (s *Server) Unicast(c *Connection, to string, m interface{}) {
log.Println("Unicast error while encoding JSON", err)
return
}
log.Println("Unicast", b)
var msg = &MessageRequest{From: c.Id, To: to, Message: b}
c.h.unicastHandler(msg)

100
src/app/spreed-webrtc-server/session.go

@ -32,34 +32,100 @@ import ( @@ -32,34 +32,100 @@ import (
var sessionNonces *securecookie.SecureCookie
type Session struct {
Id string
Sid string
Ua string
UpdateRev uint64
Status interface{}
Nonce string
Prio int
mutex sync.RWMutex
userid string
stamp int64
attestation *SessionAttestation
h *Hub
Id string
Sid string
Ua string
UpdateRev uint64
Status interface{}
Nonce string
Prio int
mutex sync.RWMutex
userid string
stamp int64
attestation *SessionAttestation
subscriptions map[string]*Session
subscribers map[string]*Session
h *Hub
}
func NewSession(h *Hub, id, sid string) *Session {
session := &Session{
Id: id,
Sid: sid,
Prio: 100,
stamp: time.Now().Unix(),
h: h,
Id: id,
Sid: sid,
Prio: 100,
stamp: time.Now().Unix(),
subscriptions: make(map[string]*Session),
subscribers: make(map[string]*Session),
h: h,
}
session.NewAttestation()
return session
}
func (s *Session) Subscribe(session *Session) {
s.mutex.Lock()
s.subscriptions[session.Id] = session
s.mutex.Unlock()
session.AddSubscriber(s)
}
func (s *Session) Unsubscribe(id string) {
s.mutex.Lock()
if session, ok := s.subscriptions[id]; ok {
delete(s.subscriptions, id)
s.mutex.Unlock()
session.RemoveSubscriber(id)
} else {
s.mutex.Unlock()
}
}
func (s *Session) AddSubscriber(session *Session) {
s.mutex.Lock()
s.subscribers[session.Id] = session
s.mutex.Unlock()
}
func (s *Session) RemoveSubscriber(id string) {
s.mutex.Lock()
if _, ok := s.subscribers[id]; ok {
delete(s.subscribers, id)
}
s.mutex.Unlock()
}
func (s *Session) RunForAllSubscribers(f func(session *Session)) {
s.mutex.Lock()
for _, session := range s.subscribers {
s.mutex.Unlock()
f(session)
s.mutex.Lock()
}
s.mutex.Unlock()
}
func (s *Session) Close() {
s.mutex.Lock()
// Remove foreign references.
for _, session := range s.subscriptions {
session.RemoveSubscriber(s.Id)
}
// Remove session cross references.
s.subscriptions = make(map[string]*Session)
s.subscribers = make(map[string]*Session)
s.mutex.Unlock()
}
func (s *Session) Update(update *SessionUpdate) uint64 {
s.mutex.Lock()

4
src/app/spreed-webrtc-server/user.go

@ -78,12 +78,14 @@ func (u *User) Data() *DataUser { @@ -78,12 +78,14 @@ func (u *User) Data() *DataUser {
}
}
func (u *User) SessionsData() []*DataSession {
func (u *User) SubscribeSessions(from *Session) []*DataSession {
sessions := make([]*DataSession, 0, len(u.sessionTable))
u.mutex.RLock()
defer u.mutex.RUnlock()
for _, session := range u.sessionTable {
// TODO(longsleep): This does lots of locks - check if these can be streamlined.
from.Subscribe(session)
sessions = append(sessions, session.Data())
}
sort.Sort(ByPrioAndStamp(sessions))

2
static/js/mediastream/api.js

@ -169,7 +169,7 @@ define(['jquery', 'underscore'], function($, _) { @@ -169,7 +169,7 @@ define(['jquery', 'underscore'], function($, _) {
break;
case "Joined":
case "Left":
//console.log("User action received", dataType, data);
console.log("User action received", dataType, data);
this.e.triggerHandler("received.userleftorjoined", [dataType, data]);
break;
case "Status":

31
static/js/services/buddylist.js

@ -557,9 +557,6 @@ define(['underscore', 'modernizr', 'avltree', 'text!partials/buddy.html', 'text! @@ -557,9 +557,6 @@ define(['underscore', 'modernizr', 'avltree', 'text!partials/buddy.html', 'text!
//console.warn("Trying to remove buddy with no registered scope", session);
return;
}
if (buddyCount > 0) {
buddyCount--;
}
// Remove current id from tree.
this.tree.remove(id);
buddyData.del(id);
@ -576,6 +573,9 @@ define(['underscore', 'modernizr', 'avltree', 'text!partials/buddy.html', 'text! @@ -576,6 +573,9 @@ define(['underscore', 'modernizr', 'avltree', 'text!partials/buddy.html', 'text!
}
delete this.actionElements[id];
scope.$destroy();
if (buddyCount > 0) {
buddyCount--;
}
} else {
// Update display stuff if a session is left. This can
// return no session in case when we got this as contact.
@ -703,10 +703,29 @@ define(['underscore', 'modernizr', 'avltree', 'text!partials/buddy.html', 'text! @@ -703,10 +703,29 @@ define(['underscore', 'modernizr', 'avltree', 'text!partials/buddy.html', 'text!
if (contact && contact.Token) {
mediaStream.api.sendSessions(contact.Token, "contact", function(event, type, data) {
//console.log("oooooooooooooooo", type, data);
var tmpSessionData = null;
if (data.Users && data.Users.length > 0) {
var s = data.Users[0];
buddyData.set(s.Id, scope);
deferred.resolve(s.Id);
/*
_.each(data.Users, function(s) {
buddyData.set(s.Id, scope);
// NOTE(longsleep): Not sure if its a good idea to add the retrieved sessions here.
session.add(s.Id, s);
});
sessionData = session.get();
deferred.resolve(sessionData.Id);
*/
tmpSessionData = data.Users[0];
}
// Check if we got a session in the meantime.
sessionData = session.get();
if (!sessionData && tmpSessionData) {
// Use temporary session as received.
buddyData.set(tmpSessionData.Id, scope);
sessionData = tmpSessionData;
}
if (sessionData) {
// Resolve with whatever we found.
deferred.resolve(sessionData.Id);
}
});
}

6
static/js/services/buddysession.js

@ -60,8 +60,10 @@ define(["underscore"], function(_) { @@ -60,8 +60,10 @@ define(["underscore"], function(_) {
};
BuddySession.prototype.rm = function(id) {
delete this.sessions[id];
this.count--;
if (this.sessions.hasOwnProperty(id)) {
delete this.sessions[id];
this.count--;
}
delete sessions[id];
};

Loading…
Cancel
Save