Browse Source

Implemented server sessions lookup.

pull/48/head
Simon Eisenmann 11 years ago committed by Simon Eisenmann
parent
commit
19cc75c0fe
  1. 26
      doc/CHANNELING-API.txt
  2. 12
      src/app/spreed-webrtc-server/channeling.go
  3. 93
      src/app/spreed-webrtc-server/hub.go
  4. 12
      src/app/spreed-webrtc-server/server.go
  5. 34
      src/app/spreed-webrtc-server/session.go
  6. 90
      src/app/spreed-webrtc-server/user.go

26
doc/CHANNELING-API.txt

@ -392,23 +392,35 @@ Information retrieval
"Sessions": { "Sessions": {
"Id": "Client generated request ID", "Id": "Client generated request ID",
"Token": "Request token", "Token": "Request token",
"TokenType": "Token type" "Type": "Token type"
} }
} }
Valid known token types are: "contact", "token". Valid known token types are: "contact".
Sesions (Response with Id, Token and Type from request and Sessions (Response with Id, Token and Type from request and
populated Session list). populated Session list).
{ {
"Type": "Sessions", "Type": "Sessions",
"Sessions": { "Sessions": {
"Id": "ID as from request", "Id": "ID as from request",
"Token": "Token as from request", "Token": "Token as in request",
"TokenType": "Type as from request", "Type": "Type as in request",
"Sessions": [ "Users": [
"1", "2", "3", ... {
"Type": "Online",
"Id": "1",
"Ua": "Firefox 27",
"Status": {...}
},
{
"Type": "Online",
"Id": "3",
"Userid": "u3",
"Ua": "Chrome 28",
"Status": {...}
}, ...
] ]
} }
} }

12
src/app/spreed-webrtc-server/channeling.go

