Browse Source

Merge pull request #272 from longsleep/pipeline-support

Pipeline and NATS virtual session support
release-0.25
Simon Eisenmann 9 years ago
parent
commit
9e187c0e95
  1. 2
      .gitignore
  2. 21
      go/channelling/api.go
  3. 29
      go/channelling/api/api.go
  4. 11
      go/channelling/api/api_test.go
  5. 6
      go/channelling/api/handle_chat.go
  6. 2
      go/channelling/api/handle_conference.go
  7. 2
      go/channelling/api/handle_self.go
  8. 41
      go/channelling/bus.go
  9. 228
      go/channelling/bus_manager.go
  10. 7
      go/channelling/client.go
  11. 36
      go/channelling/data.go
  12. 15
      go/channelling/hub.go
  13. 266
      go/channelling/pipeline.go
  14. 234
      go/channelling/pipeline_manager.go
  15. 4
      go/channelling/roomworker.go
  16. 98
      go/channelling/server/pipelines.go
  17. 37
      go/channelling/server/users.go
  18. 56
      go/channelling/session.go
  19. 70
      go/channelling/session_manager.go
  20. 26
      go/channelling/sessioncreator.go
  21. 37
      go/channelling/sink.go
  22. 11
      go/channelling/tickets.go
  23. 2
      go/channelling/unicaster.go
  24. 26
      go/channelling/userstore.go
  25. 3
      go/natsconnection/natsconnection.go
  26. 17
      src/app/spreed-webrtc-server/handler_ws.go
  27. 35
      src/app/spreed-webrtc-server/main.go

2
.gitignore vendored

