Browse Source

Implemented pipeline for Offer, Candidate and Bye.

pull/272/head
Simon Eisenmann 9 years ago
parent
commit
336f2eda8f
  1. 28
      go/channelling/api/api.go
  2. 6
      go/channelling/api/api_test.go
  3. 6
      go/channelling/api/handle_chat.go
  4. 2
      go/channelling/api/handle_conference.go
  5. 2
      go/channelling/api/handle_self.go
  6. 43
      go/channelling/bus_manager.go
  7. 5
      go/channelling/client.go
  8. 30
      go/channelling/data.go
  9. 9
      go/channelling/hub.go
  10. 190
      go/channelling/pipeline.go
  11. 120
      go/channelling/pipeline_manager.go
  12. 98
      go/channelling/server/pipelines.go
  13. 37
      go/channelling/server/users.go
  14. 16
      go/channelling/session.go
  15. 53
      go/channelling/session_manager.go
  16. 32
      go/channelling/sink.go
  17. 11
      go/channelling/tickets.go
  18. 2
      go/channelling/unicaster.go
  19. 26
      go/channelling/userstore.go
  20. 17
      src/app/spreed-webrtc-server/handler_ws.go
  21. 18
      src/app/spreed-webrtc-server/main.go

28
go/channelling/api/api.go

@ -41,6 +41,7 @@ type channellingAPI struct {
TurnDataCreator channelling.TurnDataCreator TurnDataCreator channelling.TurnDataCreator
Unicaster channelling.Unicaster Unicaster channelling.Unicaster
BusManager channelling.BusManager BusManager channelling.BusManager
PipelineManager channelling.PipelineManager
config *channelling.Config config *channelling.Config
} }
@ -55,7 +56,8 @@ func New(config *channelling.Config,
contactManager channelling.ContactManager, contactManager channelling.ContactManager,
turnDataCreator channelling.TurnDataCreator, turnDataCreator channelling.TurnDataCreator,
unicaster channelling.Unicaster, unicaster channelling.Unicaster,
busManager channelling.BusManager) channelling.ChannellingAPI { busManager channelling.BusManager,
pipelineManager channelling.PipelineManager) channelling.ChannellingAPI {
return &channellingAPI{ return &channellingAPI{
roomStatus, roomStatus,
sessionEncoder, sessionEncoder,
@ -65,6 +67,7 @@ func New(config *channelling.Config,
turnDataCreator, turnDataCreator,
unicaster, unicaster,
busManager, busManager,
pipelineManager,
config, config,
} }
} }
@ -73,17 +76,18 @@ func (api *channellingAPI) OnConnect(client *channelling.Client, session *channe
api.Unicaster.OnConnect(client, session) api.Unicaster.OnConnect(client, session)
self, err := api.HandleSelf(session) self, err := api.HandleSelf(session)
if err == nil { if err == nil {
api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil) api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil, nil)
} }
return self, err return self, err
} }
func (api *channellingAPI) OnDisconnect(client *channelling.Client, session *channelling.Session) { func (api *channellingAPI) OnDisconnect(client *channelling.Client, session *channelling.Session) {
api.Unicaster.OnDisconnect(client, session) api.Unicaster.OnDisconnect(client, session)
api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil) api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil, nil)
} }
func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming) (interface{}, error) { func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming) (interface{}, error) {
var pipeline *channelling.Pipeline
switch msg.Type { switch msg.Type {
case "Self": case "Self":
return api.HandleSelf(session) return api.HandleSelf(session)
@ -99,19 +103,21 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
break break
} }
if _, ok := msg.Offer.Offer["_token"]; !ok { if _, ok := msg.Offer.Offer["_token"]; !ok {
pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Offer.To)
// Trigger offer event when offer has no token, so this is // Trigger offer event when offer has no token, so this is
// not triggered for peerxfer and peerscreenshare offers. // not triggered for peerxfer and peerscreenshare offers.
api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil) api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil, pipeline)
} }
session.Unicast(msg.Offer.To, msg.Offer) session.Unicast(msg.Offer.To, msg.Offer, pipeline)
case "Candidate": case "Candidate":
if msg.Candidate == nil || msg.Candidate.Candidate == nil { if msg.Candidate == nil || msg.Candidate.Candidate == nil {
log.Println("Received invalid candidate message.", msg) log.Println("Received invalid candidate message.", msg)
break break
} }
session.Unicast(msg.Candidate.To, msg.Candidate) pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Candidate.To)
session.Unicast(msg.Candidate.To, msg.Candidate, pipeline)
case "Answer": case "Answer":
if msg.Answer == nil || msg.Answer.Answer == nil { if msg.Answer == nil || msg.Answer.Answer == nil {
log.Println("Received invalid answer message.", msg) log.Println("Received invalid answer message.", msg)
@ -120,10 +126,10 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
if _, ok := msg.Answer.Answer["_token"]; !ok { if _, ok := msg.Answer.Answer["_token"]; !ok {
// Trigger answer event when answer has no token. so this is // Trigger answer event when answer has no token. so this is
// not triggered for peerxfer and peerscreenshare answers. // not triggered for peerxfer and peerscreenshare answers.
api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil) api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil, pipeline)
} }
session.Unicast(msg.Answer.To, msg.Answer) session.Unicast(msg.Answer.To, msg.Answer, pipeline)
case "Users": case "Users":
return api.HandleUsers(session) return api.HandleUsers(session)
case "Authentication": case "Authentication":
@ -137,9 +143,11 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
log.Println("Received invalid bye message.", msg) log.Println("Received invalid bye message.", msg)
break break
} }
api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil) pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Bye.To)
api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil, pipeline)
session.Unicast(msg.Bye.To, msg.Bye) session.Unicast(msg.Bye.To, msg.Bye, pipeline)
pipeline.Close()
case "Status": case "Status":
if msg.Status == nil { if msg.Status == nil {
log.Println("Received invalid status message.", msg) log.Println("Received invalid status message.", msg)

6
go/channelling/api/api_test.go

@ -35,6 +35,10 @@ import (
type fakeClient struct { type fakeClient struct {
} }
func (fake *fakeClient) Index() uint64 {
return 0
}
func (fake *fakeClient) Send(_ buffercache.Buffer) { func (fake *fakeClient) Send(_ buffercache.Buffer) {
} }
@ -83,7 +87,7 @@ func NewTestChannellingAPI() (channelling.ChannellingAPI, *fakeClient, *channell
sessionNonces := securecookie.New(securecookie.GenerateRandomKey(64), nil) sessionNonces := securecookie.New(securecookie.GenerateRandomKey(64), nil)
session := channelling.NewSession(nil, nil, roomManager, roomManager, nil, sessionNonces, "", "") session := channelling.NewSession(nil, nil, roomManager, roomManager, nil, sessionNonces, "", "")
busManager := channelling.NewBusManager("", false, "") busManager := channelling.NewBusManager("", false, "")
return New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager), client, session, roomManager return New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager, nil), client, session, roomManager
} }
func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) {

6
go/channelling/api/handle_chat.go

@ -34,7 +34,7 @@ func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channe
to := chat.To to := chat.To
if !msg.NoEcho { if !msg.NoEcho {
session.Unicast(session.Id, chat) session.Unicast(session.Id, chat, nil)
} }
msg.Time = time.Now().Format(time.RFC3339) msg.Time = time.Now().Format(time.RFC3339)
if to == "" { if to == "" {
@ -59,10 +59,10 @@ func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channe
api.StatsCounter.CountUnicastChat() api.StatsCounter.CountUnicastChat()
} }
session.Unicast(to, chat) session.Unicast(to, chat, nil)
if msg.Mid != "" { if msg.Mid != "" {
// Send out delivery confirmation status chat message. // Send out delivery confirmation status chat message.
session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}}) session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}}, nil)
} }
} }
} }