@ -75,6 +75,11 @@ type DataSession struct {
Status interface{} Status interface{}
} }
type DataUser struct {
Id string
Sessions int
}
type DataBye struct { type DataBye struct {
Type string Type string
To string To string
@ -142,6 +147,7 @@ type DataIncoming struct {
Conference *DataConference Conference *DataConference
Alive *DataAlive Alive *DataAlive
Authentication *DataAuthentication Authentication *DataAuthentication
Sessions *DataSessions
} }
type DataOutgoing struct { type DataOutgoing struct {
@ -153,8 +159,10 @@ type DataOutgoing struct {
type DataSessions struct { type DataSessions struct {
Type string Type string
Users []*DataSession Users []*DataSession
Index uint64 Id string `json:",omitempty"`
Batch uint64 Token string `json:",omitempty"`
Index uint64 `json:",omitempty"`
Batch uint64 `json:",omitempty"`
} }
type DataConference struct { type DataConference struct {

93
src/app/spreed-webrtc-server/hub.go

@ -57,11 +57,13 @@ type HubStat struct {
Rooms int `json:"rooms"` Rooms int `json:"rooms"`
Connections int `json:"connections"` Connections int `json:"connections"`
Sessions int `json:"sessions"` Sessions int `json:"sessions"`
Users int `json:"users"`
Count uint64 `json:"count"` Count uint64 `json:"count"`
BroadcastChatMessages uint64 `json:"broadcastchatmessages"` BroadcastChatMessages uint64 `json:"broadcastchatmessages"`
UnicastChatMessages uint64 `json:"unicastchatmessages"` UnicastChatMessages uint64 `json:"unicastchatmessages"`
IdsInRoom map[string][]string `json:"idsinroom,omitempty"` IdsInRoom map[string][]string `json:"idsinroom,omitempty"`
SessionsById map[string]*DataSession `json:"sessionsbyid,omitempty"` SessionsById map[string]*DataSession `json:"sessionsbyid,omitempty"`
UsersById map[string]*DataUser `json:"usersbyid,omitempty"`
ConnectionsByIdx map[string]string `json:"connectionsbyidx,omitempty"` ConnectionsByIdx map[string]string `json:"connectionsbyidx,omitempty"`
} }
@ -70,6 +72,7 @@ type Hub struct {
connectionTable map[string]*Connection connectionTable map[string]*Connection
sessionTable map[string]*Session sessionTable map[string]*Session
roomTable map[string]*RoomWorker roomTable map[string]*RoomWorker
userTable map[string]*User
version string version string
config *Config config *Config
sessionSecret []byte sessionSecret []byte
@ -94,6 +97,7 @@ func NewHub(version string, config *Config, sessionSecret, encryptionSecret, tur
connectionTable: make(map[string]*Connection), connectionTable: make(map[string]*Connection),
sessionTable: make(map[string]*Session), sessionTable: make(map[string]*Session),
roomTable: make(map[string]*RoomWorker), roomTable: make(map[string]*RoomWorker),
userTable: make(map[string]*User),
version: version, version: version,
config: config, config: config,
sessionSecret: []byte(sessionSecret), sessionSecret: []byte(sessionSecret),
@ -128,6 +132,7 @@ func (h *Hub) Stat(details bool) *HubStat {
Rooms: len(h.roomTable), Rooms: len(h.roomTable),
Connections: len(h.connectionTable), Connections: len(h.connectionTable),
Sessions: len(h.sessionTable), Sessions: len(h.sessionTable),
Users: len(h.userTable),
Count: h.count, Count: h.count,
BroadcastChatMessages: atomic.LoadUint64(&h.broadcastChatMessages), BroadcastChatMessages: atomic.LoadUint64(&h.broadcastChatMessages),
UnicastChatMessages: atomic.LoadUint64(&h.unicastChatMessages), UnicastChatMessages: atomic.LoadUint64(&h.unicastChatMessages),
@ -147,6 +152,11 @@ func (h *Hub) Stat(details bool) *HubStat {
sessions[sessionid] = session.Data() sessions[sessionid] = session.Data()
} }
stat.SessionsById = sessions stat.SessionsById = sessions
users := make(map[string]*DataUser)
for userid, user := range h.userTable {
users[userid] = user.Data()
}
stat.UsersById = users
connections := make(map[string]string) connections := make(map[string]string)
for id, connection := range h.connectionTable { for id, connection := range h.connectionTable {
connections[fmt.Sprintf("%d", connection.Idx)] = id connections[fmt.Sprintf("%d", connection.Idx)] = id
@ -203,8 +213,8 @@ func (h *Hub) CreateSession(request *http.Request, st *SessionToken) *Session {
if st == nil { if st == nil {
sid := NewRandomString(32) sid := NewRandomString(32)
id, _ := h.tickets.Encode("id", sid) id, _ := h.tickets.Encode("id", sid)
session = NewSession(id, sid, userid) session = NewSession(id, sid)
log.Println("Created new session id", len(id), id, sid, userid) log.Println("Created new session id", len(id), id, sid)
} else { } else {
if userid == "" { if userid == "" {
userid = st.Userid userid = st.Userid
@ -212,7 +222,11 @@ func (h *Hub) CreateSession(request *http.Request, st *SessionToken) *Session {
if !usersEnabled { if !usersEnabled {
userid = "" userid = ""
} }
session = NewSession(st.Id, st.Sid, userid) session = NewSession(st.Id, st.Sid)
}
if userid != "" {
h.authenticateHandler(session, st, userid)
} }
return session return session
@ -359,6 +373,15 @@ func (h *Hub) unregisterHandler(c *Connection) {
session := c.Session session := c.Session
delete(h.connectionTable, c.Id) delete(h.connectionTable, c.Id)
delete(h.sessionTable, c.Id) delete(h.sessionTable, c.Id)
if session != nil && session.Userid != "" {
user, ok := h.userTable[session.Userid]
if ok {
empty := user.RemoveSession(session)
if empty {
delete(h.userTable, session.Userid)
}
}
}
h.mutex.Unlock() h.mutex.Unlock()
if session != nil { if session != nil {
h.buddyImages.Delete(session.Id) h.buddyImages.Delete(session.Id)
@ -397,6 +420,51 @@ func (h *Hub) aliveHandler(c *Connection, alive *DataAlive) {
} }
func (h *Hub) sessionsHandler(c *Connection, sessions *DataSessions) {
reply := false
switch sessions.Type {
case "contact":
contact := &Contact{}
err := h.contacts.Decode("contactConfirmed", sessions.Token, contact)
if err != nil {
log.Println("Failed to decode incoming contact token", err)
return
}
if contact.A != c.Session.Userid && contact.B != c.Session.Userid {
log.Println("Ignoring foreign contact token")
return
}
h.mutex.RLock()
defer h.mutex.RUnlock()
user, ok := h.userTable[c.Session.Userid]
if !ok {
return
}
sessions.Users = user.SessionsData()
reply = true
default:
log.Println("Unkown incoming sessions request type", sessions.Type)
}
if reply {
sessionsJson := h.buffers.New()
encoder := json.NewEncoder(sessionsJson)
err := encoder.Encode(&DataOutgoing{From: c.Id, Data: sessions})
if err != nil {
log.Println("Sessions error while encoding JSON", err)
sessionsJson.Decref()
return
}
c.send(sessionsJson)
sessionsJson.Decref()
}
}
func (h *Hub) sessionupdateHandler(s *SessionUpdate) uint64 { func (h *Hub) sessionupdateHandler(s *SessionUpdate) uint64 {
//fmt.Println("Userupdate", u) //fmt.Println("Userupdate", u)
@ -444,6 +512,25 @@ func (h *Hub) sessiontokenHandler(st *SessionToken) (string, error) {
} }
func (h *Hub) authenticateHandler(session *Session, st *SessionToken, userid string) error {
err := session.Authenticate(h.realm, st, userid)
if err == nil {
// Authentication success.
h.mutex.Lock()
user, ok := h.userTable[session.Userid]
if !ok {
user = NewUser(session.Userid)
h.userTable[session.Userid] = user
}
h.mutex.Unlock()
user.AddSession(session)
}
return err
}
func (h *Hub) contactrequestHandler(c *Connection, to string, cr *DataContactRequest) error { func (h *Hub) contactrequestHandler(c *Connection, to string, cr *DataContactRequest) error {
var err error var err error

12
src/app/spreed-webrtc-server/server.go

@ -168,6 +168,8 @@ func (s *Server) OnText(c *Connection, b Buffer) {
} }
case "Alive": case "Alive":
s.Alive(c, msg.Alive) s.Alive(c, msg.Alive)
case "Sessions":
s.Sessions(c, msg.Sessions)
default: default:
log.Println("OnText unhandled message type", msg.Type) log.Println("OnText unhandled message type", msg.Type)
} }
@ -196,6 +198,12 @@ func (s *Server) Alive(c *Connection, alive *DataAlive) {
} }
func (s *Server) Sessions(c *Connection, sessions *DataSessions) {
c.h.sessionsHandler(c, sessions)
}
func (s *Server) UpdateSession(c *Connection, su *SessionUpdate) uint64 { func (s *Server) UpdateSession(c *Connection, su *SessionUpdate) uint64 {
su.Id = c.Id su.Id = c.Id
@ -243,9 +251,9 @@ func (s *Server) Users(c *Connection) {
func (s *Server) Authenticate(c *Connection, st *SessionToken) bool { func (s *Server) Authenticate(c *Connection, st *SessionToken) bool {
err := c.Session.Authenticate(c.h.realm, st) err := c.h.authenticateHandler(c.Session, st, "")
if err == nil { if err == nil {
log.Println("Authentication success", c.Id, c.Idx, st.Userid) log.Println("Authentication success", c.Id, c.Idx, c.Session.Userid)
return true return true
} else { } else {
log.Println("Authentication failed", err, c.Id, c.Idx, st.Userid, st.Nonce) log.Println("Authentication failed", err, c.Id, c.Idx, st.Userid, st.Nonce)

34
src/app/spreed-webrtc-server/session.go

@ -42,12 +42,11 @@ type Session struct {
mutex sync.RWMutex mutex sync.RWMutex
} }
func NewSession(id, sid, userid string) *Session { func NewSession(id, sid string) *Session {
return &Session{ return &Session{
Id: id, Id: id,
Sid: sid, Sid: sid,
Userid: userid,
} }
} }
@ -96,7 +95,7 @@ func (s *Session) Authorize(realm string, st *SessionToken) (string, error) {
} }
func (s *Session) Authenticate(realm string, st *SessionToken) error { func (s *Session) Authenticate(realm string, st *SessionToken, userid string) error {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -104,20 +103,21 @@ func (s *Session) Authenticate(realm string, st *SessionToken) error {
if s.Userid != "" { if s.Userid != "" {
return errors.New("session already authenticated") return errors.New("session already authenticated")
} }
if s.Nonce == "" || s.Nonce != st.Nonce { if userid == "" {
return errors.New("nonce validation failed") if s.Nonce == "" || s.Nonce != st.Nonce {
} return errors.New("nonce validation failed")
var userid string }
err := sessionNonces.Decode(fmt.Sprintf("%s@%s", s.Sid, realm), st.Nonce, &userid) err := sessionNonces.Decode(fmt.Sprintf("%s@%s", s.Sid, realm), st.Nonce, &userid)
if err != nil { if err != nil {
return err return err
} }
if st.Userid != userid { if st.Userid != userid {
return errors.New("user id mismatch") return errors.New("user id mismatch")
}
s.Nonce = ""
} }
s.Nonce = "" s.Userid = userid
s.Userid = st.Userid
s.UpdateRev++ s.UpdateRev++
return nil return nil

90
src/app/spreed-webrtc-server/user.go

@ -0,0 +1,90 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2014 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"fmt"
"sync"
)
type User struct {
Id string
sessionTable map[string]*Session
mutex sync.RWMutex
}
func NewUser(id string) *User {
user := &User{
Id: id,
sessionTable: make(map[string]*Session),
}
return user
}
// Return true if first session.
func (u *User) AddSession(s *Session) bool {
first := false
u.mutex.Lock()
u.sessionTable[s.Id] = s
if len(u.sessionTable) == 1 {
fmt.Println("First session registered for user", u.Id)
first = true
}
u.mutex.Unlock()
return first
}
// Return true if no session left.
func (u *User) RemoveSession(s *Session) bool {
last := false
u.mutex.Lock()
delete(u.sessionTable, s.Id)
if len(u.sessionTable) == 0 {
fmt.Println("Last session unregistered for user", u.Id)
last = true
}
u.mutex.Unlock()
return last
}
func (u *User) Data() *DataUser {
u.mutex.RLock()
defer u.mutex.RUnlock()
return &DataUser{
Id: u.Id,
Sessions: len(u.sessionTable),
}
}
func (u *User) SessionsData() []*DataSession {
sessions := make([]*DataSession, 0, len(u.sessionTable))
u.mutex.RLock()
defer u.mutex.RUnlock()
for _, session := range u.sessionTable {
sessions = append(sessions, session.Data())
}
return sessions
}
Loading…
Cancel
Save