diff --git a/go/channelling/api/api.go b/go/channelling/api/api.go
index 0c34ab9e..d7a9f244 100644
--- a/go/channelling/api/api.go
+++ b/go/channelling/api/api.go
@@ -41,6 +41,7 @@ type channellingAPI struct {
TurnDataCreator channelling.TurnDataCreator
Unicaster channelling.Unicaster
BusManager channelling.BusManager
+ PipelineManager channelling.PipelineManager
config *channelling.Config
}
@@ -55,7 +56,8 @@ func New(config *channelling.Config,
contactManager channelling.ContactManager,
turnDataCreator channelling.TurnDataCreator,
unicaster channelling.Unicaster,
- busManager channelling.BusManager) channelling.ChannellingAPI {
+ busManager channelling.BusManager,
+ pipelineManager channelling.PipelineManager) channelling.ChannellingAPI {
return &channellingAPI{
roomStatus,
sessionEncoder,
@@ -65,6 +67,7 @@ func New(config *channelling.Config,
turnDataCreator,
unicaster,
busManager,
+ pipelineManager,
config,
}
}
@@ -73,17 +76,18 @@ func (api *channellingAPI) OnConnect(client *channelling.Client, session *channe
api.Unicaster.OnConnect(client, session)
self, err := api.HandleSelf(session)
if err == nil {
- api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil)
+ api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil, nil)
}
return self, err
}
func (api *channellingAPI) OnDisconnect(client *channelling.Client, session *channelling.Session) {
api.Unicaster.OnDisconnect(client, session)
- api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil)
+ api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil, nil)
}
func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming) (interface{}, error) {
+ var pipeline *channelling.Pipeline
switch msg.Type {
case "Self":
return api.HandleSelf(session)
@@ -99,19 +103,21 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
break
}
if _, ok := msg.Offer.Offer["_token"]; !ok {
+ pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Offer.To)
// Trigger offer event when offer has no token, so this is
// not triggered for peerxfer and peerscreenshare offers.
- api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil)
+ api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil, pipeline)
}
- session.Unicast(msg.Offer.To, msg.Offer)
+ session.Unicast(msg.Offer.To, msg.Offer, pipeline)
case "Candidate":
if msg.Candidate == nil || msg.Candidate.Candidate == nil {
log.Println("Received invalid candidate message.", msg)
break
}
- session.Unicast(msg.Candidate.To, msg.Candidate)
+ pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Candidate.To)
+ session.Unicast(msg.Candidate.To, msg.Candidate, pipeline)
case "Answer":
if msg.Answer == nil || msg.Answer.Answer == nil {
log.Println("Received invalid answer message.", msg)
@@ -120,10 +126,10 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
if _, ok := msg.Answer.Answer["_token"]; !ok {
// Trigger answer event when answer has no token. so this is
// not triggered for peerxfer and peerscreenshare answers.
- api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil)
+ api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil, pipeline)
}
- session.Unicast(msg.Answer.To, msg.Answer)
+ session.Unicast(msg.Answer.To, msg.Answer, pipeline)
case "Users":
return api.HandleUsers(session)
case "Authentication":
@@ -137,9 +143,11 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
log.Println("Received invalid bye message.", msg)
break
}
- api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil)
+ pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Bye.To)
+ api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil, pipeline)
- session.Unicast(msg.Bye.To, msg.Bye)
+ session.Unicast(msg.Bye.To, msg.Bye, pipeline)
+ pipeline.Close()
case "Status":
if msg.Status == nil {
log.Println("Received invalid status message.", msg)
diff --git a/go/channelling/api/api_test.go b/go/channelling/api/api_test.go
index 7f3bea0d..7e0ccb23 100644
--- a/go/channelling/api/api_test.go
+++ b/go/channelling/api/api_test.go
@@ -35,6 +35,10 @@ import (
type fakeClient struct {
}
+func (fake *fakeClient) Index() uint64 {
+ return 0
+}
+
func (fake *fakeClient) Send(_ buffercache.Buffer) {
}
@@ -83,7 +87,7 @@ func NewTestChannellingAPI() (channelling.ChannellingAPI, *fakeClient, *channell
sessionNonces := securecookie.New(securecookie.GenerateRandomKey(64), nil)
session := channelling.NewSession(nil, nil, roomManager, roomManager, nil, sessionNonces, "", "")
busManager := channelling.NewBusManager("", false, "")
- return New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager), client, session, roomManager
+ return New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager, nil), client, session, roomManager
}
func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) {
diff --git a/go/channelling/api/handle_chat.go b/go/channelling/api/handle_chat.go
index b1782cd8..de4eee1c 100644
--- a/go/channelling/api/handle_chat.go
+++ b/go/channelling/api/handle_chat.go
@@ -34,7 +34,7 @@ func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channe
to := chat.To
if !msg.NoEcho {
- session.Unicast(session.Id, chat)
+ session.Unicast(session.Id, chat, nil)
}
msg.Time = time.Now().Format(time.RFC3339)
if to == "" {
@@ -59,10 +59,10 @@ func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channe
api.StatsCounter.CountUnicastChat()
}
- session.Unicast(to, chat)
+ session.Unicast(to, chat, nil)
if msg.Mid != "" {
// Send out delivery confirmation status chat message.
- session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}})
+ session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}}, nil)
}
}
}
diff --git a/go/channelling/api/handle_conference.go b/go/channelling/api/handle_conference.go
index ed961dfb..f913d1d6 100644
--- a/go/channelling/api/handle_conference.go
+++ b/go/channelling/api/handle_conference.go
@@ -37,7 +37,7 @@ func (api *channellingAPI) HandleConference(session *channelling.Session, confer
// Send conference update to anyone.
for _, id := range conference.Conference {
if id != session.Id {
- session.Unicast(id, conference)
+ session.Unicast(id, conference, nil)
}
}
}
diff --git a/go/channelling/api/handle_self.go b/go/channelling/api/handle_self.go
index 504e5557..1c5e118d 100644
--- a/go/channelling/api/handle_self.go
+++ b/go/channelling/api/handle_self.go
@@ -47,7 +47,7 @@ func (api *channellingAPI) HandleSelf(session *channelling.Session) (*channellin
Turn: api.TurnDataCreator.CreateTurnData(session),
Stun: api.config.StunURIs,
}
- api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil)
+ api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil, nil)
return self, nil
}
diff --git a/go/channelling/bus_manager.go b/go/channelling/bus_manager.go
index 50b24fec..ec99c60f 100644
--- a/go/channelling/bus_manager.go
+++ b/go/channelling/bus_manager.go
@@ -25,7 +25,9 @@ import (
"errors"
"fmt"
"log"
+ "time"
+ "github.com/nats-io/nats"
"github.com/strukturag/spreed-webrtc/go/natsconnection"
)
@@ -41,17 +43,21 @@ const (
// A BusManager provides the API to interact with a bus.
type BusManager interface {
- Trigger(name, from, payload string, data interface{}) error
+ Publish(subject string, v interface{}) error
+ Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error
+ Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error
+ Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error)
}
// A BusTrigger is a container to serialize trigger events
// for the bus backend.
type BusTrigger struct {
- Id string
- Name string
- From string
- Payload string `json:",omitempty"`
- Data interface{} `json:",omitempty"`
+ Id string
+ Name string
+ From string
+ Payload string `json:",omitempty"`
+ Data interface{} `json:",omitempty"`
+ Pipeline string `json:",omitempty"`
}
// BusSubjectTrigger returns the bus subject for trigger payloads.
@@ -81,7 +87,7 @@ func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager {
b = &noopBus{id}
}
if err == nil {
- b.Trigger(BusManagerStartup, id, "", nil)
+ b.Trigger(BusManagerStartup, id, "", nil, nil)
}
return &busManager{b}
@@ -91,10 +97,22 @@ type noopBus struct {
id string
}
-func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error {
+func (bus *noopBus) Publish(subject string, v interface{}) error {
return nil
}
+func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
+ return nil
+}
+
+func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error {
+ return nil
+}
+
+func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {
+ return nil, nil
+}
+
type natsBus struct {
id string
prefix string
@@ -118,28 +136,41 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
return &natsBus{id, prefix, ec, triggerQueue}, nil
}
-func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) {
- if bus.ec != nil {
- trigger := &BusTrigger{
- Id: bus.id,
- Name: name,
- From: from,
- Payload: payload,
- Data: data,
- }
- entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger}
- select {
- case bus.triggerQueue <- entry:
- // sent ok
- default:
- log.Println("Failed to queue NATS event - queue full?")
- err = errors.New("NATS trigger queue full")
- }
+func (bus *natsBus) Publish(subject string, v interface{}) error {
+ return bus.ec.Publish(subject, v)
+}
+
+func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error {
+ return bus.ec.Request(subject, v, vPtr, timeout)
+}
+
+func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) {
+ trigger := &BusTrigger{
+ Id: bus.id,
+ Name: name,
+ From: from,
+ Payload: payload,
+ Data: data,
+ }
+ if pipeline != nil {
+ trigger.Pipeline = pipeline.GetID()
+ }
+ entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger}
+ select {
+ case bus.triggerQueue <- entry:
+ // sent ok
+ default:
+ log.Println("Failed to queue NATS event - queue full?")
+ err = errors.New("NATS trigger queue full")
}
return err
}
+func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) {
+ return bus.ec.Subscribe(subject, cb)
+}
+
type busQueueEntry struct {
subject string
data interface{}
diff --git a/go/channelling/client.go b/go/channelling/client.go
index 3310dcd3..f845bb37 100644
--- a/go/channelling/client.go
+++ b/go/channelling/client.go
@@ -28,14 +28,15 @@ import (
)
type Sender interface {
+ Index() uint64
Send(buffercache.Buffer)
}
type Client struct {
- Codec
- ChannellingAPI
Connection
- session *Session
+ Codec
+ ChannellingAPI ChannellingAPI
+ session *Session
}
func NewClient(codec Codec, api ChannellingAPI, session *Session) *Client {
diff --git a/go/channelling/data.go b/go/channelling/data.go
index 8456a839..960231b7 100644
--- a/go/channelling/data.go
+++ b/go/channelling/data.go
@@ -182,27 +182,27 @@ type DataAutoCall struct {
type DataIncoming struct {
Type string
- Hello *DataHello
- Offer *DataOffer
- Candidate *DataCandidate
- Answer *DataAnswer
- Bye *DataBye
- Status *DataStatus
- Chat *DataChat
- Conference *DataConference
- Alive *DataAlive
- Authentication *DataAuthentication
- Sessions *DataSessions
- Room *DataRoom
- Iid string `json:",omitempty"`
+ Hello *DataHello `json:",omitempty"`
+ Offer *DataOffer `json:",omitempty"`
+ Candidate *DataCandidate `json:",omitempty"`
+ Answer *DataAnswer `json:",omitempty"`
+ Bye *DataBye `json:",omitempty"`
+ Status *DataStatus `json:",omitempty"`
+ Chat *DataChat `json:",omitempty"`
+ Conference *DataConference `json:",omitempty"`
+ Alive *DataAlive `json:",omitempty"`
+ Authentication *DataAuthentication `json:",omitempty"`
+ Sessions *DataSessions `json:",omitempty"`
+ Room *DataRoom `json:",omitempty"`
+ Iid string `json:",omitempty"`
}
type DataOutgoing struct {
- Data interface{}
- From string
- To string
- Iid string `json:",omitempty"`
- A string `json:",omitempty"`
+ Data interface{} `json:",omitempty"`
+ From string `json:",omitempty"`
+ To string `json:",omitempty"`
+ Iid string `json:",omitempty"`
+ A string `json:",omitempty"`
}
type DataSessions struct {
diff --git a/go/channelling/hub.go b/go/channelling/hub.go
index c06088cd..057fcb60 100644
--- a/go/channelling/hub.go
+++ b/go/channelling/hub.go
@@ -155,13 +155,18 @@ func (h *hub) GetClient(sessionID string) (client *Client, ok bool) {
return
}
-func (h *hub) Unicast(to string, outgoing *DataOutgoing) {
- if message, err := h.EncodeOutgoing(outgoing); err == nil {
- client, ok := h.GetClient(to)
- if !ok {
- log.Println("Unicast To not found", to)
+func (h *hub) Unicast(to string, outgoing *DataOutgoing, pipeline *Pipeline) {
+ client, ok := h.GetClient(to)
+ if pipeline != nil {
+ if complete := pipeline.FlushOutgoing(h, client, to, outgoing); complete {
return
}
+ }
+ if !ok {
+ log.Println("Unicast To not found", to)
+ return
+ }
+ if message, err := h.EncodeOutgoing(outgoing); err == nil {
client.Send(message)
message.Decref()
}
diff --git a/go/channelling/pipeline.go b/go/channelling/pipeline.go
new file mode 100644
index 00000000..611849ba
--- /dev/null
+++ b/go/channelling/pipeline.go
@@ -0,0 +1,190 @@
+/*
+ * Spreed WebRTC.
+ * Copyright (C) 2013-2015 struktur AG
+ *
+ * This file is part of Spreed WebRTC.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package channelling
+
+import (
+ "bytes"
+ "encoding/json"
+ "errors"
+ "log"
+ "sync"
+ "time"
+
+ "github.com/strukturag/spreed-webrtc/go/buffercache"
+)
+
+type PipelineFeedLine struct {
+ Seq int
+ Msg *DataOutgoing
+}
+
+type Pipeline struct {
+ PipelineManager PipelineManager
+ mutex sync.RWMutex
+ namespace string
+ id string
+ from *Session
+ expires *time.Time
+ data []*DataOutgoing
+ sink Sink
+}
+
+func NewPipeline(manager PipelineManager,
+ namespace string,
+ id string,
+ from *Session,
+ duration time.Duration) *Pipeline {
+ pipeline := &Pipeline{
+ PipelineManager: manager,
+ namespace: namespace,
+ id: id,
+ from: from,
+ }
+ pipeline.Refresh(duration)
+ return pipeline
+}
+
+func (pipeline *Pipeline) GetID() string {
+ return pipeline.id
+}
+
+func (pipeline *Pipeline) Refresh(duration time.Duration) {
+ pipeline.mutex.Lock()
+ expiration := time.Now().Add(duration)
+ pipeline.expires = &expiration
+ pipeline.mutex.Unlock()
+}
+
+func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline {
+ pipeline.mutex.Lock()
+ pipeline.data = append(pipeline.data, msg)
+ pipeline.mutex.Unlock()
+
+ return pipeline
+}
+
+func (pipeline *Pipeline) Send(b buffercache.Buffer) {
+ // noop for now
+}
+
+func (pipeline *Pipeline) Index() uint64 {
+ return 0
+}
+
+func (pipeline *Pipeline) Close() {
+ pipeline.mutex.Lock()
+ pipeline.expires = nil
+ if pipeline.sink != nil {
+ pipeline.sink.Close()
+ pipeline.sink = nil
+ }
+ pipeline.mutex.Unlock()
+}
+
+func (pipeline *Pipeline) Expired() bool {
+ var expired bool
+ pipeline.mutex.RLock()
+ if pipeline.expires == nil {
+ expired = true
+ } else {
+ expired = pipeline.expires.Before(time.Now())
+ }
+ pipeline.mutex.RUnlock()
+
+ return expired
+}
+
+func (pipeline *Pipeline) Session() *Session {
+ return pipeline.from
+}
+
+func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
+ pipeline.mutex.RLock()
+ var lineRaw []byte
+ var line *PipelineFeedLine
+ var buffer bytes.Buffer
+ var err error
+ data := pipeline.data[since:]
+ count := 0
+ for seq, msg := range data {
+ line = &PipelineFeedLine{
+ Seq: seq + since,
+ Msg: msg,
+ }
+ lineRaw, err = json.Marshal(line)
+ if err != nil {
+ return nil, err
+ }
+ buffer.Write(lineRaw)
+ buffer.WriteString("\n")
+
+ count++
+ if limit > 0 && count >= limit {
+ break
+ }
+ }
+ pipeline.mutex.RUnlock()
+
+ return buffer.Bytes(), nil
+}
+
+func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool {
+ pipeline.mutex.RLock()
+ log.Println("Flush outgoing via pipeline", to, client == nil)
+ if client == nil {
+ sink := pipeline.sink
+ pipeline.mutex.RUnlock()
+ pipeline.Add(outgoing)
+
+ // It is possible to retrieve the userid for fake sessions here.
+ if session, found := pipeline.PipelineManager.GetSession(to); found {
+ log.Println("Pipeline found userid via manager", session.Userid())
+ }
+
+ if sink == nil {
+ return true
+ }
+ // Sink it.
+ pipeline.sink.Write(outgoing)
+ return true
+ }
+ pipeline.mutex.RUnlock()
+
+ return false
+}
+
+func (pipeline *Pipeline) Attach(sink Sink) error {
+ pipeline.mutex.Lock()
+ defer pipeline.mutex.Unlock()
+ if pipeline.sink != nil {
+ return errors.New("pipeline already attached to sink")
+ }
+ pipeline.sink = sink
+
+ // Sink existing data first.
+ log.Println("Attach sink to pipeline", pipeline.id)
+ for _, msg := range pipeline.data {
+ sink.Write(msg)
+ }
+
+ return nil
+}
diff --git a/go/channelling/pipeline_manager.go b/go/channelling/pipeline_manager.go
new file mode 100644
index 00000000..00f17539
--- /dev/null
+++ b/go/channelling/pipeline_manager.go
@@ -0,0 +1,120 @@
+/*
+ * Spreed WebRTC.
+ * Copyright (C) 2013-2015 struktur AG
+ *
+ * This file is part of Spreed WebRTC.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package channelling
+
+import (
+ "fmt"
+ "log"
+ "sync"
+ "time"
+)
+
+const (
+ PipelineNamespaceCall = "call"
+)
+
+type PipelineManager interface {
+ BusManager
+ SessionStore
+ UserStore
+ GetPipelineByID(id string) (pipeline *Pipeline, ok bool)
+ GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline
+}
+
+type pipelineManager struct {
+ BusManager
+ SessionStore
+ UserStore
+ mutex sync.RWMutex
+ pipelines map[string]*Pipeline
+ duration time.Duration
+}
+
+func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore) PipelineManager {
+ plm := &pipelineManager{
+ BusManager: busManager,
+ SessionStore: sessionStore,
+ UserStore: userStore,
+ pipelines: make(map[string]*Pipeline),
+ duration: 30 * time.Minute,
+ }
+ plm.start()
+ return plm
+}
+
+func (plm *pipelineManager) cleanup() {
+ plm.mutex.Lock()
+ for id, pipeline := range plm.pipelines {
+ if pipeline.Expired() {
+ pipeline.Close()
+ delete(plm.pipelines, id)
+ }
+ }
+ plm.mutex.Unlock()
+}
+
+func (plm *pipelineManager) start() {
+ c := time.Tick(30 * time.Second)
+ go func() {
+ for _ = range c {
+ plm.cleanup()
+ }
+ }()
+}
+
+func (plm *pipelineManager) GetPipelineByID(id string) (*Pipeline, bool) {
+ plm.mutex.RLock()
+ pipeline, ok := plm.pipelines[id]
+ if !ok {
+ // XXX(longsleep): Hack for development
+ for _, pipeline = range plm.pipelines {
+ ok = true
+ break
+ }
+ }
+ plm.mutex.RUnlock()
+ return pipeline, ok
+}
+
+func (plm *pipelineManager) PipelineID(namespace string, sender Sender, session *Session, to string) string {
+ return fmt.Sprintf("%s.%s.%s", namespace, session.Id, to)
+}
+
+func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline {
+ id := plm.PipelineID(namespace, sender, session, to)
+
+ plm.mutex.Lock()
+ pipeline, ok := plm.pipelines[id]
+ if ok {
+ // Refresh. We do not care if the pipeline is expired.
+ pipeline.Refresh(plm.duration)
+ plm.mutex.Unlock()
+ return pipeline
+ }
+
+ log.Println("Creating pipeline", namespace, id)
+ pipeline = NewPipeline(plm, namespace, id, session, plm.duration)
+ plm.pipelines[id] = pipeline
+ plm.mutex.Unlock()
+
+ return pipeline
+}
diff --git a/go/channelling/server/pipelines.go b/go/channelling/server/pipelines.go
new file mode 100644
index 00000000..ad590e26
--- /dev/null
+++ b/go/channelling/server/pipelines.go
@@ -0,0 +1,98 @@
+/*
+ * Spreed WebRTC.
+ * Copyright (C) 2013-2015 struktur AG
+ *
+ * This file is part of Spreed WebRTC.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package server
+
+import (
+ "encoding/json"
+ "net/http"
+ "strconv"
+
+ "github.com/strukturag/spreed-webrtc/go/channelling"
+
+ "github.com/gorilla/mux"
+)
+
+type Pipelines struct {
+ channelling.PipelineManager
+ API channelling.ChannellingAPI
+}
+
+func (pipelines *Pipelines) Get(request *http.Request) (int, interface{}, http.Header) {
+ vars := mux.Vars(request)
+ id, ok := vars["id"]
+ if !ok {
+ return http.StatusNotFound, "", nil
+ }
+
+ pipeline, ok := pipelines.GetPipelineByID(id)
+ if !ok {
+ return http.StatusNotFound, "", nil
+ }
+
+ since := 0
+ limit := 0
+ if sinceParam := request.Form.Get("since"); sinceParam != "" {
+ since, _ = strconv.Atoi(sinceParam)
+ }
+ if limitParam := request.Form.Get("limit"); limitParam != "" {
+ limit, _ = strconv.Atoi(limitParam)
+ }
+
+ result, err := pipeline.JSONFeed(since, limit)
+ if err != nil {
+ return http.StatusInternalServerError, err.Error(), nil
+ }
+
+ return http.StatusOK, result, nil
+}
+
+func (pipelines *Pipelines) Post(request *http.Request) (int, interface{}, http.Header) {
+ vars := mux.Vars(request)
+ id, ok := vars["id"]
+ if !ok {
+ return http.StatusNotFound, "", nil
+ }
+
+ pipeline, ok := pipelines.GetPipelineByID(id)
+ if !ok {
+ return http.StatusNotFound, "", nil
+ }
+
+ var incoming channelling.DataIncoming
+ dec := json.NewDecoder(request.Body)
+ if err := dec.Decode(&incoming); err != nil {
+ return http.StatusBadRequest, err.Error(), nil
+ }
+
+ result := &channelling.DataOutgoing{
+ From: pipeline.Session().Id,
+ Iid: incoming.Iid,
+ }
+ reply, err := pipelines.API.OnIncoming(pipeline, pipeline.Session(), &incoming)
+ if err == nil {
+ result.Data = reply
+ } else {
+ result.Data = err
+ }
+
+ return http.StatusOK, result, nil
+}
diff --git a/go/channelling/server/users.go b/go/channelling/server/users.go
index 3898808e..922d9858 100644
--- a/go/channelling/server/users.go
+++ b/go/channelling/server/users.go
@@ -293,41 +293,27 @@ func (un *UserNonce) Response() (int, interface{}, http.Header) {
}
type Users struct {
+ channelling.SessionStore
channelling.SessionValidator
channelling.SessionManager
- channelling.SessionStore
realm string
handler UsersHandler
}
func NewUsers(sessionStore channelling.SessionStore, sessionValidator channelling.SessionValidator, sessionManager channelling.SessionManager, mode, realm string, runtime phoenix.Runtime) *Users {
-
var users = &Users{
+ sessionStore,
sessionValidator,
sessionManager,
- sessionStore,
realm,
nil,
}
+ // Create handler based on mode.
var handler UsersHandler
var err error
-
- // Create handler based on mode.
if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil {
users.handler = handler
- // Register handler Get.
- sessionManager.RetrieveUsersWith(func(request *http.Request) (userid string, err error) {
- userid, err = handler.Get(request)
- if err != nil {
- log.Printf("Failed to get userid from handler: %s", err)
- } else {
- if userid != "" {
- log.Printf("Users handler get success: %s\n", userid)
- }
- }
- return
- })
log.Printf("Enabled users handler '%s'\n", mode)
} else if err != nil {
log.Printf("Failed to enable handler '%s': %s\n", mode, err)
@@ -422,6 +408,21 @@ func (users *Users) createHandler(mode string, runtime phoenix.Runtime) (handler
}
+func (users *Users) GetUserID(request *http.Request) (userid string, err error) {
+ if users.handler == nil {
+ return
+ }
+ userid, err = users.handler.Get(request)
+ if err != nil {
+ log.Printf("Failed to get userid from handler: %s", err)
+ } else {
+ if userid != "" {
+ log.Printf("Users handler get success: %s\n", userid)
+ }
+ }
+ return
+}
+
// Post is used to create new userids for this server.
func (users *Users) Post(request *http.Request) (int, interface{}, http.Header) {
@@ -465,7 +466,7 @@ func (users *Users) Post(request *http.Request) (int, interface{}, http.Header)
nonce string
err error
)
- if session, ok := users.GetSession(snr.Id); ok {
+ if session, ok := users.SessionStore.GetSession(snr.Id); ok {
nonce, err = session.Authorize(users.Realm(), &channelling.SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid})
} else {
err = errors.New("no such session")
diff --git a/go/channelling/session.go b/go/channelling/session.go
index 82c17c39..1dbd77fc 100644
--- a/go/channelling/session.go
+++ b/go/channelling/session.go
@@ -33,30 +33,30 @@ import (
var sessionNonces *securecookie.SecureCookie
type Session struct {
- SessionManager
- Unicaster
- Broadcaster
- RoomStatusManager
- buddyImages ImageCache
- Id string
- Sid string
- Ua string
- UpdateRev uint64
- Status interface{}
- Nonce string
- Prio int
- Hello bool
- Roomid string
- mutex sync.RWMutex
- userid string
- fake bool
- stamp int64
- attestation *SessionAttestation
- attestations *securecookie.SecureCookie
- subscriptions map[string]*Session
- subscribers map[string]*Session
- disconnected bool
- replaced bool
+ SessionManager SessionManager
+ Unicaster Unicaster
+ Broadcaster Broadcaster
+ RoomStatusManager RoomStatusManager
+ buddyImages ImageCache
+ Id string
+ Sid string
+ Ua string
+ UpdateRev uint64
+ Status interface{}
+ Nonce string
+ Prio int
+ Hello bool
+ Roomid string
+ mutex sync.RWMutex
+ userid string
+ fake bool
+ stamp int64
+ attestation *SessionAttestation
+ attestations *securecookie.SecureCookie
+ subscriptions map[string]*Session
+ subscribers map[string]*Session
+ disconnected bool
+ replaced bool
}
func NewSession(manager SessionManager,
@@ -194,7 +194,7 @@ func (s *Session) BroadcastStatus() {
s.mutex.RUnlock()
}
-func (s *Session) Unicast(to string, m interface{}) {
+func (s *Session) Unicast(to string, m interface{}, pipeline *Pipeline) {
s.mutex.RLock()
outgoing := &DataOutgoing{
From: s.Id,
@@ -204,7 +204,7 @@ func (s *Session) Unicast(to string, m interface{}) {
}
s.mutex.RUnlock()
- s.Unicaster.Unicast(to, outgoing)
+ s.Unicaster.Unicast(to, outgoing, pipeline)
}
func (s *Session) Close() {
@@ -235,12 +235,12 @@ func (s *Session) Close() {
}
for _, session := range s.subscribers {
- s.Unicaster.Unicast(session.Id, outgoing)
+ s.Unicaster.Unicast(session.Id, outgoing, nil)
}
for _, session := range s.subscriptions {
session.RemoveSubscriber(s.Id)
- s.Unicaster.Unicast(session.Id, outgoing)
+ s.Unicaster.Unicast(session.Id, outgoing, nil)
}
s.SessionManager.DestroySession(s.Id, s.userid)
diff --git a/go/channelling/session_manager.go b/go/channelling/session_manager.go
index 7decaa9c..78a1c874 100644
--- a/go/channelling/session_manager.go
+++ b/go/channelling/session_manager.go
@@ -35,11 +35,13 @@ type UserStats interface {
type SessionManager interface {
UserStats
- RetrieveUsersWith(func(*http.Request) (string, error))
- CreateSession(*http.Request) *Session
+ SessionStore
+ UserStore
+ CreateSession(st *SessionToken, userid string) *Session
DestroySession(sessionID, userID string)
Authenticate(*Session, *SessionToken, string) error
GetUserSessions(session *Session, id string) []*DataSession
+ DecodeSessionToken(token string) (st *SessionToken)
}
type sessionManager struct {
@@ -48,12 +50,13 @@ type sessionManager struct {
Unicaster
Broadcaster
RoomStatusManager
- buddyImages ImageCache
- config *Config
- userTable map[string]*User
- fakesessionTable map[string]*Session
- useridRetriever func(*http.Request) (string, error)
- attestations *securecookie.SecureCookie
+ buddyImages ImageCache
+ config *Config
+ userTable map[string]*User
+ sessionTable map[string]*Session
+ sessionByUserIDTable map[string]*Session
+ useridRetriever func(*http.Request) (string, error)
+ attestations *securecookie.SecureCookie
}
func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, sessionSecret []byte) SessionManager {
@@ -67,6 +70,7 @@ func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, bro
config,
make(map[string]*User),
make(map[string]*Session),
+ make(map[string]*Session),
nil,
nil,
}
@@ -93,29 +97,28 @@ func (sessionManager *sessionManager) UserInfo(details bool) (userCount int, use
return
}
-func (sessionManager *sessionManager) RetrieveUsersWith(retriever func(*http.Request) (string, error)) {
- sessionManager.useridRetriever = retriever
+// GetSession returns the client-less sessions created directly by sessionManager.
+func (sessionManager *sessionManager) GetSession(id string) (*Session, bool) {
+ sessionManager.RLock()
+ defer sessionManager.RUnlock()
+
+ session, ok := sessionManager.sessionTable[id]
+ return session, ok
}
-func (sessionManager *sessionManager) CreateSession(request *http.Request) *Session {
- request.ParseForm()
- token := request.FormValue("t")
- st := sessionManager.DecodeSessionToken(token)
-
- var userid string
- if sessionManager.config.UsersEnabled {
- if sessionManager.useridRetriever != nil {
- userid, _ = sessionManager.useridRetriever(request)
- if userid == "" {
- userid = st.Userid
- }
- }
- }
+func (sessionManager *sessionManager) GetUser(id string) (*User, bool) {
+ sessionManager.RLock()
+ defer sessionManager.RUnlock()
+
+ user, ok := sessionManager.userTable[id]
+ return user, ok
+}
+func (sessionManager *sessionManager) CreateSession(st *SessionToken, userid string) *Session {
session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
if userid != "" {
- // XXX(lcooper): Should errors be handled here?
+ // Errors are ignored here, session is returned without userID when auth failed.
sessionManager.Authenticate(session, st, userid)
}
@@ -128,10 +131,15 @@ func (sessionManager *sessionManager) DestroySession(sessionID, userID string) {
}
sessionManager.Lock()
- user, ok := sessionManager.userTable[userID]
- if ok && user.RemoveSession(sessionID) {
+ if user, ok := sessionManager.userTable[userID]; ok && user.RemoveSession(sessionID) {
delete(sessionManager.userTable, userID)
}
+ if _, ok := sessionManager.sessionTable[sessionID]; ok {
+ delete(sessionManager.sessionTable, sessionID)
+ }
+ if session, ok := sessionManager.sessionByUserIDTable[userID]; ok && session.Id == sessionID {
+ delete(sessionManager.sessionByUserIDTable, sessionID)
+ }
sessionManager.Unlock()
}
@@ -165,12 +173,13 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s
if !ok {
// No user. Create fake session.
sessionManager.Lock()
- session, ok := sessionManager.fakesessionTable[userid]
+ session, ok := sessionManager.sessionByUserIDTable[userid]
if !ok {
st := sessionManager.FakeSessionToken(userid)
session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
session.SetUseridFake(st.Userid)
- sessionManager.fakesessionTable[userid] = session
+ sessionManager.sessionByUserIDTable[userid] = session
+ sessionManager.sessionTable[session.Id] = session
}
sessionManager.Unlock()
users = make([]*DataSession, 1, 1)
diff --git a/go/channelling/sink.go b/go/channelling/sink.go
new file mode 100644
index 00000000..c2258ff8
--- /dev/null
+++ b/go/channelling/sink.go
@@ -0,0 +1,32 @@
+/*
+ * Spreed WebRTC.
+ * Copyright (C) 2013-2015 struktur AG
+ *
+ * This file is part of Spreed WebRTC.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package channelling
+
+import ()
+
+// Sink connects a Pipeline with end points in both directions by
+// getting attached to a Pipeline.
+type Sink interface {
+ // Write sends outgoing data on the sink from the
+ Write(interface{})
+ Close()
+}
diff --git a/go/channelling/tickets.go b/go/channelling/tickets.go
index 90145e1e..13f986b7 100644
--- a/go/channelling/tickets.go
+++ b/go/channelling/tickets.go
@@ -96,13 +96,12 @@ func (tickets *tickets) DecodeSessionToken(token string) (st *SessionToken) {
return
}
-func (tickets *tickets) FakeSessionToken(userid string) *SessionToken {
- st := &SessionToken{}
- st.Sid = fmt.Sprintf("fake-%s", randomstring.NewRandomString(27))
- st.Id, _ = tickets.Encode("id", st.Sid)
- st.Userid = userid
+func (tickets *tickets) FakeSessionToken(userid string) (st *SessionToken) {
+ sid := fmt.Sprintf("fake-%s", randomstring.NewRandomString(27))
+ id, _ := tickets.Encode("id", sid)
+ st = &SessionToken{Id: id, Sid: sid, Userid: userid}
log.Println("Created new fake session id", st.Id)
- return st
+ return
}
func (tickets *tickets) ValidateSession(id, sid string) bool {
diff --git a/go/channelling/unicaster.go b/go/channelling/unicaster.go
index 7f69474b..e9f28af1 100644
--- a/go/channelling/unicaster.go
+++ b/go/channelling/unicaster.go
@@ -25,5 +25,5 @@ type Unicaster interface {
SessionStore
OnConnect(*Client, *Session)
OnDisconnect(*Client, *Session)
- Unicast(to string, outgoing *DataOutgoing)
+ Unicast(to string, outgoing *DataOutgoing, pipeline *Pipeline)
}
diff --git a/go/channelling/userstore.go b/go/channelling/userstore.go
new file mode 100644
index 00000000..5da01011
--- /dev/null
+++ b/go/channelling/userstore.go
@@ -0,0 +1,26 @@
+/*
+ * Spreed WebRTC.
+ * Copyright (C) 2013-2015 struktur AG
+ *
+ * This file is part of Spreed WebRTC.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package channelling
+
+type UserStore interface {
+ GetUser(id string) (user *User, ok bool)
+}
diff --git a/src/app/spreed-webrtc-server/handler_ws.go b/src/app/spreed-webrtc-server/handler_ws.go
index cde32fac..16fb406e 100644
--- a/src/app/spreed-webrtc-server/handler_ws.go
+++ b/src/app/spreed-webrtc-server/handler_ws.go
@@ -26,6 +26,7 @@ import (
"net/http"
"github.com/strukturag/spreed-webrtc/go/channelling"
+ "github.com/strukturag/spreed-webrtc/go/channelling/server"
"github.com/gorilla/websocket"
)
@@ -52,7 +53,7 @@ var (
}
)
-func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI) http.HandlerFunc {
+func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI, users *server.Users) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
// Validate incoming request.
if r.Method != "GET" {
@@ -69,8 +70,20 @@ func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManag
return
}
+ r.ParseForm()
+ token := r.FormValue("t")
+ st := sessionManager.DecodeSessionToken(token)
+
+ var userid string
+ if users != nil {
+ userid, _ = users.GetUserID(r)
+ if userid == "" {
+ userid = st.Userid
+ }
+ }
+
// Create a new connection instance.
- session := sessionManager.CreateSession(r)
+ session := sessionManager.CreateSession(st, userid)
client := channelling.NewClient(codec, channellingAPI, session)
conn := channelling.NewConnection(connectionCounter.CountConnection(), ws, client)
diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go
index 9d27fb48..cd5c00a9 100644
--- a/src/app/spreed-webrtc-server/main.go
+++ b/src/app/spreed-webrtc-server/main.go
@@ -266,7 +266,8 @@ func runner(runtime phoenix.Runtime) error {
sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager)
busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
- channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager)
+ pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager)
+ channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)
// Add handlers.
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
@@ -274,12 +275,8 @@ func runner(runtime phoenix.Runtime) error {
r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder))))
r.Handle("/robots.txt", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static")))))
r.Handle("/favicon.ico", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img")))))
- r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI))
r.HandleFunc("/.well-known/spreed-configuration", wellKnownHandler)
- // Simple room handler.
- r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler))
-
// Sandbox handler.
r.HandleFunc("/sandbox/{origin_scheme}/{origin_host}/{sandbox}.html", httputils.MakeGzipHandler(sandboxHandler))
@@ -289,9 +286,11 @@ func runner(runtime phoenix.Runtime) error {
rest.AddResource(&server.Rooms{}, "/rooms")
rest.AddResource(config, "/config")
rest.AddResourceWithWrapper(&server.Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens")
+
+ var users *server.Users
if config.UsersEnabled {
// Create Users handler.
- users := server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime)
+ users = server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime)
rest.AddResource(&server.Sessions{tickets, hub, users}, "/sessions/{id}/")
if config.UsersAllowRegistration {
rest.AddResource(users, "/users")
@@ -301,6 +300,7 @@ func runner(runtime phoenix.Runtime) error {
rest.AddResourceWithWrapper(&server.Stats{statsManager}, httputils.MakeGzipHandler, "/stats")
log.Println("Stats are enabled!")
}
+ rest.AddResource(&server.Pipelines{pipelineManager, channellingAPI}, "/pipelines/{id}")
// Add extra/static support if configured and exists.
if extraFolder != "" {
@@ -311,6 +311,12 @@ func runner(runtime phoenix.Runtime) error {
}
}
+ // Finally add websocket handler.
+ r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI, users))
+
+ // Simple room handler.
+ r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler))
+
// Map everything else to a room when it is a GET.
rooms := r.PathPrefix("/").Methods("GET").Subrouter()
rooms.HandleFunc("/{room:.*}", httputils.MakeGzipHandler(roomHandler))