You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
297 lines
6.7 KiB
297 lines
6.7 KiB
/*
 * Spreed WebRTC.
 * Copyright (C) 2013-2015 struktur AG
 *
 * This file is part of Spreed WebRTC.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 */
|
|
|
package channelling |
|
|
|
import (
	"crypto/subtle"
	"errors"
	"log"
	"sync"
	"time"

	"github.com/strukturag/spreed-webrtc/go/buffercache"
)
|
|
|
// Tunables shared by all room workers.
const (
	// roomMaxWorkers is the capacity of a room's queue of pending work
	// functions.
	roomMaxWorkers = 10000

	// roomExpiryDuration is the interval of the room expiry timer.
	roomExpiryDuration = time.Minute

	// maxUsersLength is the threshold at which GetUsers stops collecting
	// sessions.
	maxUsersLength = 5000
)
|
|
|
type RoomWorker interface { |
|
Start() |
|
SessionIDs() []string |
|
Users() []*roomUser |
|
Update(*DataRoom) error |
|
GetUsers() []*DataSession |
|
Broadcast(sessionID string, buf buffercache.Buffer) |
|
Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error) |
|
Leave(sessionID string) |
|
GetType() string |
|
} |
|
|
|
type roomWorker struct { |
|
// References. |
|
manager *roomManager |
|
|
|
// Data handling. |
|
workers chan (func()) |
|
expired chan (bool) |
|
users map[string]*roomUser |
|
timer *time.Timer |
|
mutex sync.RWMutex |
|
|
|
// Metadata. |
|
id string |
|
name string |
|
roomType string |
|
credentials *DataRoomCredentials |
|
} |
|
|
|
type roomUser struct { |
|
*Session |
|
Sender |
|
} |
|
|
|
func NewRoomWorker(manager *roomManager, roomID, roomName, roomType string, credentials *DataRoomCredentials) RoomWorker { |
|
log.Printf("Creating worker for room '%s'\n", roomID) |
|
|
|
r := &roomWorker{ |
|
manager: manager, |
|
id: roomID, |
|
name: roomName, |
|
roomType: roomType, |
|
workers: make(chan func(), roomMaxWorkers), |
|
expired: make(chan bool), |
|
users: make(map[string]*roomUser), |
|
} |
|
|
|
if credentials != nil && len(credentials.PIN) > 0 { |
|
r.credentials = credentials |
|
} |
|
|
|
// Create expire timer. |
|
r.timer = time.AfterFunc(roomExpiryDuration, func() { |
|
r.expired <- true |
|
}) |
|
|
|
return r |
|
} |
|
|
|
func (r *roomWorker) Start() { |
|
// Main blocking worker. |
|
L: |
|
for { |
|
r.timer.Reset(roomExpiryDuration) |
|
select { |
|
case w := <-r.workers: |
|
//fmt.Println("Running worker", r.Id, w) |
|
w() |
|
case <-r.expired: |
|
//fmt.Println("Work room expired", r.Id) |
|
//fmt.Println("Work room expired", r.Id, len(r.connections)) |
|
r.mutex.RLock() |
|
if len(r.users) == 0 { |
|
// Cleanup room when it is empty. |
|
r.mutex.RUnlock() |
|
log.Printf("Room worker not in use - cleaning up '%s'\n", r.id) |
|
break L |
|
} else { |
|
r.mutex.RUnlock() |
|
} |
|
} |
|
} |
|
|
|
r.timer.Stop() |
|
close(r.workers) |
|
//fmt.Println("Exit worker", r.Id) |
|
} |
|
|
|
func (r *roomWorker) SessionIDs() []string { |
|
r.mutex.RLock() |
|
defer r.mutex.RUnlock() |
|
sessions := make([]string, 0, len(r.users)) |
|
for id := range r.users { |
|
sessions = append(sessions, id) |
|
} |
|
|
|
return sessions |
|
} |
|
|
|
func (r *roomWorker) Users() []*roomUser { |
|
r.mutex.RLock() |
|
defer r.mutex.RUnlock() |
|
users := make([]*roomUser, 0, len(r.users)) |
|
for _, user := range r.users { |
|
users = append(users, user) |
|
} |
|
|
|
return users |
|
} |
|
|
|
func (r *roomWorker) GetType() string { |
|
return r.roomType |
|
} |
|
|
|
func (r *roomWorker) Run(f func()) bool { |
|
select { |
|
case r.workers <- f: |
|
return true |
|
default: |
|
log.Printf("Room worker channel full or closed '%s'\n", r.id) |
|
return false |
|
} |
|
} |
|
|
|
func (r *roomWorker) Update(room *DataRoom) error { |
|
fault := make(chan error, 1) |
|
worker := func() { |
|
r.mutex.Lock() |
|
// Enforce room type and name. |
|
room.Type = r.roomType |
|
room.Name = r.name |
|
// Update credentials. |
|
if room.Credentials != nil { |
|
if len(room.Credentials.PIN) > 0 { |
|
r.credentials = room.Credentials |
|
} else { |
|
r.credentials = nil |
|
} |
|
} |
|
r.mutex.Unlock() |
|
fault <- nil |
|
} |
|
r.Run(worker) |
|
|
|
return <-fault |
|
} |
|
|
|
func (r *roomWorker) GetUsers() []*DataSession { |
|
out := make(chan []*DataSession, 1) |
|
worker := func() { |
|
var sl []*DataSession |
|
appender := func(user *roomUser) bool { |
|
ecsession := user.Session |
|
if ecsession != nil { |
|
session := ecsession.Data() |
|
session.Type = "Online" |
|
sl = append(sl, session) |
|
if len(sl) > maxUsersLength { |
|
log.Println("Limiting users response length in channel", r.id) |
|
return false |
|
} |
|
} |
|
return true |
|
} |
|
r.mutex.RLock() |
|
sl = make([]*DataSession, 0, len(r.users)) |
|
// Include connections in this room. |
|
for _, user := range r.users { |
|
if !appender(user) { |
|
break |
|
} |
|
} |
|
r.mutex.RUnlock() |
|
if r.id != r.manager.globalRoomID { |
|
// Include connections to global room. |
|
for _, ec := range r.manager.GlobalUsers() { |
|
if !appender(ec) { |
|
break |
|
} |
|
} |
|
} |
|
|
|
out <- sl |
|
} |
|
|
|
r.Run(worker) |
|
return <-out |
|
} |
|
|
|
func (r *roomWorker) Broadcast(sessionID string, message buffercache.Buffer) { |
|
worker := func() { |
|
r.mutex.RLock() |
|
for id, user := range r.users { |
|
if id == sessionID || user.Sender == nil { |
|
// Skip broadcast to self or non existing sender. |
|
continue |
|
} |
|
//fmt.Printf("%s\n", m.Message) |
|
user.Send(message) |
|
} |
|
r.mutex.RUnlock() |
|
message.Decref() |
|
} |
|
|
|
message.Incref() |
|
r.Run(worker) |
|
} |
|
|
|
type joinResult struct { |
|
*DataRoom |
|
error |
|
} |
|
|
|
func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, sender Sender) (*DataRoom, error) { |
|
results := make(chan joinResult, 1) |
|
worker := func() { |
|
r.mutex.Lock() |
|
if r.credentials == nil && credentials != nil { |
|
results <- joinResult{nil, NewDataError("authorization_not_required", "No credentials may be provided for this room")} |
|
r.mutex.Unlock() |
|
return |
|
} else if r.credentials != nil { |
|
if credentials == nil { |
|
results <- joinResult{nil, NewDataError("authorization_required", "Valid credentials are required to join this room")} |
|
r.mutex.Unlock() |
|
return |
|
} |
|
|
|
if len(r.credentials.PIN) != len(credentials.PIN) || subtle.ConstantTimeCompare([]byte(r.credentials.PIN), []byte(credentials.PIN)) != 1 { |
|
results <- joinResult{nil, NewDataError("invalid_credentials", "The provided credentials are incorrect")} |
|
r.mutex.Unlock() |
|
return |
|
} |
|
} |
|
|
|
r.users[session.Id] = &roomUser{session, sender} |
|
// NOTE(lcooper): Needs to be a copy, else we risk races with |
|
// a subsequent modification of room properties. |
|
result := joinResult{&DataRoom{Name: r.name, Type: r.roomType}, nil} |
|
r.mutex.Unlock() |
|
results <- result |
|
} |
|
r.Run(worker) |
|
result := <-results |
|
|
|
return result.DataRoom, result.error |
|
} |
|
|
|
func (r *roomWorker) Leave(sessionID string) { |
|
worker := func() { |
|
r.mutex.Lock() |
|
if _, ok := r.users[sessionID]; ok { |
|
delete(r.users, sessionID) |
|
} |
|
r.mutex.Unlock() |
|
} |
|
r.Run(worker) |
|
}
|
|
|