@ -4,7 +4,7 @@
/src/github.com /src/github.com
/src/golang.struktur.de /src/golang.struktur.de
/*.pprof /*.pprof
/server.conf /server.conf*
/*.log /*.log
/changelog*.txt /changelog*.txt
/src/styles/.sass-cache /src/styles/.sass-cache

21
go/channelling/api.go

@ -26,3 +26,24 @@ type ChannellingAPI interface {
OnDisconnect(*Client, *Session) OnDisconnect(*Client, *Session)
OnIncoming(Sender, *Session, *DataIncoming) (interface{}, error) OnIncoming(Sender, *Session, *DataIncoming) (interface{}, error)
} }
type ChannellingAPIConsumer interface {
SetChannellingAPI(ChannellingAPI)
GetChannellingAPI() ChannellingAPI
}
type channellingAPIConsumer struct {
ChannellingAPI ChannellingAPI
}
func NewChannellingAPIConsumer() ChannellingAPIConsumer {
return &channellingAPIConsumer{}
}
func (c *channellingAPIConsumer) SetChannellingAPI(api ChannellingAPI) {
c.ChannellingAPI = api
}
func (c *channellingAPIConsumer) GetChannellingAPI() ChannellingAPI {
return c.ChannellingAPI
}

29
go/channelling/api/api.go

@ -41,6 +41,7 @@ type channellingAPI struct {
TurnDataCreator channelling.TurnDataCreator TurnDataCreator channelling.TurnDataCreator
Unicaster channelling.Unicaster Unicaster channelling.Unicaster
BusManager channelling.BusManager BusManager channelling.BusManager
PipelineManager channelling.PipelineManager
config *channelling.Config config *channelling.Config
} }
@ -55,7 +56,8 @@ func New(config *channelling.Config,
contactManager channelling.ContactManager, contactManager channelling.ContactManager,
turnDataCreator channelling.TurnDataCreator, turnDataCreator channelling.TurnDataCreator,
unicaster channelling.Unicaster, unicaster channelling.Unicaster,
busManager channelling.BusManager) channelling.ChannellingAPI { busManager channelling.BusManager,
pipelineManager channelling.PipelineManager) channelling.ChannellingAPI {
return &channellingAPI{ return &channellingAPI{
roomStatus, roomStatus,
sessionEncoder, sessionEncoder,
@ -65,6 +67,7 @@ func New(config *channelling.Config,
turnDataCreator, turnDataCreator,
unicaster, unicaster,
busManager, busManager,
pipelineManager,
config, config,
} }
} }
@ -73,17 +76,18 @@ func (api *channellingAPI) OnConnect(client *channelling.Client, session *channe
api.Unicaster.OnConnect(client, session) api.Unicaster.OnConnect(client, session)
self, err := api.HandleSelf(session) self, err := api.HandleSelf(session)
if err == nil { if err == nil {
api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil) api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil, nil)
} }
return self, err return self, err
} }
func (api *channellingAPI) OnDisconnect(client *channelling.Client, session *channelling.Session) { func (api *channellingAPI) OnDisconnect(client *channelling.Client, session *channelling.Session) {
api.Unicaster.OnDisconnect(client, session) api.Unicaster.OnDisconnect(client, session)
api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil) api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil, nil)
} }
func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming) (interface{}, error) { func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming) (interface{}, error) {
var pipeline *channelling.Pipeline
switch msg.Type { switch msg.Type {
case "Self": case "Self":
return api.HandleSelf(session) return api.HandleSelf(session)
@ -99,31 +103,34 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
break break
} }
if _, ok := msg.Offer.Offer["_token"]; !ok { if _, ok := msg.Offer.Offer["_token"]; !ok {
pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Offer.To)
// Trigger offer event when offer has no token, so this is // Trigger offer event when offer has no token, so this is
// not triggered for peerxfer and peerscreenshare offers. // not triggered for peerxfer and peerscreenshare offers.
api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil) api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil, pipeline)
} }
session.Unicast(msg.Offer.To, msg.Offer) session.Unicast(msg.Offer.To, msg.Offer, pipeline)
case "Candidate": case "Candidate":
if msg.Candidate == nil || msg.Candidate.Candidate == nil { if msg.Candidate == nil || msg.Candidate.Candidate == nil {
log.Println("Received invalid candidate message.", msg) log.Println("Received invalid candidate message.", msg)
break break
} }
session.Unicast(msg.Candidate.To, msg.Candidate) pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Candidate.To)
session.Unicast(msg.Candidate.To, msg.Candidate, pipeline)
case "Answer": case "Answer":
if msg.Answer == nil || msg.Answer.Answer == nil { if msg.Answer == nil || msg.Answer.Answer == nil {
log.Println("Received invalid answer message.", msg) log.Println("Received invalid answer message.", msg)
break break
} }
if _, ok := msg.Answer.Answer["_token"]; !ok { if _, ok := msg.Answer.Answer["_token"]; !ok {
pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Answer.To)
// Trigger answer event when answer has no token. so this is // Trigger answer event when answer has no token. so this is
// not triggered for peerxfer and peerscreenshare answers. // not triggered for peerxfer and peerscreenshare answers.
api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil) api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil, pipeline)
} }
session.Unicast(msg.Answer.To, msg.Answer) session.Unicast(msg.Answer.To, msg.Answer, pipeline)
case "Users": case "Users":
return api.HandleUsers(session) return api.HandleUsers(session)
case "Authentication": case "Authentication":
@ -137,9 +144,11 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
log.Println("Received invalid bye message.", msg) log.Println("Received invalid bye message.", msg)
break break
} }
api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil) pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Bye.To)
api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil, pipeline)
session.Unicast(msg.Bye.To, msg.Bye) session.Unicast(msg.Bye.To, msg.Bye, pipeline)
pipeline.Close()
case "Status": case "Status":
if msg.Status == nil { if msg.Status == nil {
log.Println("Received invalid status message.", msg) log.Println("Received invalid status message.", msg)

11
go/channelling/api/api_test.go

@ -35,6 +35,10 @@ import (
type fakeClient struct { type fakeClient struct {
} }
func (fake *fakeClient) Index() uint64 {
return 0
}
func (fake *fakeClient) Send(_ buffercache.Buffer) { func (fake *fakeClient) Send(_ buffercache.Buffer) {
} }
@ -79,11 +83,14 @@ func (fake *fakeRoomManager) MakeRoomID(roomName, roomType string) string {
} }
func NewTestChannellingAPI() (channelling.ChannellingAPI, *fakeClient, *channelling.Session, *fakeRoomManager) { func NewTestChannellingAPI() (channelling.ChannellingAPI, *fakeClient, *channelling.Session, *fakeRoomManager) {
apiConsumer := channelling.NewChannellingAPIConsumer()
client, roomManager := &fakeClient{}, &fakeRoomManager{} client, roomManager := &fakeClient{}, &fakeRoomManager{}
sessionNonces := securecookie.New(securecookie.GenerateRandomKey(64), nil) sessionNonces := securecookie.New(securecookie.GenerateRandomKey(64), nil)
session := channelling.NewSession(nil, nil, roomManager, roomManager, nil, sessionNonces, "", "") session := channelling.NewSession(nil, nil, roomManager, roomManager, nil, sessionNonces, "", "")
busManager := channelling.NewBusManager("", false, "") busManager := channelling.NewBusManager(apiConsumer, "", false, "")
return New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager), client, session, roomManager api := New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager, nil)
apiConsumer.SetChannellingAPI(api)
return api, client, session, roomManager
} }
func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) {

6
go/channelling/api/handle_chat.go

@ -34,7 +34,7 @@ func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channe
to := chat.To to := chat.To
if !msg.NoEcho { if !msg.NoEcho {
session.Unicast(session.Id, chat) session.Unicast(session.Id, chat, nil)
} }
msg.Time = time.Now().Format(time.RFC3339) msg.Time = time.Now().Format(time.RFC3339)
if to == "" { if to == "" {
@ -59,10 +59,10 @@ func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channe
api.StatsCounter.CountUnicastChat() api.StatsCounter.CountUnicastChat()
} }
session.Unicast(to, chat) session.Unicast(to, chat, nil)
if msg.Mid != "" { if msg.Mid != "" {
// Send out delivery confirmation status chat message. // Send out delivery confirmation status chat message.
session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}}) session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}}, nil)
} }
} }
} }

2
go/channelling/api/handle_conference.go

@ -37,7 +37,7 @@ func (api *channellingAPI) HandleConference(session *channelling.Session, confer
// Send conference update to anyone. // Send conference update to anyone.
for _, id := range conference.Conference { for _, id := range conference.Conference {
if id != session.Id { if id != session.Id {
session.Unicast(id, conference) session.Unicast(id, conference, nil)
} }
} }
} }

2
go/channelling/api/handle_self.go

@ -47,7 +47,7 @@ func (api *channellingAPI) HandleSelf(session *channelling.Session) (*channellin
Turn: api.TurnDataCreator.CreateTurnData(session), Turn: api.TurnDataCreator.CreateTurnData(session),
Stun: api.config.StunURIs, Stun: api.config.StunURIs,
} }
api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil) api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil, nil)
return self, nil return self, nil
} }

41
go/channelling/bus.go

@ -0,0 +1,41 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type SessionCreateRequest struct {
Id string
Session *DataSession
Room *DataRoom
SetAsDefault bool
}
type DataSink struct {
SubjectOut string `json:subject_out"`
SubjectIn string `json:subject_in"`
}
type DataSinkOutgoing struct {
Outgoing *DataOutgoing
ToUserid string
FromUserid string
Pipe string `json:",omitempty"`
}

228
go/channelling/bus_manager.go

@ -25,6 +25,10 @@ import (
"errors" "errors"
"fmt" "fmt"
"log" "log"
"sync"
"time"
"github.com/nats-io/nats"
"github.com/strukturag/spreed-webrtc/go/natsconnection" "github.com/strukturag/spreed-webrtc/go/natsconnection"
) )
@ -41,17 +45,27 @@ const (
// A BusManager provides the API to interact with a bus. // A BusManager provides the API to interact with a bus.
type BusManager interface { type BusManager interface {
Trigger(name, from, payload string, data interface{}) error ChannellingAPIConsumer
Start()
Publish(subject string, v interface{}) error
Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error
Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)
BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error)
BindSendChan(subject string, channel interface{}) error
PrefixSubject(string) string
CreateSink(string) Sink
} }
// A BusTrigger is a container to serialize trigger events // A BusTrigger is a container to serialize trigger events
// for the bus backend. // for the bus backend.
type BusTrigger struct { type BusTrigger struct {
Id string Id string
Name string Name string
From string From string
Payload string `json:",omitempty"` Payload string `json:",omitempty"`
Data interface{} `json:",omitempty"` Data interface{} `json:",omitempty"`
Pipeline string `json:",omitempty"`
} }
// BusSubjectTrigger returns the bus subject for trigger payloads. // BusSubjectTrigger returns the bus subject for trigger payloads.
@ -59,50 +73,77 @@ func BusSubjectTrigger(prefix, suffix string) string {
return fmt.Sprintf("%s.%s", prefix, suffix) return fmt.Sprintf("%s.%s", prefix, suffix)
} }
type busManager struct {
BusManager
}
// NewBusManager creates and initializes a new BusMager with the // NewBusManager creates and initializes a new BusMager with the
// provided flags for NATS support. It is intended to connect the // provided flags for NATS support. It is intended to connect the
// backend bus with a easy to use API to send and receive bus data. // backend bus with a easy to use API to send and receive bus data.
func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager { func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager {
var b BusManager var b BusManager
var err error var err error
if useNats { if useNats {
b, err = newNatsBus(id, subjectPrefix) b, err = newNatsBus(apiConsumer, id, subjectPrefix)
if err == nil { if err == nil {
log.Println("NATS bus connected") log.Println("NATS bus connected")
} else { } else {
log.Println("Error connecting NATS bus", err) log.Println("Error connecting NATS bus", err)
b = &noopBus{id} b = &noopBus{apiConsumer, id}
} }
} else { } else {
b = &noopBus{id} b = &noopBus{apiConsumer, id}
}
if err == nil {
b.Trigger(BusManagerStartup, id, "", nil)
} }
return &busManager{b} return b
} }
type noopBus struct { type noopBus struct {
ChannellingAPIConsumer
id string id string
} }
func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error { func (bus *noopBus) Start() {
// noop
}
func (bus *noopBus) Publish(subject string, v interface{}) error {
return nil
}
func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
return nil
}
func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error {
return nil
}
func (bus *noopBus) PrefixSubject(subject string) string {
return subject
}
func (bus *noopBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) {
return nil, nil
}
func (bus *noopBus) BindSendChan(subject string, channel interface{}) error {
return nil
}
func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {
return nil, nil
}
func (bus *noopBus) CreateSink(id string) Sink {
return nil return nil
} }
type natsBus struct { type natsBus struct {
ChannellingAPIConsumer
id string id string
prefix string prefix string
ec *natsconnection.EncodedConnection ec *natsconnection.EncodedConnection
triggerQueue chan *busQueueEntry triggerQueue chan *busQueueEntry
} }
func newNatsBus(id, prefix string) (*natsBus, error) { func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) {
ec, err := natsconnection.EstablishJSONEncodedConnection(nil) ec, err := natsconnection.EstablishJSONEncodedConnection(nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -112,34 +153,68 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
} }
// Create buffered channel for outbound NATS data. // Create buffered channel for outbound NATS data.
triggerQueue := make(chan *busQueueEntry, 50) triggerQueue := make(chan *busQueueEntry, 50)
return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil
}
func (bus *natsBus) Start() {
// Start go routine to process outbount NATS publishing. // Start go routine to process outbount NATS publishing.
go chPublish(ec, triggerQueue) go chPublish(bus.ec, bus.triggerQueue)
bus.Trigger(BusManagerStartup, bus.id, "", nil, nil)
}
return &natsBus{id, prefix, ec, triggerQueue}, nil func (bus *natsBus) Publish(subject string, v interface{}) error {
return bus.ec.Publish(subject, v)
} }
func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) { func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
if bus.ec != nil { return bus.ec.Request(subject, v, vPtr, timeout)
trigger := &BusTrigger{ }
Id: bus.id,
Name: name, func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) {
From: from, trigger := &BusTrigger{
Payload: payload, Id: bus.id,
Data: data, Name: name,
} From: from,
entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} Payload: payload,
select { Data: data,
case bus.triggerQueue <- entry: }
// sent ok if pipeline != nil {
default: trigger.Pipeline = pipeline.GetID()
log.Println("Failed to queue NATS event - queue full?") }
err = errors.New("NATS trigger queue full") entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger}
} select {
case bus.triggerQueue <- entry:
// sent ok
default:
log.Println("Failed to queue NATS event - queue full?")
err = errors.New("NATS trigger queue full")
} }
return err return err
} }
func (bus *natsBus) PrefixSubject(sub string) string {
return fmt.Sprintf("%s.%s", bus.prefix, sub)
}
func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {
return bus.ec.Subscribe(subject, cb)
}
func (bus *natsBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) {
return bus.ec.BindRecvChan(subject, channel)
}
func (bus *natsBus) BindSendChan(subject string, channel interface{}) error {
return bus.ec.BindSendChan(subject, channel)
}
func (bus *natsBus) CreateSink(id string) (sink Sink) {
sink = newNatsSink(bus, id)
return
}
type busQueueEntry struct { type busQueueEntry struct {
subject string subject string
data interface{} data interface{}
@ -154,3 +229,78 @@ func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntr
} }
} }
} }
type natsSink struct {
sync.RWMutex
id string
bm BusManager
closed bool
SubjectOut string
SubjectIn string
sub *nats.Subscription
sendQueue chan *DataSinkOutgoing
}
func newNatsSink(bm BusManager, id string) *natsSink {
sink := &natsSink{
id: id,
bm: bm,
SubjectOut: bm.PrefixSubject(fmt.Sprintf("sink.%s.out", id)),
SubjectIn: bm.PrefixSubject(fmt.Sprintf("sink.%s.in", id)),
}
sink.sendQueue = make(chan *DataSinkOutgoing, 100)
bm.BindSendChan(sink.SubjectOut, sink.sendQueue)
return sink
}
func (sink *natsSink) Write(outgoing *DataSinkOutgoing) (err error) {
if sink.Enabled() {
log.Println("Sending via NATS sink", sink.SubjectOut, outgoing)
sink.sendQueue <- outgoing
}
return err
}
func (sink *natsSink) Enabled() bool {
sink.RLock()
defer sink.RUnlock()
return sink.closed == false
}
func (sink *natsSink) Close() {
sink.Lock()
defer sink.Unlock()
if sink.sub != nil {
err := sink.sub.Unsubscribe()
if err != nil {
log.Println("Failed to unsubscribe NATS sink", err)
} else {
sink.sub = nil
}
}
sink.closed = true
}
func (sink *natsSink) Export() *DataSink {
return &DataSink{
SubjectOut: sink.SubjectOut,
SubjectIn: sink.SubjectIn,
}
}
func (sink *natsSink) BindRecvChan(channel interface{}) (*nats.Subscription, error) {
sink.Lock()
defer sink.Unlock()
if sink.sub != nil {
sink.sub.Unsubscribe()
sink.sub = nil
}
sub, err := sink.bm.BindRecvChan(sink.SubjectIn, channel)
if err != nil {
return nil, err
}
sink.sub = sub
return sub, nil
}

7
go/channelling/client.go

@ -28,14 +28,15 @@ import (
) )
type Sender interface { type Sender interface {
Index() uint64
Send(buffercache.Buffer) Send(buffercache.Buffer)
} }
type Client struct { type Client struct {
Codec
ChannellingAPI
Connection Connection
session *Session Codec
ChannellingAPI ChannellingAPI
session *Session
} }
func NewClient(codec Codec, api ChannellingAPI, session *Session) *Client { func NewClient(codec Codec, api ChannellingAPI, session *Session) *Client {

36
go/channelling/data.go

@ -182,27 +182,27 @@ type DataAutoCall struct {
type DataIncoming struct { type DataIncoming struct {
Type string Type string
Hello *DataHello Hello *DataHello `json:",omitempty"`
Offer *DataOffer Offer *DataOffer `json:",omitempty"`
Candidate *DataCandidate Candidate *DataCandidate `json:",omitempty"`
Answer *DataAnswer Answer *DataAnswer `json:",omitempty"`
Bye *DataBye Bye *DataBye `json:",omitempty"`
Status *DataStatus Status *DataStatus `json:",omitempty"`
Chat *DataChat Chat *DataChat `json:",omitempty"`
Conference *DataConference Conference *DataConference `json:",omitempty"`
Alive *DataAlive Alive *DataAlive `json:",omitempty"`
Authentication *DataAuthentication Authentication *DataAuthentication `json:",omitempty"`
Sessions *DataSessions Sessions *DataSessions `json:",omitempty"`
Room *DataRoom Room *DataRoom `json:",omitempty"`
Iid string `json:",omitempty"` Iid string `json:",omitempty"`
} }
type DataOutgoing struct { type DataOutgoing struct {
Data interface{} Data interface{} `json:",omitempty"`
From string From string `json:",omitempty"`
To string To string `json:",omitempty"`
Iid string `json:",omitempty"` Iid string `json:",omitempty"`
A string `json:",omitempty"` A string `json:",omitempty"`
} }
type DataSessions struct { type DataSessions struct {

15
go/channelling/hub.go

@ -155,13 +155,18 @@ func (h *hub) GetClient(sessionID string) (client *Client, ok bool) {
return return
} }
func (h *hub) Unicast(to string, outgoing *DataOutgoing) { func (h *hub) Unicast(to string, outgoing *DataOutgoing, pipeline *Pipeline) {
if message, err := h.EncodeOutgoing(outgoing); err == nil { client, ok := h.GetClient(to)
client, ok := h.GetClient(to) if pipeline != nil {
if !ok { if complete := pipeline.FlushOutgoing(h, client, to, outgoing); complete {
log.Println("Unicast To not found", to)
return return
} }
}
if !ok {
log.Println("Unicast To not found", to)
return
}
if message, err := h.EncodeOutgoing(outgoing); err == nil {
client.Send(message) client.Send(message)
message.Decref() message.Decref()
} }

266
go/channelling/pipeline.go

@ -0,0 +1,266 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
import (
"bytes"
"encoding/json"
"errors"
"log"
"sync"
"time"
"github.com/strukturag/spreed-webrtc/go/buffercache"
)
type PipelineFeedLine struct {
Seq int
Msg *DataOutgoing
}
type Pipeline struct {
PipelineManager PipelineManager
mutex sync.RWMutex
namespace string
id string
from *Session
to *Session
expires *time.Time
data []*DataSinkOutgoing
sink Sink
recvQueue chan *DataIncoming
closed bool
}
func NewPipeline(manager PipelineManager,
namespace string,
id string,
from *Session,
duration time.Duration) *Pipeline {
pipeline := &Pipeline{
PipelineManager: manager,
namespace: namespace,
id: id,
from: from,
recvQueue: make(chan *DataIncoming, 100),
}
go pipeline.receive()
pipeline.Refresh(duration)
return pipeline
}
func (pipeline *Pipeline) receive() {
// TODO(longsleep): Call to ToSession() should be avoided because it locks.
api := pipeline.PipelineManager.GetChannellingAPI()
for data := range pipeline.recvQueue {
_, err := api.OnIncoming(nil, pipeline.ToSession(), data)
if err != nil {
// TODO(longsleep): Handle reply and error.
log.Println("Pipeline receive incoming error", err)
}
}
log.Println("Pipeline receive done")
}
func (pipeline *Pipeline) GetID() string {
return pipeline.id
}
func (pipeline *Pipeline) Refresh(duration time.Duration) {
pipeline.mutex.Lock()
pipeline.refresh(duration)
pipeline.mutex.Unlock()
}
func (pipeline *Pipeline) refresh(duration time.Duration) {
expiration := time.Now().Add(duration)
pipeline.expires = &expiration
}
func (pipeline *Pipeline) Add(msg *DataSinkOutgoing) *Pipeline {
msg.Pipe = pipeline.id
pipeline.mutex.Lock()
pipeline.data = append(pipeline.data, msg)
pipeline.refresh(30 * time.Second)
pipeline.mutex.Unlock()
return pipeline
}
func (pipeline *Pipeline) Send(b buffercache.Buffer) {
// Noop.
}
func (pipeline *Pipeline) Index() uint64 {
return 0
}
func (pipeline *Pipeline) Close() {
pipeline.mutex.Lock()
if !pipeline.closed {
pipeline.expires = nil
if pipeline.sink != nil {
pipeline.sink = nil
}
close(pipeline.recvQueue)
pipeline.closed = true
log.Println("Closed pipeline")
}
pipeline.mutex.Unlock()
}
func (pipeline *Pipeline) Expired() bool {
var expired bool
pipeline.mutex.RLock()
if pipeline.expires == nil {
expired = true
} else {
expired = pipeline.expires.Before(time.Now())
}
pipeline.mutex.RUnlock()
return expired
}
func (pipeline *Pipeline) FromSession() *Session {
pipeline.mutex.RLock()
defer pipeline.mutex.RUnlock()
return pipeline.from
}
func (pipeline *Pipeline) ToSession() *Session {
pipeline.mutex.RLock()
defer pipeline.mutex.RUnlock()
return pipeline.to
}
func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
pipeline.mutex.RLock()
var lineRaw []byte
var line *PipelineFeedLine
var buffer bytes.Buffer
var err error
data := pipeline.data[since:]
count := 0
for seq, msg := range data {
line = &PipelineFeedLine{
Seq: seq + since,
Msg: msg.Outgoing,
}
lineRaw, err = json.Marshal(line)
if err != nil {
return nil, err
}
buffer.Write(lineRaw)
buffer.WriteString("\n")
count++
if limit > 0 && count >= limit {
break
}
}
pipeline.mutex.RUnlock()
return buffer.Bytes(), nil
}
func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool {
//log.Println("Flush outgoing via pipeline", to, client == nil)
if client == nil {
sinkOutgoing := &DataSinkOutgoing{
Outgoing: outgoing,
}
pipeline.mutex.Lock()
sink := pipeline.sink
toSession := pipeline.to
fromSession := pipeline.from
for {
if sink != nil && sink.Enabled() {
// Sink it.
pipeline.mutex.Unlock()
break
}
sink, toSession = pipeline.PipelineManager.FindSinkAndSession(to)
if sink != nil {
pipeline.to = toSession
err := pipeline.attach(sink)
if err == nil {
pipeline.mutex.Unlock()
// Create incoming receiver.
sink.BindRecvChan(pipeline.recvQueue)
// Sink it.
break
}
}
// Not pipelined, do nothing.
pipeline.mutex.Unlock()
break
}
if fromSession != nil {
sinkOutgoing.FromUserid = fromSession.Userid()
}
if toSession != nil {
sinkOutgoing.ToUserid = toSession.Userid()
}
pipeline.Add(sinkOutgoing)
if sink != nil {
// Pipelined, sink data.
sink.Write(sinkOutgoing)
return true
}
}
return false
}
func (pipeline *Pipeline) Attach(sink Sink) error {
pipeline.mutex.Lock()
defer pipeline.mutex.Unlock()
// Sink existing data first.
log.Println("Attach sink to pipeline", pipeline.id)
err := pipeline.attach(sink)
if err == nil {
for _, msg := range pipeline.data {
log.Println("Flushing pipeline to sink after attach", len(pipeline.data))
sink.Write(msg)
}
}
return err
}
func (pipeline *Pipeline) attach(sink Sink) error {
if pipeline.sink != nil {
return errors.New("pipeline already attached to sink")
}
pipeline.sink = sink
return nil
}

234
go/channelling/pipeline_manager.go

@ -0,0 +1,234 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
import (
"fmt"
"log"
"sync"
"time"
)
const (
PipelineNamespaceCall = "call"
)
type PipelineManager interface {
BusManager
SessionStore
UserStore
SessionCreator
GetPipelineByID(id string) (pipeline *Pipeline, ok bool)
GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline
FindSinkAndSession(to string) (Sink, *Session)
}
type pipelineManager struct {
BusManager
SessionStore
UserStore
SessionCreator
mutex sync.RWMutex
pipelineTable map[string]*Pipeline
sessionTable map[string]*Session
sessionByBusIDTable map[string]*Session
sessionSinkTable map[string]Sink
duration time.Duration
defaultSinkID string
}
func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager {
plm := &pipelineManager{
BusManager: busManager,
SessionStore: sessionStore,
UserStore: userStore,
SessionCreator: sessionCreator,
pipelineTable: make(map[string]*Pipeline),
sessionTable: make(map[string]*Session),
sessionByBusIDTable: make(map[string]*Session),
sessionSinkTable: make(map[string]Sink),
duration: 60 * time.Second,
}
plm.start()
plm.Subscribe("channelling.session.create", plm.sessionCreate)
plm.Subscribe("channelling.session.close", plm.sessionClose)
return plm
}
func (plm *pipelineManager) cleanup() {
plm.mutex.Lock()
for id, pipeline := range plm.pipelineTable {
if pipeline.Expired() {
pipeline.Close()
delete(plm.pipelineTable, id)
}
}
plm.mutex.Unlock()
}
func (plm *pipelineManager) start() {
c := time.Tick(30 * time.Second)
go func() {
for _ = range c {
plm.cleanup()
}
}()
}
func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCreateRequest) {
log.Println("sessionCreate via NATS", subject, reply, msg)
if msg.Session == nil || msg.Id == "" {
return
}
var sink Sink
plm.mutex.Lock()
session, ok := plm.sessionByBusIDTable[msg.Id]
if ok {
// Remove existing session with same ID.
delete(plm.sessionTable, session.Id)
sink, _ = plm.sessionSinkTable[session.Id]
delete(plm.sessionSinkTable, session.Id)
session.Close()
}
session = plm.CreateSession(nil, "")
plm.sessionByBusIDTable[msg.Id] = session
plm.sessionTable[session.Id] = session
if sink == nil {
sink = plm.CreateSink(msg.Id)
log.Println("Created NATS sink", msg.Id)
}
if reply != "" {
// Always reply with our sink data
plm.Publish(reply, sink.Export())
}
plm.sessionSinkTable[session.Id] = sink
if msg.SetAsDefault {
plm.defaultSinkID = session.Id
log.Println("Using NATS sink as default session", session.Id)
}
plm.mutex.Unlock()
if msg.Session.Status != nil {
session.Status = msg.Session.Status
}
if msg.Session.Userid != "" {
session.SetUseridFake(msg.Session.Userid)
}
if msg.Room != nil {
room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, nil)
log.Println("Joined NATS session to room", room, err)
}
session.BroadcastStatus()
}
func (plm *pipelineManager) sessionClose(subject, reply string, id string) {
log.Println("sessionClose via NATS", subject, reply, id)
if id == "" {
return
}
plm.mutex.Lock()
session, ok := plm.sessionByBusIDTable[id]
if ok {
delete(plm.sessionByBusIDTable, id)
delete(plm.sessionTable, session.Id)
if sink, ok := plm.sessionSinkTable[session.Id]; ok {
delete(plm.sessionSinkTable, session.Id)
sink.Close()
}
}
plm.mutex.Unlock()
if ok {
session.Close()
}
}
func (plm *pipelineManager) GetPipelineByID(id string) (*Pipeline, bool) {
plm.mutex.RLock()
pipeline, ok := plm.pipelineTable[id]
plm.mutex.RUnlock()
return pipeline, ok
}
func (plm *pipelineManager) PipelineID(namespace string, sender Sender, session *Session, to string) string {
return fmt.Sprintf("%s.%s.%s", namespace, session.Id, to)
}
func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline {
id := plm.PipelineID(namespace, sender, session, to)
plm.mutex.Lock()
pipeline, ok := plm.pipelineTable[id]
if ok {
// Refresh. We do not care if the pipeline is expired.
pipeline.Refresh(plm.duration)
plm.mutex.Unlock()
return pipeline
}
log.Println("Creating pipeline", namespace, id)
pipeline = NewPipeline(plm, namespace, id, session, plm.duration)
plm.pipelineTable[id] = pipeline
plm.mutex.Unlock()
return pipeline
}
func (plm *pipelineManager) FindSinkAndSession(to string) (sink Sink, session *Session) {
plm.mutex.RLock()
var found bool
if sink, found = plm.sessionSinkTable[to]; found {
session, _ = plm.sessionTable[to]
plm.mutex.RUnlock()
if sink.Enabled() {
log.Println("Pipeline sink found via manager", sink)
return sink, session
}
} else {
plm.mutex.RUnlock()
}
if plm.defaultSinkID != "" && to != plm.defaultSinkID {
// Keep target to while returning a the default sink.
log.Println("Find sink via default sink ID", plm.defaultSinkID)
sink, _ = plm.FindSinkAndSession(plm.defaultSinkID)
if sink != nil {
if session, found = plm.GetSession(to); found {
return
}
}
}
return nil, nil
}

4
go/channelling/roomworker.go

@ -225,8 +225,8 @@ func (r *roomWorker) Broadcast(sessionID string, message buffercache.Buffer) {
worker := func() { worker := func() {
r.mutex.RLock() r.mutex.RLock()
for id, user := range r.users { for id, user := range r.users {
if id == sessionID { if id == sessionID || user.Sender == nil {
// Skip broadcast to self. // Skip broadcast to self or non existing sender.
continue continue
} }
//fmt.Printf("%s\n", m.Message) //fmt.Printf("%s\n", m.Message)

98
go/channelling/server/pipelines.go

@ -0,0 +1,98 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package server
import (
"encoding/json"
"net/http"
"strconv"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/gorilla/mux"
)
type Pipelines struct {
channelling.PipelineManager
API channelling.ChannellingAPI
}
func (pipelines *Pipelines) Get(request *http.Request) (int, interface{}, http.Header) {
vars := mux.Vars(request)
id, ok := vars["id"]
if !ok {
return http.StatusNotFound, "", nil
}
pipeline, ok := pipelines.GetPipelineByID(id)
if !ok {
return http.StatusNotFound, "", nil
}
since := 0
limit := 0
if sinceParam := request.Form.Get("since"); sinceParam != "" {
since, _ = strconv.Atoi(sinceParam)
}
if limitParam := request.Form.Get("limit"); limitParam != "" {
limit, _ = strconv.Atoi(limitParam)
}
result, err := pipeline.JSONFeed(since, limit)
if err != nil {
return http.StatusInternalServerError, err.Error(), nil
}
return http.StatusOK, result, nil
}
func (pipelines *Pipelines) Post(request *http.Request) (int, interface{}, http.Header) {
vars := mux.Vars(request)
id, ok := vars["id"]
if !ok {
return http.StatusNotFound, "", nil
}
pipeline, ok := pipelines.GetPipelineByID(id)
if !ok {
return http.StatusNotFound, "", nil
}
var incoming channelling.DataIncoming
dec := json.NewDecoder(request.Body)
if err := dec.Decode(&incoming); err != nil {
return http.StatusBadRequest, err.Error(), nil
}
result := &channelling.DataOutgoing{
From: pipeline.FromSession().Id,
Iid: incoming.Iid,
}
reply, err := pipelines.API.OnIncoming(pipeline, pipeline.ToSession(), &incoming)
if err == nil {
result.Data = reply
} else {
result.Data = err
}
return http.StatusOK, result, nil
}

37
go/channelling/server/users.go

@ -293,41 +293,27 @@ func (un *UserNonce) Response() (int, interface{}, http.Header) {
} }
type Users struct { type Users struct {
channelling.SessionStore
channelling.SessionValidator channelling.SessionValidator
channelling.SessionManager channelling.SessionManager
channelling.SessionStore
realm string realm string
handler UsersHandler handler UsersHandler
} }
func NewUsers(sessionStore channelling.SessionStore, sessionValidator channelling.SessionValidator, sessionManager channelling.SessionManager, mode, realm string, runtime phoenix.Runtime) *Users { func NewUsers(sessionStore channelling.SessionStore, sessionValidator channelling.SessionValidator, sessionManager channelling.SessionManager, mode, realm string, runtime phoenix.Runtime) *Users {
var users = &Users{ var users = &Users{
sessionStore,
sessionValidator, sessionValidator,
sessionManager, sessionManager,
sessionStore,
realm, realm,
nil, nil,
} }
// Create handler based on mode.
var handler UsersHandler var handler UsersHandler
var err error var err error
// Create handler based on mode.
if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil { if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil {
users.handler = handler users.handler = handler
// Register handler Get.
sessionManager.RetrieveUsersWith(func(request *http.Request) (userid string, err error) {
userid, err = handler.Get(request)
if err != nil {
log.Printf("Failed to get userid from handler: %s", err)
} else {
if userid != "" {
log.Printf("Users handler get success: %s\n", userid)
}
}
return
})
log.Printf("Enabled users handler '%s'\n", mode) log.Printf("Enabled users handler '%s'\n", mode)
} else if err != nil { } else if err != nil {
log.Printf("Failed to enable handler '%s': %s\n", mode, err) log.Printf("Failed to enable handler '%s': %s\n", mode, err)
@ -422,6 +408,21 @@ func (users *Users) createHandler(mode string, runtime phoenix.Runtime) (handler
} }
func (users *Users) GetUserID(request *http.Request) (userid string, err error) {
if users.handler == nil {
return
}
userid, err = users.handler.Get(request)
if err != nil {
log.Printf("Failed to get userid from handler: %s", err)
} else {
if userid != "" {
log.Printf("Users handler get success: %s\n", userid)
}
}
return
}
// Post is used to create new userids for this server. // Post is used to create new userids for this server.
func (users *Users) Post(request *http.Request) (int, interface{}, http.Header) { func (users *Users) Post(request *http.Request) (int, interface{}, http.Header) {
@ -465,7 +466,7 @@ func (users *Users) Post(request *http.Request) (int, interface{}, http.Header)
nonce string nonce string
err error err error
) )
if session, ok := users.GetSession(snr.Id); ok { if session, ok := users.SessionStore.GetSession(snr.Id); ok {
nonce, err = session.Authorize(users.Realm(), &channelling.SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) nonce, err = session.Authorize(users.Realm(), &channelling.SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid})
} else { } else {
err = errors.New("no such session") err = errors.New("no such session")

56
go/channelling/session.go

@ -33,30 +33,30 @@ import (
var sessionNonces *securecookie.SecureCookie var sessionNonces *securecookie.SecureCookie
type Session struct { type Session struct {
SessionManager SessionManager SessionManager
Unicaster Unicaster Unicaster
Broadcaster Broadcaster Broadcaster
RoomStatusManager RoomStatusManager RoomStatusManager
buddyImages ImageCache buddyImages ImageCache
Id string Id string
Sid string Sid string
Ua string Ua string
UpdateRev uint64 UpdateRev uint64
Status interface{} Status interface{}
Nonce string Nonce string
Prio int Prio int
Hello bool Hello bool
Roomid string Roomid string
mutex sync.RWMutex mutex sync.RWMutex
userid string userid string
fake bool fake bool
stamp int64 stamp int64
attestation *SessionAttestation attestation *SessionAttestation
attestations *securecookie.SecureCookie attestations *securecookie.SecureCookie
subscriptions map[string]*Session subscriptions map[string]*Session
subscribers map[string]*Session subscribers map[string]*Session
disconnected bool disconnected bool
replaced bool replaced bool
} }
func NewSession(manager SessionManager, func NewSession(manager SessionManager,
@ -194,7 +194,7 @@ func (s *Session) BroadcastStatus() {
s.mutex.RUnlock() s.mutex.RUnlock()
} }
func (s *Session) Unicast(to string, m interface{}) { func (s *Session) Unicast(to string, m interface{}, pipeline *Pipeline) {
s.mutex.RLock() s.mutex.RLock()
outgoing := &DataOutgoing{ outgoing := &DataOutgoing{
From: s.Id, From: s.Id,
@ -204,7 +204,7 @@ func (s *Session) Unicast(to string, m interface{}) {
} }
s.mutex.RUnlock() s.mutex.RUnlock()
s.Unicaster.Unicast(to, outgoing) s.Unicaster.Unicast(to, outgoing, pipeline)
} }
func (s *Session) Close() { func (s *Session) Close() {
@ -235,12 +235,12 @@ func (s *Session) Close() {
} }
for _, session := range s.subscribers { for _, session := range s.subscribers {
s.Unicaster.Unicast(session.Id, outgoing) s.Unicaster.Unicast(session.Id, outgoing, nil)
} }
for _, session := range s.subscriptions { for _, session := range s.subscriptions {
session.RemoveSubscriber(s.Id) session.RemoveSubscriber(s.Id)
s.Unicaster.Unicast(session.Id, outgoing) s.Unicaster.Unicast(session.Id, outgoing, nil)
} }
s.SessionManager.DestroySession(s.Id, s.userid) s.SessionManager.DestroySession(s.Id, s.userid)

70
go/channelling/session_manager.go

@ -35,11 +35,13 @@ type UserStats interface {
type SessionManager interface { type SessionManager interface {
UserStats UserStats
RetrieveUsersWith(func(*http.Request) (string, error)) SessionStore
CreateSession(*http.Request) *Session UserStore
SessionCreator
DestroySession(sessionID, userID string) DestroySession(sessionID, userID string)
Authenticate(*Session, *SessionToken, string) error Authenticate(*Session, *SessionToken, string) error
GetUserSessions(session *Session, id string) []*DataSession GetUserSessions(session *Session, id string) []*DataSession
DecodeSessionToken(token string) (st *SessionToken)
} }
type sessionManager struct { type sessionManager struct {
@ -48,12 +50,13 @@ type sessionManager struct {
Unicaster Unicaster
Broadcaster Broadcaster
RoomStatusManager RoomStatusManager
buddyImages ImageCache buddyImages ImageCache
config *Config config *Config
userTable map[string]*User userTable map[string]*User
fakesessionTable map[string]*Session sessionTable map[string]*Session
useridRetriever func(*http.Request) (string, error) sessionByUserIDTable map[string]*Session
attestations *securecookie.SecureCookie useridRetriever func(*http.Request) (string, error)
attestations *securecookie.SecureCookie
} }
func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, sessionSecret []byte) SessionManager { func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, sessionSecret []byte) SessionManager {
@ -67,6 +70,7 @@ func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, bro
config, config,
make(map[string]*User), make(map[string]*User),
make(map[string]*Session), make(map[string]*Session),
make(map[string]*Session),
nil, nil,
nil, nil,
} }
@ -93,29 +97,31 @@ func (sessionManager *sessionManager) UserInfo(details bool) (userCount int, use
return return
} }
func (sessionManager *sessionManager) RetrieveUsersWith(retriever func(*http.Request) (string, error)) { // GetSession returns the client-less sessions created directly by sessionManager.
sessionManager.useridRetriever = retriever func (sessionManager *sessionManager) GetSession(id string) (*Session, bool) {
sessionManager.RLock()
defer sessionManager.RUnlock()
session, ok := sessionManager.sessionTable[id]
return session, ok
} }
func (sessionManager *sessionManager) CreateSession(request *http.Request) *Session { func (sessionManager *sessionManager) GetUser(id string) (*User, bool) {
request.ParseForm() sessionManager.RLock()
token := request.FormValue("t") defer sessionManager.RUnlock()
st := sessionManager.DecodeSessionToken(token)
user, ok := sessionManager.userTable[id]
var userid string return user, ok
if sessionManager.config.UsersEnabled { }
if sessionManager.useridRetriever != nil {
userid, _ = sessionManager.useridRetriever(request)
if userid == "" {
userid = st.Userid
}
}
}
func (sessionManager *sessionManager) CreateSession(st *SessionToken, userid string) *Session {
if st == nil {
st = sessionManager.DecodeSessionToken("")
}
session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
if userid != "" { if userid != "" {
// XXX(lcooper): Should errors be handled here? // Errors are ignored here, session is returned without userID when auth failed.
sessionManager.Authenticate(session, st, userid) sessionManager.Authenticate(session, st, userid)
} }
@ -128,10 +134,15 @@ func (sessionManager *sessionManager) DestroySession(sessionID, userID string) {
} }
sessionManager.Lock() sessionManager.Lock()
user, ok := sessionManager.userTable[userID] if user, ok := sessionManager.userTable[userID]; ok && user.RemoveSession(sessionID) {
if ok && user.RemoveSession(sessionID) {
delete(sessionManager.userTable, userID) delete(sessionManager.userTable, userID)
} }
if _, ok := sessionManager.sessionTable[sessionID]; ok {
delete(sessionManager.sessionTable, sessionID)
}
if session, ok := sessionManager.sessionByUserIDTable[userID]; ok && session.Id == sessionID {
delete(sessionManager.sessionByUserIDTable, sessionID)
}
sessionManager.Unlock() sessionManager.Unlock()
} }
@ -165,12 +176,13 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s
if !ok { if !ok {
// No user. Create fake session. // No user. Create fake session.
sessionManager.Lock() sessionManager.Lock()
session, ok := sessionManager.fakesessionTable[userid] session, ok := sessionManager.sessionByUserIDTable[userid]
if !ok { if !ok {
st := sessionManager.FakeSessionToken(userid) st := sessionManager.FakeSessionToken(userid)
session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
session.SetUseridFake(st.Userid) session.SetUseridFake(st.Userid)
sessionManager.fakesessionTable[userid] = session sessionManager.sessionByUserIDTable[userid] = session
sessionManager.sessionTable[session.Id] = session
} }
sessionManager.Unlock() sessionManager.Unlock()
users = make([]*DataSession, 1, 1) users = make([]*DataSession, 1, 1)

26
go/channelling/sessioncreator.go

@ -0,0 +1,26 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type SessionCreator interface {
CreateSession(st *SessionToken, userid string) *Session
}

37
go/channelling/sink.go

@ -0,0 +1,37 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
import (
"github.com/nats-io/nats"
)
// Sink connects a Pipeline with end points in both directions by
// getting attached to a Pipeline.
type Sink interface {
// Write sends outgoing data on the sink
Write(*DataSinkOutgoing) error
Enabled() bool
Close()
Export() *DataSink
BindRecvChan(channel interface{}) (*nats.Subscription, error)
}

11
go/channelling/tickets.go

@ -96,13 +96,12 @@ func (tickets *tickets) DecodeSessionToken(token string) (st *SessionToken) {
return return
} }
func (tickets *tickets) FakeSessionToken(userid string) *SessionToken { func (tickets *tickets) FakeSessionToken(userid string) (st *SessionToken) {
st := &SessionToken{} sid := fmt.Sprintf("fake-%s", randomstring.NewRandomString(27))
st.Sid = fmt.Sprintf("fake-%s", randomstring.NewRandomString(27)) id, _ := tickets.Encode("id", sid)
st.Id, _ = tickets.Encode("id", st.Sid) st = &SessionToken{Id: id, Sid: sid, Userid: userid}
st.Userid = userid
log.Println("Created new fake session id", st.Id) log.Println("Created new fake session id", st.Id)
return st return
} }
func (tickets *tickets) ValidateSession(id, sid string) bool { func (tickets *tickets) ValidateSession(id, sid string) bool {

2
go/channelling/unicaster.go

@ -25,5 +25,5 @@ type Unicaster interface {
SessionStore SessionStore
OnConnect(*Client, *Session) OnConnect(*Client, *Session)
OnDisconnect(*Client, *Session) OnDisconnect(*Client, *Session)
Unicast(to string, outgoing *DataOutgoing) Unicast(to string, outgoing *DataOutgoing, pipeline *Pipeline)
} }

26
go/channelling/userstore.go

@ -0,0 +1,26 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type UserStore interface {
GetUser(id string) (user *User, ok bool)
}

3
go/natsconnection/natsconnection.go

@ -48,6 +48,9 @@ func NewConnection() (*Connection, error) {
ReconnectedCB: func(conn *nats.Conn) { ReconnectedCB: func(conn *nats.Conn) {
log.Println("NATS reconnected") log.Println("NATS reconnected")
}, },
AsyncErrorCB: func(conn *nats.Conn, sub *nats.Subscription, err error) {
log.Println("NATS async error", sub, err)
},
} }
nc, err := opts.Connect() nc, err := opts.Connect()

17
src/app/spreed-webrtc-server/handler_ws.go

@ -26,6 +26,7 @@ import (
"net/http" "net/http"
"github.com/strukturag/spreed-webrtc/go/channelling" "github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/strukturag/spreed-webrtc/go/channelling/server"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -52,7 +53,7 @@ var (
} }
) )
func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI) http.HandlerFunc { func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI, users *server.Users) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// Validate incoming request. // Validate incoming request.
if r.Method != "GET" { if r.Method != "GET" {
@ -69,8 +70,20 @@ func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManag
return return
} }
r.ParseForm()
token := r.FormValue("t")
st := sessionManager.DecodeSessionToken(token)
var userid string
if users != nil {
userid, _ = users.GetUserID(r)
if userid == "" {
userid = st.Userid
}
}
// Create a new connection instance. // Create a new connection instance.
session := sessionManager.CreateSession(r) session := sessionManager.CreateSession(st, userid)
client := channelling.NewClient(codec, channellingAPI, session) client := channelling.NewClient(codec, channellingAPI, session)
conn := channelling.NewConnection(connectionCounter.CountConnection(), ws, client) conn := channelling.NewConnection(connectionCounter.CountConnection(), ws, client)

35
src/app/spreed-webrtc-server/main.go

@ -80,6 +80,11 @@ func runner(runtime phoenix.Runtime) error {
statsEnabled = false statsEnabled = false
} }
pipelinesEnabled, err := runtime.GetBool("http", "pipelines")
if err != nil {
pipelinesEnabled = false
}
pprofListen, err := runtime.GetString("http", "pprofListen") pprofListen, err := runtime.GetString("http", "pprofListen")
if err == nil && pprofListen != "" { if err == nil && pprofListen != "" {
log.Printf("Starting pprof HTTP server on %s", pprofListen) log.Printf("Starting pprof HTTP server on %s", pprofListen)
@ -258,6 +263,7 @@ func runner(runtime phoenix.Runtime) error {
} }
// Prepare services. // Prepare services.
apiConsumer := channelling.NewChannellingAPIConsumer()
buddyImages := channelling.NewImageCache() buddyImages := channelling.NewImageCache()
codec := channelling.NewCodec(incomingCodecLimit) codec := channelling.NewCodec(incomingCodecLimit)
roomManager := channelling.NewRoomManager(config, codec) roomManager := channelling.NewRoomManager(config, codec)
@ -265,8 +271,15 @@ func runner(runtime phoenix.Runtime) error {
tickets := channelling.NewTickets(sessionSecret, encryptionSecret, computedRealm) tickets := channelling.NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager) statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager)
busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) busManager := channelling.NewBusManager(apiConsumer, natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager) pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager)
// Create API.
channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)
apiConsumer.SetChannellingAPI(channellingAPI)
// Start bus.
busManager.Start()
// Add handlers. // Add handlers.
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
@ -274,12 +287,8 @@ func runner(runtime phoenix.Runtime) error {
r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder)))) r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder))))
r.Handle("/robots.txt", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static"))))) r.Handle("/robots.txt", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static")))))
r.Handle("/favicon.ico", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img"))))) r.Handle("/favicon.ico", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img")))))
r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI))
r.HandleFunc("/.well-known/spreed-configuration", wellKnownHandler) r.HandleFunc("/.well-known/spreed-configuration", wellKnownHandler)
// Simple room handler.
r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler))
// Sandbox handler. // Sandbox handler.
r.HandleFunc("/sandbox/{origin_scheme}/{origin_host}/{sandbox}.html", httputils.MakeGzipHandler(sandboxHandler)) r.HandleFunc("/sandbox/{origin_scheme}/{origin_host}/{sandbox}.html", httputils.MakeGzipHandler(sandboxHandler))
@ -289,9 +298,11 @@ func runner(runtime phoenix.Runtime) error {
rest.AddResource(&server.Rooms{}, "/rooms") rest.AddResource(&server.Rooms{}, "/rooms")
rest.AddResource(config, "/config") rest.AddResource(config, "/config")
rest.AddResourceWithWrapper(&server.Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens") rest.AddResourceWithWrapper(&server.Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens")
var users *server.Users
if config.UsersEnabled { if config.UsersEnabled {
// Create Users handler. // Create Users handler.
users := server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime) users = server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime)
rest.AddResource(&server.Sessions{tickets, hub, users}, "/sessions/{id}/") rest.AddResource(&server.Sessions{tickets, hub, users}, "/sessions/{id}/")
if config.UsersAllowRegistration { if config.UsersAllowRegistration {
rest.AddResource(users, "/users") rest.AddResource(users, "/users")
@ -301,6 +312,10 @@ func runner(runtime phoenix.Runtime) error {
rest.AddResourceWithWrapper(&server.Stats{statsManager}, httputils.MakeGzipHandler, "/stats") rest.AddResourceWithWrapper(&server.Stats{statsManager}, httputils.MakeGzipHandler, "/stats")
log.Println("Stats are enabled!") log.Println("Stats are enabled!")
} }
if pipelinesEnabled {
rest.AddResource(&server.Pipelines{pipelineManager, channellingAPI}, "/pipelines/{id}")
log.Println("Pipelines are enabled!")
}
// Add extra/static support if configured and exists. // Add extra/static support if configured and exists.
if extraFolder != "" { if extraFolder != "" {
@ -311,6 +326,12 @@ func runner(runtime phoenix.Runtime) error {
} }
} }
// Finally add websocket handler.
r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI, users))
// Simple room handler.
r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler))
// Map everything else to a room when it is a GET. // Map everything else to a room when it is a GET.
rooms := r.PathPrefix("/").Methods("GET").Subrouter() rooms := r.PathPrefix("/").Methods("GET").Subrouter()
rooms.HandleFunc("/{room:.*}", httputils.MakeGzipHandler(roomHandler)) rooms.HandleFunc("/{room:.*}", httputils.MakeGzipHandler(roomHandler))

Loading…
Cancel
Save