2
go/channelling/api/handle_conference.go

@ -37,7 +37,7 @@ func (api *channellingAPI) HandleConference(session *channelling.Session, confer
// Send conference update to anyone. // Send conference update to anyone.
for _, id := range conference.Conference { for _, id := range conference.Conference {
if id != session.Id { if id != session.Id {
session.Unicast(id, conference) session.Unicast(id, conference, nil)
} }
} }
} }

2
go/channelling/api/handle_self.go

@ -47,7 +47,7 @@ func (api *channellingAPI) HandleSelf(session *channelling.Session) (*channellin
Turn: api.TurnDataCreator.CreateTurnData(session), Turn: api.TurnDataCreator.CreateTurnData(session),
Stun: api.config.StunURIs, Stun: api.config.StunURIs,
} }
api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil) api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil, nil)
return self, nil return self, nil
} }

43
go/channelling/bus_manager.go

@ -25,7 +25,9 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"time"
"github.com/nats-io/nats"
"github.com/strukturag/spreed-webrtc/go/natsconnection" "github.com/strukturag/spreed-webrtc/go/natsconnection"
) )
@ -41,7 +43,10 @@ const (
// A BusManager provides the API to interact with a bus. // A BusManager provides the API to interact with a bus.
type BusManager interface { type BusManager interface {
Trigger(name, from, payload string, data interface{}) error Publish(subject string, v interface{}) error
Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error
Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)
} }
// A BusTrigger is a container to serialize trigger events // A BusTrigger is a container to serialize trigger events
@ -52,6 +57,7 @@ type BusTrigger struct {
From string From string
Payload string `json:",omitempty"` Payload string `json:",omitempty"`
Data interface{} `json:",omitempty"` Data interface{} `json:",omitempty"`
Pipeline string `json:",omitempty"`
} }
// BusSubjectTrigger returns the bus subject for trigger payloads. // BusSubjectTrigger returns the bus subject for trigger payloads.
@ -81,7 +87,7 @@ func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager {
b = &noopBus{id} b = &noopBus{id}
} }
if err == nil { if err == nil {
b.Trigger(BusManagerStartup, id, "", nil) b.Trigger(BusManagerStartup, id, "", nil, nil)
} }
return &busManager{b} return &busManager{b}
@ -91,10 +97,22 @@ type noopBus struct {
id string id string
} }
func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error { func (bus *noopBus) Publish(subject string, v interface{}) error {
return nil return nil
} }
func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
return nil
}
func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error {
return nil
}
func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {
return nil, nil
}
type natsBus struct { type natsBus struct {
id string id string
prefix string prefix string
@ -118,8 +136,15 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
return &natsBus{id, prefix, ec, triggerQueue}, nil return &natsBus{id, prefix, ec, triggerQueue}, nil
} }
func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) { func (bus *natsBus) Publish(subject string, v interface{}) error {
if bus.ec != nil { return bus.ec.Publish(subject, v)
}
func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
return bus.ec.Request(subject, v, vPtr, timeout)
}
func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) {
trigger := &BusTrigger{ trigger := &BusTrigger{
Id: bus.id, Id: bus.id,
Name: name, Name: name,
@ -127,6 +152,9 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err e
Payload: payload, Payload: payload,
Data: data, Data: data,
} }
if pipeline != nil {
trigger.Pipeline = pipeline.GetID()
}
entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger}
select { select {
case bus.triggerQueue <- entry: case bus.triggerQueue <- entry:
@ -135,11 +163,14 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err e
log.Println("Failed to queue NATS event - queue full?") log.Println("Failed to queue NATS event - queue full?")
err = errors.New("NATS trigger queue full") err = errors.New("NATS trigger queue full")
} }
}
return err return err
} }
func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {
return bus.ec.Subscribe(subject, cb)
}
type busQueueEntry struct { type busQueueEntry struct {
subject string subject string
data interface{} data interface{}

5
go/channelling/client.go

@ -28,13 +28,14 @@ import (
) )
type Sender interface { type Sender interface {
Index() uint64
Send(buffercache.Buffer) Send(buffercache.Buffer)
} }
type Client struct { type Client struct {
Codec
ChannellingAPI
Connection Connection
Codec
ChannellingAPI ChannellingAPI
session *Session session *Session
} }

30
go/channelling/data.go

@ -182,25 +182,25 @@ type DataAutoCall struct {
type DataIncoming struct { type DataIncoming struct {
Type string Type string
Hello *DataHello Hello *DataHello `json:",omitempty"`
Offer *DataOffer Offer *DataOffer `json:",omitempty"`
Candidate *DataCandidate Candidate *DataCandidate `json:",omitempty"`
Answer *DataAnswer Answer *DataAnswer `json:",omitempty"`
Bye *DataBye Bye *DataBye `json:",omitempty"`
Status *DataStatus Status *DataStatus `json:",omitempty"`
Chat *DataChat Chat *DataChat `json:",omitempty"`
Conference *DataConference Conference *DataConference `json:",omitempty"`
Alive *DataAlive Alive *DataAlive `json:",omitempty"`
Authentication *DataAuthentication Authentication *DataAuthentication `json:",omitempty"`
Sessions *DataSessions Sessions *DataSessions `json:",omitempty"`
Room *DataRoom Room *DataRoom `json:",omitempty"`
Iid string `json:",omitempty"` Iid string `json:",omitempty"`
} }
type DataOutgoing struct { type DataOutgoing struct {
Data interface{} Data interface{} `json:",omitempty"`
From string From string `json:",omitempty"`
To string To string `json:",omitempty"`
Iid string `json:",omitempty"` Iid string `json:",omitempty"`
A string `json:",omitempty"` A string `json:",omitempty"`
} }

9
go/channelling/hub.go

@ -155,13 +155,18 @@ func (h *hub) GetClient(sessionID string) (client *Client, ok bool) {
return return
} }
func (h *hub) Unicast(to string, outgoing *DataOutgoing) { func (h *hub) Unicast(to string, outgoing *DataOutgoing, pipeline *Pipeline) {
if message, err := h.EncodeOutgoing(outgoing); err == nil {
client, ok := h.GetClient(to) client, ok := h.GetClient(to)
if pipeline != nil {
if complete := pipeline.FlushOutgoing(h, client, to, outgoing); complete {
return
}
}
if !ok { if !ok {
log.Println("Unicast To not found", to) log.Println("Unicast To not found", to)
return return
} }
if message, err := h.EncodeOutgoing(outgoing); err == nil {
client.Send(message) client.Send(message)
message.Decref() message.Decref()
} }

190
go/channelling/pipeline.go

@ -0,0 +1,190 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
import (
"bytes"
"encoding/json"
"errors"
"log"
"sync"
"time"
"github.com/strukturag/spreed-webrtc/go/buffercache"
)
type PipelineFeedLine struct {
Seq int
Msg *DataOutgoing
}
type Pipeline struct {
PipelineManager PipelineManager
mutex sync.RWMutex
namespace string
id string
from *Session
expires *time.Time
data []*DataOutgoing
sink Sink
}
func NewPipeline(manager PipelineManager,
namespace string,
id string,
from *Session,
duration time.Duration) *Pipeline {
pipeline := &Pipeline{
PipelineManager: manager,
namespace: namespace,
id: id,
from: from,
}
pipeline.Refresh(duration)
return pipeline
}
func (pipeline *Pipeline) GetID() string {
return pipeline.id
}
func (pipeline *Pipeline) Refresh(duration time.Duration) {
pipeline.mutex.Lock()
expiration := time.Now().Add(duration)
pipeline.expires = &expiration
pipeline.mutex.Unlock()
}
func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline {
pipeline.mutex.Lock()
pipeline.data = append(pipeline.data, msg)
pipeline.mutex.Unlock()
return pipeline
}
func (pipeline *Pipeline) Send(b buffercache.Buffer) {
// noop for now
}
func (pipeline *Pipeline) Index() uint64 {
return 0
}
func (pipeline *Pipeline) Close() {
pipeline.mutex.Lock()
pipeline.expires = nil
if pipeline.sink != nil {
pipeline.sink.Close()
pipeline.sink = nil
}
pipeline.mutex.Unlock()
}
func (pipeline *Pipeline) Expired() bool {
var expired bool
pipeline.mutex.RLock()
if pipeline.expires == nil {
expired = true
} else {
expired = pipeline.expires.Before(time.Now())
}
pipeline.mutex.RUnlock()
return expired
}
func (pipeline *Pipeline) Session() *Session {
return pipeline.from
}
func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
pipeline.mutex.RLock()
var lineRaw []byte
var line *PipelineFeedLine
var buffer bytes.Buffer
var err error
data := pipeline.data[since:]
count := 0
for seq, msg := range data {
line = &PipelineFeedLine{
Seq: seq + since,
Msg: msg,
}
lineRaw, err = json.Marshal(line)
if err != nil {
return nil, err
}
buffer.Write(lineRaw)
buffer.WriteString("\n")
count++
if limit > 0 && count >= limit {
break
}
}
pipeline.mutex.RUnlock()
return buffer.Bytes(), nil
}
func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool {
pipeline.mutex.RLock()
log.Println("Flush outgoing via pipeline", to, client == nil)
if client == nil {
sink := pipeline.sink
pipeline.mutex.RUnlock()
pipeline.Add(outgoing)
// It is possible to retrieve the userid for fake sessions here.
if session, found := pipeline.PipelineManager.GetSession(to); found {
log.Println("Pipeline found userid via manager", session.Userid())
}
if sink == nil {
return true
}
// Sink it.
pipeline.sink.Write(outgoing)
return true
}
pipeline.mutex.RUnlock()
return false
}
func (pipeline *Pipeline) Attach(sink Sink) error {
pipeline.mutex.Lock()
defer pipeline.mutex.Unlock()
if pipeline.sink != nil {
return errors.New("pipeline already attached to sink")
}
pipeline.sink = sink
// Sink existing data first.
log.Println("Attach sink to pipeline", pipeline.id)
for _, msg := range pipeline.data {
sink.Write(msg)
}
return nil
}

120
go/channelling/pipeline_manager.go

@ -0,0 +1,120 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
import (
"fmt"
"log"
"sync"
"time"
)
const (
PipelineNamespaceCall = "call"
)
type PipelineManager interface {
BusManager
SessionStore
UserStore
GetPipelineByID(id string) (pipeline *Pipeline, ok bool)
GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline
}
type pipelineManager struct {
BusManager
SessionStore
UserStore
mutex sync.RWMutex
pipelines map[string]*Pipeline
duration time.Duration
}
func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore) PipelineManager {
plm := &pipelineManager{
BusManager: busManager,
SessionStore: sessionStore,
UserStore: userStore,
pipelines: make(map[string]*Pipeline),
duration: 30 * time.Minute,
}
plm.start()
return plm
}
func (plm *pipelineManager) cleanup() {
plm.mutex.Lock()
for id, pipeline := range plm.pipelines {
if pipeline.Expired() {
pipeline.Close()
delete(plm.pipelines, id)
}
}
plm.mutex.Unlock()
}
func (plm *pipelineManager) start() {
c := time.Tick(30 * time.Second)
go func() {
for _ = range c {
plm.cleanup()
}
}()
}
func (plm *pipelineManager) GetPipelineByID(id string) (*Pipeline, bool) {
plm.mutex.RLock()
pipeline, ok := plm.pipelines[id]
if !ok {
// XXX(longsleep): Hack for development
for _, pipeline = range plm.pipelines {
ok = true
break
}
}
plm.mutex.RUnlock()
return pipeline, ok
}
func (plm *pipelineManager) PipelineID(namespace string, sender Sender, session *Session, to string) string {
return fmt.Sprintf("%s.%s.%s", namespace, session.Id, to)
}
func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline {
id := plm.PipelineID(namespace, sender, session, to)
plm.mutex.Lock()
pipeline, ok := plm.pipelines[id]
if ok {
// Refresh. We do not care if the pipeline is expired.
pipeline.Refresh(plm.duration)
plm.mutex.Unlock()
return pipeline
}
log.Println("Creating pipeline", namespace, id)
pipeline = NewPipeline(plm, namespace, id, session, plm.duration)
plm.pipelines[id] = pipeline
plm.mutex.Unlock()
return pipeline
}

98
go/channelling/server/pipelines.go

@ -0,0 +1,98 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package server
import (
"encoding/json"
"net/http"
"strconv"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/gorilla/mux"
)
type Pipelines struct {
channelling.PipelineManager
API channelling.ChannellingAPI
}
func (pipelines *Pipelines) Get(request *http.Request) (int, interface{}, http.Header) {
vars := mux.Vars(request)
id, ok := vars["id"]
if !ok {
return http.StatusNotFound, "", nil
}
pipeline, ok := pipelines.GetPipelineByID(id)
if !ok {
return http.StatusNotFound, "", nil
}
since := 0
limit := 0
if sinceParam := request.Form.Get("since"); sinceParam != "" {
since, _ = strconv.Atoi(sinceParam)
}
if limitParam := request.Form.Get("limit"); limitParam != "" {
limit, _ = strconv.Atoi(limitParam)
}
result, err := pipeline.JSONFeed(since, limit)
if err != nil {
return http.StatusInternalServerError, err.Error(), nil
}
return http.StatusOK, result, nil
}
func (pipelines *Pipelines) Post(request *http.Request) (int, interface{}, http.Header) {
vars := mux.Vars(request)
id, ok := vars["id"]
if !ok {
return http.StatusNotFound, "", nil
}
pipeline, ok := pipelines.GetPipelineByID(id)
if !ok {
return http.StatusNotFound, "", nil
}
var incoming channelling.DataIncoming
dec := json.NewDecoder(request.Body)
if err := dec.Decode(&incoming); err != nil {
return http.StatusBadRequest, err.Error(), nil
}
result := &channelling.DataOutgoing{
From: pipeline.Session().Id,
Iid: incoming.Iid,
}
reply, err := pipelines.API.OnIncoming(pipeline, pipeline.Session(), &incoming)
if err == nil {
result.Data = reply
} else {
result.Data = err
}
return http.StatusOK, result, nil
}

37
go/channelling/server/users.go

@ -293,41 +293,27 @@ func (un *UserNonce) Response() (int, interface{}, http.Header) {
} }
type Users struct { type Users struct {
channelling.SessionStore
channelling.SessionValidator channelling.SessionValidator
channelling.SessionManager channelling.SessionManager
channelling.SessionStore
realm string realm string
handler UsersHandler handler UsersHandler
} }
func NewUsers(sessionStore channelling.SessionStore, sessionValidator channelling.SessionValidator, sessionManager channelling.SessionManager, mode, realm string, runtime phoenix.Runtime) *Users { func NewUsers(sessionStore channelling.SessionStore, sessionValidator channelling.SessionValidator, sessionManager channelling.SessionManager, mode, realm string, runtime phoenix.Runtime) *Users {
var users = &Users{ var users = &Users{
sessionStore,
sessionValidator, sessionValidator,
sessionManager, sessionManager,
sessionStore,
realm, realm,
nil, nil,
} }
// Create handler based on mode.
var handler UsersHandler var handler UsersHandler
var err error var err error
// Create handler based on mode.
if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil { if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil {
users.handler = handler users.handler = handler
// Register handler Get.
sessionManager.RetrieveUsersWith(func(request *http.Request) (userid string, err error) {
userid, err = handler.Get(request)
if err != nil {
log.Printf("Failed to get userid from handler: %s", err)
} else {
if userid != "" {
log.Printf("Users handler get success: %s\n", userid)
}
}
return
})
log.Printf("Enabled users handler '%s'\n", mode) log.Printf("Enabled users handler '%s'\n", mode)
} else if err != nil { } else if err != nil {
log.Printf("Failed to enable handler '%s': %s\n", mode, err) log.Printf("Failed to enable handler '%s': %s\n", mode, err)
@ -422,6 +408,21 @@ func (users *Users) createHandler(mode string, runtime phoenix.Runtime) (handler
} }
func (users *Users) GetUserID(request *http.Request) (userid string, err error) {
if users.handler == nil {
return
}
userid, err = users.handler.Get(request)
if err != nil {
log.Printf("Failed to get userid from handler: %s", err)
} else {
if userid != "" {
log.Printf("Users handler get success: %s\n", userid)
}
}
return
}
// Post is used to create new userids for this server. // Post is used to create new userids for this server.
func (users *Users) Post(request *http.Request) (int, interface{}, http.Header) { func (users *Users) Post(request *http.Request) (int, interface{}, http.Header) {
@ -465,7 +466,7 @@ func (users *Users) Post(request *http.Request) (int, interface{}, http.Header)
nonce string nonce string
err error err error
) )
if session, ok := users.GetSession(snr.Id); ok { if session, ok := users.SessionStore.GetSession(snr.Id); ok {
nonce, err = session.Authorize(users.Realm(), &channelling.SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) nonce, err = session.Authorize(users.Realm(), &channelling.SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid})
} else { } else {
err = errors.New("no such session") err = errors.New("no such session")

16
go/channelling/session.go

@ -33,10 +33,10 @@ import (
var sessionNonces *securecookie.SecureCookie var sessionNonces *securecookie.SecureCookie
type Session struct { type Session struct {
SessionManager SessionManager SessionManager
Unicaster Unicaster Unicaster
Broadcaster Broadcaster Broadcaster
RoomStatusManager RoomStatusManager RoomStatusManager
buddyImages ImageCache buddyImages ImageCache
Id string Id string
Sid string Sid string
@ -194,7 +194,7 @@ func (s *Session) BroadcastStatus() {
s.mutex.RUnlock() s.mutex.RUnlock()
} }
func (s *Session) Unicast(to string, m interface{}) { func (s *Session) Unicast(to string, m interface{}, pipeline *Pipeline) {
s.mutex.RLock() s.mutex.RLock()
outgoing := &DataOutgoing{ outgoing := &DataOutgoing{
From: s.Id, From: s.Id,
@ -204,7 +204,7 @@ func (s *Session) Unicast(to string, m interface{}) {
} }
s.mutex.RUnlock() s.mutex.RUnlock()
s.Unicaster.Unicast(to, outgoing) s.Unicaster.Unicast(to, outgoing, pipeline)
} }
func (s *Session) Close() { func (s *Session) Close() {
@ -235,12 +235,12 @@ func (s *Session) Close() {
} }
for _, session := range s.subscribers { for _, session := range s.subscribers {
s.Unicaster.Unicast(session.Id, outgoing) s.Unicaster.Unicast(session.Id, outgoing, nil)
} }
for _, session := range s.subscriptions { for _, session := range s.subscriptions {
session.RemoveSubscriber(s.Id) session.RemoveSubscriber(s.Id)
s.Unicaster.Unicast(session.Id, outgoing) s.Unicaster.Unicast(session.Id, outgoing, nil)
} }
s.SessionManager.DestroySession(s.Id, s.userid) s.SessionManager.DestroySession(s.Id, s.userid)

53
go/channelling/session_manager.go

@ -35,11 +35,13 @@ type UserStats interface {
type SessionManager interface { type SessionManager interface {
UserStats UserStats
RetrieveUsersWith(func(*http.Request) (string, error)) SessionStore
CreateSession(*http.Request) *Session UserStore
CreateSession(st *SessionToken, userid string) *Session
DestroySession(sessionID, userID string) DestroySession(sessionID, userID string)
Authenticate(*Session, *SessionToken, string) error Authenticate(*Session, *SessionToken, string) error
GetUserSessions(session *Session, id string) []*DataSession GetUserSessions(session *Session, id string) []*DataSession
DecodeSessionToken(token string) (st *SessionToken)
} }
type sessionManager struct { type sessionManager struct {
@ -51,7 +53,8 @@ type sessionManager struct {
buddyImages ImageCache buddyImages ImageCache
config *Config config *Config
userTable map[string]*User userTable map[string]*User
fakesessionTable map[string]*Session sessionTable map[string]*Session
sessionByUserIDTable map[string]*Session
useridRetriever func(*http.Request) (string, error) useridRetriever func(*http.Request) (string, error)
attestations *securecookie.SecureCookie attestations *securecookie.SecureCookie
} }
@ -67,6 +70,7 @@ func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, bro
config, config,
make(map[string]*User), make(map[string]*User),
make(map[string]*Session), make(map[string]*Session),
make(map[string]*Session),
nil, nil,
nil, nil,
} }
@ -93,29 +97,28 @@ func (sessionManager *sessionManager) UserInfo(details bool) (userCount int, use
return return
} }
func (sessionManager *sessionManager) RetrieveUsersWith(retriever func(*http.Request) (string, error)) { // GetSession returns the client-less sessions created directly by sessionManager.
sessionManager.useridRetriever = retriever func (sessionManager *sessionManager) GetSession(id string) (*Session, bool) {
sessionManager.RLock()
defer sessionManager.RUnlock()
session, ok := sessionManager.sessionTable[id]
return session, ok
} }
func (sessionManager *sessionManager) CreateSession(request *http.Request) *Session { func (sessionManager *sessionManager) GetUser(id string) (*User, bool) {
request.ParseForm() sessionManager.RLock()
token := request.FormValue("t") defer sessionManager.RUnlock()
st := sessionManager.DecodeSessionToken(token)
var userid string user, ok := sessionManager.userTable[id]
if sessionManager.config.UsersEnabled { return user, ok
if sessionManager.useridRetriever != nil {
userid, _ = sessionManager.useridRetriever(request)
if userid == "" {
userid = st.Userid
}
}
} }
func (sessionManager *sessionManager) CreateSession(st *SessionToken, userid string) *Session {
session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
if userid != "" { if userid != "" {
// XXX(lcooper): Should errors be handled here? // Errors are ignored here, session is returned without userID when auth failed.
sessionManager.Authenticate(session, st, userid) sessionManager.Authenticate(session, st, userid)
} }
@ -128,10 +131,15 @@ func (sessionManager *sessionManager) DestroySession(sessionID, userID string) {
} }
sessionManager.Lock() sessionManager.Lock()
user, ok := sessionManager.userTable[userID] if user, ok := sessionManager.userTable[userID]; ok && user.RemoveSession(sessionID) {
if ok && user.RemoveSession(sessionID) {
delete(sessionManager.userTable, userID) delete(sessionManager.userTable, userID)
} }
if _, ok := sessionManager.sessionTable[sessionID]; ok {
delete(sessionManager.sessionTable, sessionID)
}
if session, ok := sessionManager.sessionByUserIDTable[userID]; ok && session.Id == sessionID {
delete(sessionManager.sessionByUserIDTable, sessionID)
}
sessionManager.Unlock() sessionManager.Unlock()
} }
@ -165,12 +173,13 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s
if !ok { if !ok {
// No user. Create fake session. // No user. Create fake session.
sessionManager.Lock() sessionManager.Lock()
session, ok := sessionManager.fakesessionTable[userid] session, ok := sessionManager.sessionByUserIDTable[userid]
if !ok { if !ok {
st := sessionManager.FakeSessionToken(userid) st := sessionManager.FakeSessionToken(userid)
session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
session.SetUseridFake(st.Userid) session.SetUseridFake(st.Userid)
sessionManager.fakesessionTable[userid] = session sessionManager.sessionByUserIDTable[userid] = session
sessionManager.sessionTable[session.Id] = session
} }
sessionManager.Unlock() sessionManager.Unlock()
users = make([]*DataSession, 1, 1) users = make([]*DataSession, 1, 1)

32
go/channelling/sink.go

@ -0,0 +1,32 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
import ()
// Sink connects a Pipeline with end points in both directions by
// getting attached to a Pipeline.
type Sink interface {
// Write sends outgoing data on the sink from the
Write(interface{})
Close()
}

11
go/channelling/tickets.go

@ -96,13 +96,12 @@ func (tickets *tickets) DecodeSessionToken(token string) (st *SessionToken) {
return return
} }
func (tickets *tickets) FakeSessionToken(userid string) *SessionToken { func (tickets *tickets) FakeSessionToken(userid string) (st *SessionToken) {
st := &SessionToken{} sid := fmt.Sprintf("fake-%s", randomstring.NewRandomString(27))
st.Sid = fmt.Sprintf("fake-%s", randomstring.NewRandomString(27)) id, _ := tickets.Encode("id", sid)
st.Id, _ = tickets.Encode("id", st.Sid) st = &SessionToken{Id: id, Sid: sid, Userid: userid}
st.Userid = userid
log.Println("Created new fake session id", st.Id) log.Println("Created new fake session id", st.Id)
return st return
} }
func (tickets *tickets) ValidateSession(id, sid string) bool { func (tickets *tickets) ValidateSession(id, sid string) bool {

2
go/channelling/unicaster.go

@ -25,5 +25,5 @@ type Unicaster interface {
SessionStore SessionStore
OnConnect(*Client, *Session) OnConnect(*Client, *Session)
OnDisconnect(*Client, *Session) OnDisconnect(*Client, *Session)
Unicast(to string, outgoing *DataOutgoing) Unicast(to string, outgoing *DataOutgoing, pipeline *Pipeline)
} }

26
go/channelling/userstore.go

@ -0,0 +1,26 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type UserStore interface {
GetUser(id string) (user *User, ok bool)
}

17
src/app/spreed-webrtc-server/handler_ws.go

@ -26,6 +26,7 @@ import (
"net/http" "net/http"
"github.com/strukturag/spreed-webrtc/go/channelling" "github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/strukturag/spreed-webrtc/go/channelling/server"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -52,7 +53,7 @@ var (
} }
) )
func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI) http.HandlerFunc { func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI, users *server.Users) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// Validate incoming request. // Validate incoming request.
if r.Method != "GET" { if r.Method != "GET" {
@ -69,8 +70,20 @@ func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManag
return return
} }
r.ParseForm()
token := r.FormValue("t")
st := sessionManager.DecodeSessionToken(token)
var userid string
if users != nil {
userid, _ = users.GetUserID(r)
if userid == "" {
userid = st.Userid
}
}
// Create a new connection instance. // Create a new connection instance.
session := sessionManager.CreateSession(r) session := sessionManager.CreateSession(st, userid)
client := channelling.NewClient(codec, channellingAPI, session) client := channelling.NewClient(codec, channellingAPI, session)
conn := channelling.NewConnection(connectionCounter.CountConnection(), ws, client) conn := channelling.NewConnection(connectionCounter.CountConnection(), ws, client)

18
src/app/spreed-webrtc-server/main.go

@ -266,7 +266,8 @@ func runner(runtime phoenix.Runtime) error {
sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager) statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager)
busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager) pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager)
channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)
// Add handlers. // Add handlers.
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
@ -274,12 +275,8 @@ func runner(runtime phoenix.Runtime) error {
r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder)))) r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder))))
r.Handle("/robots.txt", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static"))))) r.Handle("/robots.txt", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static")))))
r.Handle("/favicon.ico", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img"))))) r.Handle("/favicon.ico", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img")))))
r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI))
r.HandleFunc("/.well-known/spreed-configuration", wellKnownHandler) r.HandleFunc("/.well-known/spreed-configuration", wellKnownHandler)
// Simple room handler.
r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler))
// Sandbox handler. // Sandbox handler.
r.HandleFunc("/sandbox/{origin_scheme}/{origin_host}/{sandbox}.html", httputils.MakeGzipHandler(sandboxHandler)) r.HandleFunc("/sandbox/{origin_scheme}/{origin_host}/{sandbox}.html", httputils.MakeGzipHandler(sandboxHandler))
@ -289,9 +286,11 @@ func runner(runtime phoenix.Runtime) error {
rest.AddResource(&server.Rooms{}, "/rooms") rest.AddResource(&server.Rooms{}, "/rooms")
rest.AddResource(config, "/config") rest.AddResource(config, "/config")
rest.AddResourceWithWrapper(&server.Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens") rest.AddResourceWithWrapper(&server.Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens")
var users *server.Users
if config.UsersEnabled { if config.UsersEnabled {
// Create Users handler. // Create Users handler.
users := server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime) users = server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime)
rest.AddResource(&server.Sessions{tickets, hub, users}, "/sessions/{id}/") rest.AddResource(&server.Sessions{tickets, hub, users}, "/sessions/{id}/")
if config.UsersAllowRegistration { if config.UsersAllowRegistration {
rest.AddResource(users, "/users") rest.AddResource(users, "/users")
@ -301,6 +300,7 @@ func runner(runtime phoenix.Runtime) error {
rest.AddResourceWithWrapper(&server.Stats{statsManager}, httputils.MakeGzipHandler, "/stats") rest.AddResourceWithWrapper(&server.Stats{statsManager}, httputils.MakeGzipHandler, "/stats")
log.Println("Stats are enabled!") log.Println("Stats are enabled!")
} }
rest.AddResource(&server.Pipelines{pipelineManager, channellingAPI}, "/pipelines/{id}")
// Add extra/static support if configured and exists. // Add extra/static support if configured and exists.
if extraFolder != "" { if extraFolder != "" {
@ -311,6 +311,12 @@ func runner(runtime phoenix.Runtime) error {
} }
} }
// Finally add websocket handler.
r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI, users))
// Simple room handler.
r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler))
// Map everything else to a room when it is a GET. // Map everything else to a room when it is a GET.
rooms := r.PathPrefix("/").Methods("GET").Subrouter() rooms := r.PathPrefix("/").Methods("GET").Subrouter()
rooms.HandleFunc("/{room:.*}", httputils.MakeGzipHandler(roomHandler)) rooms.HandleFunc("/{room:.*}", httputils.MakeGzipHandler(roomHandler))

Loading…
Cancel
Save