diff --git a/.gitignore b/.gitignore index 26c3fc79..65082f81 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,7 @@ /src/github.com /src/golang.struktur.de /*.pprof -/server.conf +/server.conf* /*.log /changelog*.txt /src/styles/.sass-cache diff --git a/go/channelling/api.go b/go/channelling/api.go index 5708e6ed..729c96ec 100644 --- a/go/channelling/api.go +++ b/go/channelling/api.go @@ -26,3 +26,24 @@ type ChannellingAPI interface { OnDisconnect(*Client, *Session) OnIncoming(Sender, *Session, *DataIncoming) (interface{}, error) } + +type ChannellingAPIConsumer interface { + SetChannellingAPI(ChannellingAPI) + GetChannellingAPI() ChannellingAPI +} + +type channellingAPIConsumer struct { + ChannellingAPI ChannellingAPI +} + +func NewChannellingAPIConsumer() ChannellingAPIConsumer { + return &channellingAPIConsumer{} +} + +func (c *channellingAPIConsumer) SetChannellingAPI(api ChannellingAPI) { + c.ChannellingAPI = api +} + +func (c *channellingAPIConsumer) GetChannellingAPI() ChannellingAPI { + return c.ChannellingAPI +} diff --git a/go/channelling/api/api.go b/go/channelling/api/api.go index 0c34ab9e..bacf9e74 100644 --- a/go/channelling/api/api.go +++ b/go/channelling/api/api.go @@ -41,6 +41,7 @@ type channellingAPI struct { TurnDataCreator channelling.TurnDataCreator Unicaster channelling.Unicaster BusManager channelling.BusManager + PipelineManager channelling.PipelineManager config *channelling.Config } @@ -55,7 +56,8 @@ func New(config *channelling.Config, contactManager channelling.ContactManager, turnDataCreator channelling.TurnDataCreator, unicaster channelling.Unicaster, - busManager channelling.BusManager) channelling.ChannellingAPI { + busManager channelling.BusManager, + pipelineManager channelling.PipelineManager) channelling.ChannellingAPI { return &channellingAPI{ roomStatus, sessionEncoder, @@ -65,6 +67,7 @@ func New(config *channelling.Config, turnDataCreator, unicaster, busManager, + pipelineManager, config, } } @@ -73,17 +76,18 @@ func (api *channellingAPI) OnConnect(client *channelling.Client, session *channe api.Unicaster.OnConnect(client, session) self, err := api.HandleSelf(session) if err == nil { - api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil) + api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil, nil) } return self, err } func (api *channellingAPI) OnDisconnect(client *channelling.Client, session *channelling.Session) { api.Unicaster.OnDisconnect(client, session) - api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil) + api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil, nil) } func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming) (interface{}, error) { + var pipeline *channelling.Pipeline switch msg.Type { case "Self": return api.HandleSelf(session) @@ -99,31 +103,34 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe break } if _, ok := msg.Offer.Offer["_token"]; !ok { + pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Offer.To) // Trigger offer event when offer has no token, so this is // not triggered for peerxfer and peerscreenshare offers. - api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil) + api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil, pipeline) } - session.Unicast(msg.Offer.To, msg.Offer) + session.Unicast(msg.Offer.To, msg.Offer, pipeline) case "Candidate": if msg.Candidate == nil || msg.Candidate.Candidate == nil { log.Println("Received invalid candidate message.", msg) break } - session.Unicast(msg.Candidate.To, msg.Candidate) + pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Candidate.To) + session.Unicast(msg.Candidate.To, msg.Candidate, pipeline) case "Answer": if msg.Answer == nil || msg.Answer.Answer == nil { log.Println("Received invalid answer message.", msg) break } if _, ok := msg.Answer.Answer["_token"]; !ok { + pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Answer.To) // Trigger answer event when answer has no token. so this is // not triggered for peerxfer and peerscreenshare answers. - api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil) + api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil, pipeline) } - session.Unicast(msg.Answer.To, msg.Answer) + session.Unicast(msg.Answer.To, msg.Answer, pipeline) case "Users": return api.HandleUsers(session) case "Authentication": @@ -137,9 +144,11 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe log.Println("Received invalid bye message.", msg) break } - api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil) + pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Bye.To) + api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil, pipeline) - session.Unicast(msg.Bye.To, msg.Bye) + session.Unicast(msg.Bye.To, msg.Bye, pipeline) + pipeline.Close() case "Status": if msg.Status == nil { log.Println("Received invalid status message.", msg) diff --git a/go/channelling/api/api_test.go b/go/channelling/api/api_test.go index 7f3bea0d..4847ed8e 100644 --- a/go/channelling/api/api_test.go +++ b/go/channelling/api/api_test.go @@ -35,6 +35,10 @@ import ( type fakeClient struct { } +func (fake *fakeClient) Index() uint64 { + return 0 +} + func (fake *fakeClient) Send(_ buffercache.Buffer) { } @@ -79,11 +83,14 @@ func (fake *fakeRoomManager) MakeRoomID(roomName, roomType string) string { } func NewTestChannellingAPI() (channelling.ChannellingAPI, *fakeClient, *channelling.Session, *fakeRoomManager) { + apiConsumer := channelling.NewChannellingAPIConsumer() client, roomManager := &fakeClient{}, &fakeRoomManager{} sessionNonces := securecookie.New(securecookie.GenerateRandomKey(64), nil) session := channelling.NewSession(nil, nil, roomManager, roomManager, nil, sessionNonces, "", "") - busManager := channelling.NewBusManager("", false, "") - return New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager), client, session, roomManager + busManager := channelling.NewBusManager(apiConsumer, "", false, "") + api := New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager, nil) + apiConsumer.SetChannellingAPI(api) + return api, client, session, roomManager } func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { diff --git a/go/channelling/api/handle_chat.go b/go/channelling/api/handle_chat.go index b1782cd8..de4eee1c 100644 --- a/go/channelling/api/handle_chat.go +++ b/go/channelling/api/handle_chat.go @@ -34,7 +34,7 @@ func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channe to := chat.To if !msg.NoEcho { - session.Unicast(session.Id, chat) + session.Unicast(session.Id, chat, nil) } msg.Time = time.Now().Format(time.RFC3339) if to == "" { @@ -59,10 +59,10 @@ func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channe api.StatsCounter.CountUnicastChat() } - session.Unicast(to, chat) + session.Unicast(to, chat, nil) if msg.Mid != "" { // Send out delivery confirmation status chat message. - session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}}) + session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}}, nil) } } } diff --git a/go/channelling/api/handle_conference.go b/go/channelling/api/handle_conference.go index ed961dfb..f913d1d6 100644 --- a/go/channelling/api/handle_conference.go +++ b/go/channelling/api/handle_conference.go @@ -37,7 +37,7 @@ func (api *channellingAPI) HandleConference(session *channelling.Session, confer // Send conference update to anyone. for _, id := range conference.Conference { if id != session.Id { - session.Unicast(id, conference) + session.Unicast(id, conference, nil) } } } diff --git a/go/channelling/api/handle_self.go b/go/channelling/api/handle_self.go index 504e5557..1c5e118d 100644 --- a/go/channelling/api/handle_self.go +++ b/go/channelling/api/handle_self.go @@ -47,7 +47,7 @@ func (api *channellingAPI) HandleSelf(session *channelling.Session) (*channellin Turn: api.TurnDataCreator.CreateTurnData(session), Stun: api.config.StunURIs, } - api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil) + api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil, nil) return self, nil } diff --git a/go/channelling/bus.go b/go/channelling/bus.go new file mode 100644 index 00000000..1fbac82b --- /dev/null +++ b/go/channelling/bus.go @@ -0,0 +1,41 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package channelling + +type SessionCreateRequest struct { + Id string + Session *DataSession + Room *DataRoom + SetAsDefault bool +} + +type DataSink struct { + SubjectOut string `json:subject_out"` + SubjectIn string `json:subject_in"` +} + +type DataSinkOutgoing struct { + Outgoing *DataOutgoing + ToUserid string + FromUserid string + Pipe string `json:",omitempty"` +} diff --git a/go/channelling/bus_manager.go b/go/channelling/bus_manager.go index 50b24fec..de78df7f 100644 --- a/go/channelling/bus_manager.go +++ b/go/channelling/bus_manager.go @@ -25,6 +25,10 @@ import ( "errors" "fmt" "log" + "sync" + "time" + + "github.com/nats-io/nats" "github.com/strukturag/spreed-webrtc/go/natsconnection" ) @@ -41,17 +45,27 @@ const ( // A BusManager provides the API to interact with a bus. type BusManager interface { - Trigger(name, from, payload string, data interface{}) error + ChannellingAPIConsumer + Start() + Publish(subject string, v interface{}) error + Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error + Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error + Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) + BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) + BindSendChan(subject string, channel interface{}) error + PrefixSubject(string) string + CreateSink(string) Sink } // A BusTrigger is a container to serialize trigger events // for the bus backend. type BusTrigger struct { - Id string - Name string - From string - Payload string `json:",omitempty"` - Data interface{} `json:",omitempty"` + Id string + Name string + From string + Payload string `json:",omitempty"` + Data interface{} `json:",omitempty"` + Pipeline string `json:",omitempty"` } // BusSubjectTrigger returns the bus subject for trigger payloads. @@ -59,50 +73,77 @@ func BusSubjectTrigger(prefix, suffix string) string { return fmt.Sprintf("%s.%s", prefix, suffix) } -type busManager struct { - BusManager -} - // NewBusManager creates and initializes a new BusMager with the // provided flags for NATS support. It is intended to connect the // backend bus with a easy to use API to send and receive bus data. -func NewBusManager(id string, useNats bool, subjectPrefix string) BusManager { +func NewBusManager(apiConsumer ChannellingAPIConsumer, id string, useNats bool, subjectPrefix string) BusManager { var b BusManager var err error if useNats { - b, err = newNatsBus(id, subjectPrefix) + b, err = newNatsBus(apiConsumer, id, subjectPrefix) if err == nil { log.Println("NATS bus connected") } else { log.Println("Error connecting NATS bus", err) - b = &noopBus{id} + b = &noopBus{apiConsumer, id} } } else { - b = &noopBus{id} - } - if err == nil { - b.Trigger(BusManagerStartup, id, "", nil) + b = &noopBus{apiConsumer, id} } - return &busManager{b} + return b } type noopBus struct { + ChannellingAPIConsumer id string } -func (bus *noopBus) Trigger(name, from, payload string, data interface{}) error { +func (bus *noopBus) Start() { + // noop +} + +func (bus *noopBus) Publish(subject string, v interface{}) error { + return nil +} + +func (bus *noopBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error { + return nil +} + +func (bus *noopBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) error { + return nil +} + +func (bus *noopBus) PrefixSubject(subject string) string { + return subject +} + +func (bus *noopBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) { + return nil, nil +} + +func (bus *noopBus) BindSendChan(subject string, channel interface{}) error { + return nil +} + +func (bus *noopBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) { + return nil, nil +} + +func (bus *noopBus) CreateSink(id string) Sink { return nil } type natsBus struct { + ChannellingAPIConsumer id string prefix string ec *natsconnection.EncodedConnection triggerQueue chan *busQueueEntry } -func newNatsBus(id, prefix string) (*natsBus, error) { +func newNatsBus(apiConsumer ChannellingAPIConsumer, id, prefix string) (*natsBus, error) { ec, err := natsconnection.EstablishJSONEncodedConnection(nil) if err != nil { return nil, err @@ -112,34 +153,68 @@ func newNatsBus(id, prefix string) (*natsBus, error) { } // Create buffered channel for outbound NATS data. triggerQueue := make(chan *busQueueEntry, 50) + + return &natsBus{apiConsumer, id, prefix, ec, triggerQueue}, nil +} + +func (bus *natsBus) Start() { // Start go routine to process outbount NATS publishing. - go chPublish(ec, triggerQueue) + go chPublish(bus.ec, bus.triggerQueue) + bus.Trigger(BusManagerStartup, bus.id, "", nil, nil) +} - return &natsBus{id, prefix, ec, triggerQueue}, nil +func (bus *natsBus) Publish(subject string, v interface{}) error { + return bus.ec.Publish(subject, v) } -func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err error) { - if bus.ec != nil { - trigger := &BusTrigger{ - Id: bus.id, - Name: name, - From: from, - Payload: payload, - Data: data, - } - entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} - select { - case bus.triggerQueue <- entry: - // sent ok - default: - log.Println("Failed to queue NATS event - queue full?") - err = errors.New("NATS trigger queue full") - } +func (bus *natsBus) Request(subject string, v interface{}, vPtr interface{}, timeout time.Duration) error { + return bus.ec.Request(subject, v, vPtr, timeout) +} + +func (bus *natsBus) Trigger(name, from, payload string, data interface{}, pipeline *Pipeline) (err error) { + trigger := &BusTrigger{ + Id: bus.id, + Name: name, + From: from, + Payload: payload, + Data: data, + } + if pipeline != nil { + trigger.Pipeline = pipeline.GetID() + } + entry := &busQueueEntry{BusSubjectTrigger(bus.prefix, name), trigger} + select { + case bus.triggerQueue <- entry: + // sent ok + default: + log.Println("Failed to queue NATS event - queue full?") + err = errors.New("NATS trigger queue full") } return err } +func (bus *natsBus) PrefixSubject(sub string) string { + return fmt.Sprintf("%s.%s", bus.prefix, sub) +} + +func (bus *natsBus) Subscribe(subject string, cb nats.Handler) (*nats.Subscription, error) { + return bus.ec.Subscribe(subject, cb) +} + +func (bus *natsBus) BindRecvChan(subject string, channel interface{}) (*nats.Subscription, error) { + return bus.ec.BindRecvChan(subject, channel) +} + +func (bus *natsBus) BindSendChan(subject string, channel interface{}) error { + return bus.ec.BindSendChan(subject, channel) +} + +func (bus *natsBus) CreateSink(id string) (sink Sink) { + sink = newNatsSink(bus, id) + return +} + type busQueueEntry struct { subject string data interface{} @@ -154,3 +229,78 @@ func chPublish(ec *natsconnection.EncodedConnection, channel chan (*busQueueEntr } } } + +type natsSink struct { + sync.RWMutex + id string + bm BusManager + closed bool + SubjectOut string + SubjectIn string + sub *nats.Subscription + sendQueue chan *DataSinkOutgoing +} + +func newNatsSink(bm BusManager, id string) *natsSink { + sink := &natsSink{ + id: id, + bm: bm, + SubjectOut: bm.PrefixSubject(fmt.Sprintf("sink.%s.out", id)), + SubjectIn: bm.PrefixSubject(fmt.Sprintf("sink.%s.in", id)), + } + + sink.sendQueue = make(chan *DataSinkOutgoing, 100) + bm.BindSendChan(sink.SubjectOut, sink.sendQueue) + + return sink +} + +func (sink *natsSink) Write(outgoing *DataSinkOutgoing) (err error) { + if sink.Enabled() { + log.Println("Sending via NATS sink", sink.SubjectOut, outgoing) + sink.sendQueue <- outgoing + } + return err +} + +func (sink *natsSink) Enabled() bool { + sink.RLock() + defer sink.RUnlock() + return sink.closed == false +} + +func (sink *natsSink) Close() { + sink.Lock() + defer sink.Unlock() + if sink.sub != nil { + err := sink.sub.Unsubscribe() + if err != nil { + log.Println("Failed to unsubscribe NATS sink", err) + } else { + sink.sub = nil + } + } + sink.closed = true +} + +func (sink *natsSink) Export() *DataSink { + return &DataSink{ + SubjectOut: sink.SubjectOut, + SubjectIn: sink.SubjectIn, + } +} + +func (sink *natsSink) BindRecvChan(channel interface{}) (*nats.Subscription, error) { + sink.Lock() + defer sink.Unlock() + if sink.sub != nil { + sink.sub.Unsubscribe() + sink.sub = nil + } + sub, err := sink.bm.BindRecvChan(sink.SubjectIn, channel) + if err != nil { + return nil, err + } + sink.sub = sub + return sub, nil +} diff --git a/go/channelling/client.go b/go/channelling/client.go index 3310dcd3..f845bb37 100644 --- a/go/channelling/client.go +++ b/go/channelling/client.go @@ -28,14 +28,15 @@ import ( ) type Sender interface { + Index() uint64 Send(buffercache.Buffer) } type Client struct { - Codec - ChannellingAPI Connection - session *Session + Codec + ChannellingAPI ChannellingAPI + session *Session } func NewClient(codec Codec, api ChannellingAPI, session *Session) *Client { diff --git a/go/channelling/data.go b/go/channelling/data.go index 8456a839..960231b7 100644 --- a/go/channelling/data.go +++ b/go/channelling/data.go @@ -182,27 +182,27 @@ type DataAutoCall struct { type DataIncoming struct { Type string - Hello *DataHello - Offer *DataOffer - Candidate *DataCandidate - Answer *DataAnswer - Bye *DataBye - Status *DataStatus - Chat *DataChat - Conference *DataConference - Alive *DataAlive - Authentication *DataAuthentication - Sessions *DataSessions - Room *DataRoom - Iid string `json:",omitempty"` + Hello *DataHello `json:",omitempty"` + Offer *DataOffer `json:",omitempty"` + Candidate *DataCandidate `json:",omitempty"` + Answer *DataAnswer `json:",omitempty"` + Bye *DataBye `json:",omitempty"` + Status *DataStatus `json:",omitempty"` + Chat *DataChat `json:",omitempty"` + Conference *DataConference `json:",omitempty"` + Alive *DataAlive `json:",omitempty"` + Authentication *DataAuthentication `json:",omitempty"` + Sessions *DataSessions `json:",omitempty"` + Room *DataRoom `json:",omitempty"` + Iid string `json:",omitempty"` } type DataOutgoing struct { - Data interface{} - From string - To string - Iid string `json:",omitempty"` - A string `json:",omitempty"` + Data interface{} `json:",omitempty"` + From string `json:",omitempty"` + To string `json:",omitempty"` + Iid string `json:",omitempty"` + A string `json:",omitempty"` } type DataSessions struct { diff --git a/go/channelling/hub.go b/go/channelling/hub.go index c06088cd..057fcb60 100644 --- a/go/channelling/hub.go +++ b/go/channelling/hub.go @@ -155,13 +155,18 @@ func (h *hub) GetClient(sessionID string) (client *Client, ok bool) { return } -func (h *hub) Unicast(to string, outgoing *DataOutgoing) { - if message, err := h.EncodeOutgoing(outgoing); err == nil { - client, ok := h.GetClient(to) - if !ok { - log.Println("Unicast To not found", to) +func (h *hub) Unicast(to string, outgoing *DataOutgoing, pipeline *Pipeline) { + client, ok := h.GetClient(to) + if pipeline != nil { + if complete := pipeline.FlushOutgoing(h, client, to, outgoing); complete { return } + } + if !ok { + log.Println("Unicast To not found", to) + return + } + if message, err := h.EncodeOutgoing(outgoing); err == nil { client.Send(message) message.Decref() } diff --git a/go/channelling/pipeline.go b/go/channelling/pipeline.go new file mode 100644 index 00000000..fdcccade --- /dev/null +++ b/go/channelling/pipeline.go @@ -0,0 +1,266 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package channelling + +import ( + "bytes" + "encoding/json" + "errors" + "log" + "sync" + "time" + + "github.com/strukturag/spreed-webrtc/go/buffercache" +) + +type PipelineFeedLine struct { + Seq int + Msg *DataOutgoing +} + +type Pipeline struct { + PipelineManager PipelineManager + mutex sync.RWMutex + namespace string + id string + from *Session + to *Session + expires *time.Time + data []*DataSinkOutgoing + sink Sink + recvQueue chan *DataIncoming + closed bool +} + +func NewPipeline(manager PipelineManager, + namespace string, + id string, + from *Session, + duration time.Duration) *Pipeline { + pipeline := &Pipeline{ + PipelineManager: manager, + namespace: namespace, + id: id, + from: from, + recvQueue: make(chan *DataIncoming, 100), + } + go pipeline.receive() + pipeline.Refresh(duration) + return pipeline +} + +func (pipeline *Pipeline) receive() { + // TODO(longsleep): Call to ToSession() should be avoided because it locks. + api := pipeline.PipelineManager.GetChannellingAPI() + for data := range pipeline.recvQueue { + _, err := api.OnIncoming(nil, pipeline.ToSession(), data) + if err != nil { + // TODO(longsleep): Handle reply and error. + log.Println("Pipeline receive incoming error", err) + } + } + log.Println("Pipeline receive done") +} + +func (pipeline *Pipeline) GetID() string { + return pipeline.id +} + +func (pipeline *Pipeline) Refresh(duration time.Duration) { + pipeline.mutex.Lock() + pipeline.refresh(duration) + pipeline.mutex.Unlock() +} + +func (pipeline *Pipeline) refresh(duration time.Duration) { + expiration := time.Now().Add(duration) + pipeline.expires = &expiration +} + +func (pipeline *Pipeline) Add(msg *DataSinkOutgoing) *Pipeline { + msg.Pipe = pipeline.id + pipeline.mutex.Lock() + pipeline.data = append(pipeline.data, msg) + pipeline.refresh(30 * time.Second) + pipeline.mutex.Unlock() + + return pipeline +} + +func (pipeline *Pipeline) Send(b buffercache.Buffer) { + // Noop. +} + +func (pipeline *Pipeline) Index() uint64 { + return 0 +} + +func (pipeline *Pipeline) Close() { + pipeline.mutex.Lock() + if !pipeline.closed { + pipeline.expires = nil + if pipeline.sink != nil { + pipeline.sink = nil + } + close(pipeline.recvQueue) + pipeline.closed = true + log.Println("Closed pipeline") + } + pipeline.mutex.Unlock() +} + +func (pipeline *Pipeline) Expired() bool { + var expired bool + pipeline.mutex.RLock() + if pipeline.expires == nil { + expired = true + } else { + expired = pipeline.expires.Before(time.Now()) + } + pipeline.mutex.RUnlock() + + return expired +} + +func (pipeline *Pipeline) FromSession() *Session { + pipeline.mutex.RLock() + defer pipeline.mutex.RUnlock() + return pipeline.from +} + +func (pipeline *Pipeline) ToSession() *Session { + pipeline.mutex.RLock() + defer pipeline.mutex.RUnlock() + return pipeline.to +} + +func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) { + pipeline.mutex.RLock() + var lineRaw []byte + var line *PipelineFeedLine + var buffer bytes.Buffer + var err error + data := pipeline.data[since:] + count := 0 + for seq, msg := range data { + line = &PipelineFeedLine{ + Seq: seq + since, + Msg: msg.Outgoing, + } + lineRaw, err = json.Marshal(line) + if err != nil { + return nil, err + } + buffer.Write(lineRaw) + buffer.WriteString("\n") + + count++ + if limit > 0 && count >= limit { + break + } + } + pipeline.mutex.RUnlock() + + return buffer.Bytes(), nil +} + +func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool { + //log.Println("Flush outgoing via pipeline", to, client == nil) + if client == nil { + sinkOutgoing := &DataSinkOutgoing{ + Outgoing: outgoing, + } + + pipeline.mutex.Lock() + sink := pipeline.sink + toSession := pipeline.to + fromSession := pipeline.from + + for { + if sink != nil && sink.Enabled() { + // Sink it. + pipeline.mutex.Unlock() + break + } + + sink, toSession = pipeline.PipelineManager.FindSinkAndSession(to) + if sink != nil { + pipeline.to = toSession + err := pipeline.attach(sink) + if err == nil { + pipeline.mutex.Unlock() + + // Create incoming receiver. + sink.BindRecvChan(pipeline.recvQueue) + + // Sink it. + break + } + } + + // Not pipelined, do nothing. + pipeline.mutex.Unlock() + break + } + + if fromSession != nil { + sinkOutgoing.FromUserid = fromSession.Userid() + } + if toSession != nil { + sinkOutgoing.ToUserid = toSession.Userid() + } + pipeline.Add(sinkOutgoing) + + if sink != nil { + // Pipelined, sink data. + sink.Write(sinkOutgoing) + return true + } + } + + return false +} + +func (pipeline *Pipeline) Attach(sink Sink) error { + pipeline.mutex.Lock() + defer pipeline.mutex.Unlock() + + // Sink existing data first. + log.Println("Attach sink to pipeline", pipeline.id) + err := pipeline.attach(sink) + if err == nil { + for _, msg := range pipeline.data { + log.Println("Flushing pipeline to sink after attach", len(pipeline.data)) + sink.Write(msg) + } + } + + return err +} + +func (pipeline *Pipeline) attach(sink Sink) error { + if pipeline.sink != nil { + return errors.New("pipeline already attached to sink") + } + pipeline.sink = sink + return nil +} diff --git a/go/channelling/pipeline_manager.go b/go/channelling/pipeline_manager.go new file mode 100644 index 00000000..3b61a915 --- /dev/null +++ b/go/channelling/pipeline_manager.go @@ -0,0 +1,234 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package channelling + +import ( + "fmt" + "log" + "sync" + "time" +) + +const ( + PipelineNamespaceCall = "call" +) + +type PipelineManager interface { + BusManager + SessionStore + UserStore + SessionCreator + GetPipelineByID(id string) (pipeline *Pipeline, ok bool) + GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline + FindSinkAndSession(to string) (Sink, *Session) +} + +type pipelineManager struct { + BusManager + SessionStore + UserStore + SessionCreator + mutex sync.RWMutex + pipelineTable map[string]*Pipeline + sessionTable map[string]*Session + sessionByBusIDTable map[string]*Session + sessionSinkTable map[string]Sink + duration time.Duration + defaultSinkID string +} + +func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager { + plm := &pipelineManager{ + BusManager: busManager, + SessionStore: sessionStore, + UserStore: userStore, + SessionCreator: sessionCreator, + pipelineTable: make(map[string]*Pipeline), + sessionTable: make(map[string]*Session), + sessionByBusIDTable: make(map[string]*Session), + sessionSinkTable: make(map[string]Sink), + duration: 60 * time.Second, + } + plm.start() + + plm.Subscribe("channelling.session.create", plm.sessionCreate) + plm.Subscribe("channelling.session.close", plm.sessionClose) + + return plm +} + +func (plm *pipelineManager) cleanup() { + plm.mutex.Lock() + for id, pipeline := range plm.pipelineTable { + if pipeline.Expired() { + pipeline.Close() + delete(plm.pipelineTable, id) + } + } + plm.mutex.Unlock() +} + +func (plm *pipelineManager) start() { + c := time.Tick(30 * time.Second) + go func() { + for _ = range c { + plm.cleanup() + } + }() +} + +func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCreateRequest) { + log.Println("sessionCreate via NATS", subject, reply, msg) + + if msg.Session == nil || msg.Id == "" { + return + } + + var sink Sink + + plm.mutex.Lock() + session, ok := plm.sessionByBusIDTable[msg.Id] + if ok { + // Remove existing session with same ID. + delete(plm.sessionTable, session.Id) + sink, _ = plm.sessionSinkTable[session.Id] + delete(plm.sessionSinkTable, session.Id) + session.Close() + } + session = plm.CreateSession(nil, "") + plm.sessionByBusIDTable[msg.Id] = session + plm.sessionTable[session.Id] = session + if sink == nil { + sink = plm.CreateSink(msg.Id) + log.Println("Created NATS sink", msg.Id) + } + if reply != "" { + // Always reply with our sink data + plm.Publish(reply, sink.Export()) + } + plm.sessionSinkTable[session.Id] = sink + + if msg.SetAsDefault { + plm.defaultSinkID = session.Id + log.Println("Using NATS sink as default session", session.Id) + } + plm.mutex.Unlock() + + if msg.Session.Status != nil { + session.Status = msg.Session.Status + } + + if msg.Session.Userid != "" { + session.SetUseridFake(msg.Session.Userid) + } + + if msg.Room != nil { + room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, nil) + log.Println("Joined NATS session to room", room, err) + } + + session.BroadcastStatus() +} + +func (plm *pipelineManager) sessionClose(subject, reply string, id string) { + log.Println("sessionClose via NATS", subject, reply, id) + + if id == "" { + return + } + + plm.mutex.Lock() + session, ok := plm.sessionByBusIDTable[id] + if ok { + delete(plm.sessionByBusIDTable, id) + delete(plm.sessionTable, session.Id) + if sink, ok := plm.sessionSinkTable[session.Id]; ok { + delete(plm.sessionSinkTable, session.Id) + sink.Close() + } + } + plm.mutex.Unlock() + + if ok { + session.Close() + } +} + +func (plm *pipelineManager) GetPipelineByID(id string) (*Pipeline, bool) { + plm.mutex.RLock() + pipeline, ok := plm.pipelineTable[id] + plm.mutex.RUnlock() + return pipeline, ok +} + +func (plm *pipelineManager) PipelineID(namespace string, sender Sender, session *Session, to string) string { + return fmt.Sprintf("%s.%s.%s", namespace, session.Id, to) +} + +func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline { + id := plm.PipelineID(namespace, sender, session, to) + + plm.mutex.Lock() + pipeline, ok := plm.pipelineTable[id] + if ok { + // Refresh. We do not care if the pipeline is expired. + pipeline.Refresh(plm.duration) + plm.mutex.Unlock() + return pipeline + } + + log.Println("Creating pipeline", namespace, id) + pipeline = NewPipeline(plm, namespace, id, session, plm.duration) + plm.pipelineTable[id] = pipeline + plm.mutex.Unlock() + + return pipeline +} + +func (plm *pipelineManager) FindSinkAndSession(to string) (sink Sink, session *Session) { + plm.mutex.RLock() + + var found bool + if sink, found = plm.sessionSinkTable[to]; found { + session, _ = plm.sessionTable[to] + plm.mutex.RUnlock() + if sink.Enabled() { + log.Println("Pipeline sink found via manager", sink) + return sink, session + } + } else { + plm.mutex.RUnlock() + } + + if plm.defaultSinkID != "" && to != plm.defaultSinkID { + // Keep target to while returning a the default sink. + log.Println("Find sink via default sink ID", plm.defaultSinkID) + sink, _ = plm.FindSinkAndSession(plm.defaultSinkID) + if sink != nil { + if session, found = plm.GetSession(to); found { + return + } + } + } + + return nil, nil +} diff --git a/go/channelling/roomworker.go b/go/channelling/roomworker.go index 8450f069..d991ca24 100644 --- a/go/channelling/roomworker.go +++ b/go/channelling/roomworker.go @@ -225,8 +225,8 @@ func (r *roomWorker) Broadcast(sessionID string, message buffercache.Buffer) { worker := func() { r.mutex.RLock() for id, user := range r.users { - if id == sessionID { - // Skip broadcast to self. + if id == sessionID || user.Sender == nil { + // Skip broadcast to self or non existing sender. continue } //fmt.Printf("%s\n", m.Message) diff --git a/go/channelling/server/pipelines.go b/go/channelling/server/pipelines.go new file mode 100644 index 00000000..869372f3 --- /dev/null +++ b/go/channelling/server/pipelines.go @@ -0,0 +1,98 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package server + +import ( + "encoding/json" + "net/http" + "strconv" + + "github.com/strukturag/spreed-webrtc/go/channelling" + + "github.com/gorilla/mux" +) + +type Pipelines struct { + channelling.PipelineManager + API channelling.ChannellingAPI +} + +func (pipelines *Pipelines) Get(request *http.Request) (int, interface{}, http.Header) { + vars := mux.Vars(request) + id, ok := vars["id"] + if !ok { + return http.StatusNotFound, "", nil + } + + pipeline, ok := pipelines.GetPipelineByID(id) + if !ok { + return http.StatusNotFound, "", nil + } + + since := 0 + limit := 0 + if sinceParam := request.Form.Get("since"); sinceParam != "" { + since, _ = strconv.Atoi(sinceParam) + } + if limitParam := request.Form.Get("limit"); limitParam != "" { + limit, _ = strconv.Atoi(limitParam) + } + + result, err := pipeline.JSONFeed(since, limit) + if err != nil { + return http.StatusInternalServerError, err.Error(), nil + } + + return http.StatusOK, result, nil +} + +func (pipelines *Pipelines) Post(request *http.Request) (int, interface{}, http.Header) { + vars := mux.Vars(request) + id, ok := vars["id"] + if !ok { + return http.StatusNotFound, "", nil + } + + pipeline, ok := pipelines.GetPipelineByID(id) + if !ok { + return http.StatusNotFound, "", nil + } + + var incoming channelling.DataIncoming + dec := json.NewDecoder(request.Body) + if err := dec.Decode(&incoming); err != nil { + return http.StatusBadRequest, err.Error(), nil + } + + result := &channelling.DataOutgoing{ + From: pipeline.FromSession().Id, + Iid: incoming.Iid, + } + reply, err := pipelines.API.OnIncoming(pipeline, pipeline.ToSession(), &incoming) + if err == nil { + result.Data = reply + } else { + result.Data = err + } + + return http.StatusOK, result, nil +} diff --git a/go/channelling/server/users.go b/go/channelling/server/users.go index 3898808e..922d9858 100644 --- a/go/channelling/server/users.go +++ b/go/channelling/server/users.go @@ -293,41 +293,27 @@ func (un *UserNonce) Response() (int, interface{}, http.Header) { } type Users struct { + channelling.SessionStore channelling.SessionValidator channelling.SessionManager - channelling.SessionStore realm string handler UsersHandler } func NewUsers(sessionStore channelling.SessionStore, sessionValidator channelling.SessionValidator, sessionManager channelling.SessionManager, mode, realm string, runtime phoenix.Runtime) *Users { - var users = &Users{ + sessionStore, sessionValidator, sessionManager, - sessionStore, realm, nil, } + // Create handler based on mode. var handler UsersHandler var err error - - // Create handler based on mode. if handler, err = users.createHandler(mode, runtime); handler != nil && err == nil { users.handler = handler - // Register handler Get. - sessionManager.RetrieveUsersWith(func(request *http.Request) (userid string, err error) { - userid, err = handler.Get(request) - if err != nil { - log.Printf("Failed to get userid from handler: %s", err) - } else { - if userid != "" { - log.Printf("Users handler get success: %s\n", userid) - } - } - return - }) log.Printf("Enabled users handler '%s'\n", mode) } else if err != nil { log.Printf("Failed to enable handler '%s': %s\n", mode, err) @@ -422,6 +408,21 @@ func (users *Users) createHandler(mode string, runtime phoenix.Runtime) (handler } +func (users *Users) GetUserID(request *http.Request) (userid string, err error) { + if users.handler == nil { + return + } + userid, err = users.handler.Get(request) + if err != nil { + log.Printf("Failed to get userid from handler: %s", err) + } else { + if userid != "" { + log.Printf("Users handler get success: %s\n", userid) + } + } + return +} + // Post is used to create new userids for this server. func (users *Users) Post(request *http.Request) (int, interface{}, http.Header) { @@ -465,7 +466,7 @@ func (users *Users) Post(request *http.Request) (int, interface{}, http.Header) nonce string err error ) - if session, ok := users.GetSession(snr.Id); ok { + if session, ok := users.SessionStore.GetSession(snr.Id); ok { nonce, err = session.Authorize(users.Realm(), &channelling.SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) } else { err = errors.New("no such session") diff --git a/go/channelling/session.go b/go/channelling/session.go index 82c17c39..1dbd77fc 100644 --- a/go/channelling/session.go +++ b/go/channelling/session.go @@ -33,30 +33,30 @@ import ( var sessionNonces *securecookie.SecureCookie type Session struct { - SessionManager - Unicaster - Broadcaster - RoomStatusManager - buddyImages ImageCache - Id string - Sid string - Ua string - UpdateRev uint64 - Status interface{} - Nonce string - Prio int - Hello bool - Roomid string - mutex sync.RWMutex - userid string - fake bool - stamp int64 - attestation *SessionAttestation - attestations *securecookie.SecureCookie - subscriptions map[string]*Session - subscribers map[string]*Session - disconnected bool - replaced bool + SessionManager SessionManager + Unicaster Unicaster + Broadcaster Broadcaster + RoomStatusManager RoomStatusManager + buddyImages ImageCache + Id string + Sid string + Ua string + UpdateRev uint64 + Status interface{} + Nonce string + Prio int + Hello bool + Roomid string + mutex sync.RWMutex + userid string + fake bool + stamp int64 + attestation *SessionAttestation + attestations *securecookie.SecureCookie + subscriptions map[string]*Session + subscribers map[string]*Session + disconnected bool + replaced bool } func NewSession(manager SessionManager, @@ -194,7 +194,7 @@ func (s *Session) BroadcastStatus() { s.mutex.RUnlock() } -func (s *Session) Unicast(to string, m interface{}) { +func (s *Session) Unicast(to string, m interface{}, pipeline *Pipeline) { s.mutex.RLock() outgoing := &DataOutgoing{ From: s.Id, @@ -204,7 +204,7 @@ func (s *Session) Unicast(to string, m interface{}) { } s.mutex.RUnlock() - s.Unicaster.Unicast(to, outgoing) + s.Unicaster.Unicast(to, outgoing, pipeline) } func (s *Session) Close() { @@ -235,12 +235,12 @@ func (s *Session) Close() { } for _, session := range s.subscribers { - s.Unicaster.Unicast(session.Id, outgoing) + s.Unicaster.Unicast(session.Id, outgoing, nil) } for _, session := range s.subscriptions { session.RemoveSubscriber(s.Id) - s.Unicaster.Unicast(session.Id, outgoing) + s.Unicaster.Unicast(session.Id, outgoing, nil) } s.SessionManager.DestroySession(s.Id, s.userid) diff --git a/go/channelling/session_manager.go b/go/channelling/session_manager.go index 7decaa9c..e65be9a7 100644 --- a/go/channelling/session_manager.go +++ b/go/channelling/session_manager.go @@ -35,11 +35,13 @@ type UserStats interface { type SessionManager interface { UserStats - RetrieveUsersWith(func(*http.Request) (string, error)) - CreateSession(*http.Request) *Session + SessionStore + UserStore + SessionCreator DestroySession(sessionID, userID string) Authenticate(*Session, *SessionToken, string) error GetUserSessions(session *Session, id string) []*DataSession + DecodeSessionToken(token string) (st *SessionToken) } type sessionManager struct { @@ -48,12 +50,13 @@ type sessionManager struct { Unicaster Broadcaster RoomStatusManager - buddyImages ImageCache - config *Config - userTable map[string]*User - fakesessionTable map[string]*Session - useridRetriever func(*http.Request) (string, error) - attestations *securecookie.SecureCookie + buddyImages ImageCache + config *Config + userTable map[string]*User + sessionTable map[string]*Session + sessionByUserIDTable map[string]*Session + useridRetriever func(*http.Request) (string, error) + attestations *securecookie.SecureCookie } func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, sessionSecret []byte) SessionManager { @@ -67,6 +70,7 @@ func NewSessionManager(config *Config, tickets Tickets, unicaster Unicaster, bro config, make(map[string]*User), make(map[string]*Session), + make(map[string]*Session), nil, nil, } @@ -93,29 +97,31 @@ func (sessionManager *sessionManager) UserInfo(details bool) (userCount int, use return } -func (sessionManager *sessionManager) RetrieveUsersWith(retriever func(*http.Request) (string, error)) { - sessionManager.useridRetriever = retriever +// GetSession returns the client-less sessions created directly by sessionManager. +func (sessionManager *sessionManager) GetSession(id string) (*Session, bool) { + sessionManager.RLock() + defer sessionManager.RUnlock() + + session, ok := sessionManager.sessionTable[id] + return session, ok } -func (sessionManager *sessionManager) CreateSession(request *http.Request) *Session { - request.ParseForm() - token := request.FormValue("t") - st := sessionManager.DecodeSessionToken(token) - - var userid string - if sessionManager.config.UsersEnabled { - if sessionManager.useridRetriever != nil { - userid, _ = sessionManager.useridRetriever(request) - if userid == "" { - userid = st.Userid - } - } - } +func (sessionManager *sessionManager) GetUser(id string) (*User, bool) { + sessionManager.RLock() + defer sessionManager.RUnlock() + + user, ok := sessionManager.userTable[id] + return user, ok +} +func (sessionManager *sessionManager) CreateSession(st *SessionToken, userid string) *Session { + if st == nil { + st = sessionManager.DecodeSessionToken("") + } session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) if userid != "" { - // XXX(lcooper): Should errors be handled here? + // Errors are ignored here, session is returned without userID when auth failed. sessionManager.Authenticate(session, st, userid) } @@ -128,10 +134,15 @@ func (sessionManager *sessionManager) DestroySession(sessionID, userID string) { } sessionManager.Lock() - user, ok := sessionManager.userTable[userID] - if ok && user.RemoveSession(sessionID) { + if user, ok := sessionManager.userTable[userID]; ok && user.RemoveSession(sessionID) { delete(sessionManager.userTable, userID) } + if _, ok := sessionManager.sessionTable[sessionID]; ok { + delete(sessionManager.sessionTable, sessionID) + } + if session, ok := sessionManager.sessionByUserIDTable[userID]; ok && session.Id == sessionID { + delete(sessionManager.sessionByUserIDTable, sessionID) + } sessionManager.Unlock() } @@ -165,12 +176,13 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s if !ok { // No user. Create fake session. sessionManager.Lock() - session, ok := sessionManager.fakesessionTable[userid] + session, ok := sessionManager.sessionByUserIDTable[userid] if !ok { st := sessionManager.FakeSessionToken(userid) session = NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) session.SetUseridFake(st.Userid) - sessionManager.fakesessionTable[userid] = session + sessionManager.sessionByUserIDTable[userid] = session + sessionManager.sessionTable[session.Id] = session } sessionManager.Unlock() users = make([]*DataSession, 1, 1) diff --git a/go/channelling/sessioncreator.go b/go/channelling/sessioncreator.go new file mode 100644 index 00000000..baa28478 --- /dev/null +++ b/go/channelling/sessioncreator.go @@ -0,0 +1,26 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package channelling + +type SessionCreator interface { + CreateSession(st *SessionToken, userid string) *Session +} diff --git a/go/channelling/sink.go b/go/channelling/sink.go new file mode 100644 index 00000000..f665d069 --- /dev/null +++ b/go/channelling/sink.go @@ -0,0 +1,37 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package channelling + +import ( + "github.com/nats-io/nats" +) + +// Sink connects a Pipeline with end points in both directions by +// getting attached to a Pipeline. +type Sink interface { + // Write sends outgoing data on the sink + Write(*DataSinkOutgoing) error + Enabled() bool + Close() + Export() *DataSink + BindRecvChan(channel interface{}) (*nats.Subscription, error) +} diff --git a/go/channelling/tickets.go b/go/channelling/tickets.go index 90145e1e..13f986b7 100644 --- a/go/channelling/tickets.go +++ b/go/channelling/tickets.go @@ -96,13 +96,12 @@ func (tickets *tickets) DecodeSessionToken(token string) (st *SessionToken) { return } -func (tickets *tickets) FakeSessionToken(userid string) *SessionToken { - st := &SessionToken{} - st.Sid = fmt.Sprintf("fake-%s", randomstring.NewRandomString(27)) - st.Id, _ = tickets.Encode("id", st.Sid) - st.Userid = userid +func (tickets *tickets) FakeSessionToken(userid string) (st *SessionToken) { + sid := fmt.Sprintf("fake-%s", randomstring.NewRandomString(27)) + id, _ := tickets.Encode("id", sid) + st = &SessionToken{Id: id, Sid: sid, Userid: userid} log.Println("Created new fake session id", st.Id) - return st + return } func (tickets *tickets) ValidateSession(id, sid string) bool { diff --git a/go/channelling/unicaster.go b/go/channelling/unicaster.go index 7f69474b..e9f28af1 100644 --- a/go/channelling/unicaster.go +++ b/go/channelling/unicaster.go @@ -25,5 +25,5 @@ type Unicaster interface { SessionStore OnConnect(*Client, *Session) OnDisconnect(*Client, *Session) - Unicast(to string, outgoing *DataOutgoing) + Unicast(to string, outgoing *DataOutgoing, pipeline *Pipeline) } diff --git a/go/channelling/userstore.go b/go/channelling/userstore.go new file mode 100644 index 00000000..5da01011 --- /dev/null +++ b/go/channelling/userstore.go @@ -0,0 +1,26 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + */ + +package channelling + +type UserStore interface { + GetUser(id string) (user *User, ok bool) +} diff --git a/go/natsconnection/natsconnection.go b/go/natsconnection/natsconnection.go index db9cdc47..bf64bb7a 100644 --- a/go/natsconnection/natsconnection.go +++ b/go/natsconnection/natsconnection.go @@ -48,6 +48,9 @@ func NewConnection() (*Connection, error) { ReconnectedCB: func(conn *nats.Conn) { log.Println("NATS reconnected") }, + AsyncErrorCB: func(conn *nats.Conn, sub *nats.Subscription, err error) { + log.Println("NATS async error", sub, err) + }, } nc, err := opts.Connect() diff --git a/src/app/spreed-webrtc-server/handler_ws.go b/src/app/spreed-webrtc-server/handler_ws.go index cde32fac..16fb406e 100644 --- a/src/app/spreed-webrtc-server/handler_ws.go +++ b/src/app/spreed-webrtc-server/handler_ws.go @@ -26,6 +26,7 @@ import ( "net/http" "github.com/strukturag/spreed-webrtc/go/channelling" + "github.com/strukturag/spreed-webrtc/go/channelling/server" "github.com/gorilla/websocket" ) @@ -52,7 +53,7 @@ var ( } ) -func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI) http.HandlerFunc { +func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI, users *server.Users) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // Validate incoming request. if r.Method != "GET" { @@ -69,8 +70,20 @@ func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManag return } + r.ParseForm() + token := r.FormValue("t") + st := sessionManager.DecodeSessionToken(token) + + var userid string + if users != nil { + userid, _ = users.GetUserID(r) + if userid == "" { + userid = st.Userid + } + } + // Create a new connection instance. - session := sessionManager.CreateSession(r) + session := sessionManager.CreateSession(st, userid) client := channelling.NewClient(codec, channellingAPI, session) conn := channelling.NewConnection(connectionCounter.CountConnection(), ws, client) diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index 9d27fb48..12036dbb 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -80,6 +80,11 @@ func runner(runtime phoenix.Runtime) error { statsEnabled = false } + pipelinesEnabled, err := runtime.GetBool("http", "pipelines") + if err != nil { + pipelinesEnabled = false + } + pprofListen, err := runtime.GetString("http", "pprofListen") if err == nil && pprofListen != "" { log.Printf("Starting pprof HTTP server on %s", pprofListen) @@ -258,6 +263,7 @@ func runner(runtime phoenix.Runtime) error { } // Prepare services. + apiConsumer := channelling.NewChannellingAPIConsumer() buddyImages := channelling.NewImageCache() codec := channelling.NewCodec(incomingCodecLimit) roomManager := channelling.NewRoomManager(config, codec) @@ -265,8 +271,15 @@ func runner(runtime phoenix.Runtime) error { tickets := channelling.NewTickets(sessionSecret, encryptionSecret, computedRealm) sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager) - busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) - channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager) + busManager := channelling.NewBusManager(apiConsumer, natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) + pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager) + + // Create API. + channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager) + apiConsumer.SetChannellingAPI(channellingAPI) + + // Start bus. + busManager.Start() // Add handlers. r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) @@ -274,12 +287,8 @@ func runner(runtime phoenix.Runtime) error { r.Handle("/static/{path:.*}", http.StripPrefix(config.B, httputils.FileStaticServer(http.Dir(rootFolder)))) r.Handle("/robots.txt", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static"))))) r.Handle("/favicon.ico", http.StripPrefix(config.B, http.FileServer(http.Dir(path.Join(rootFolder, "static", "img"))))) - r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI)) r.HandleFunc("/.well-known/spreed-configuration", wellKnownHandler) - // Simple room handler. - r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler)) - // Sandbox handler. r.HandleFunc("/sandbox/{origin_scheme}/{origin_host}/{sandbox}.html", httputils.MakeGzipHandler(sandboxHandler)) @@ -289,9 +298,11 @@ func runner(runtime phoenix.Runtime) error { rest.AddResource(&server.Rooms{}, "/rooms") rest.AddResource(config, "/config") rest.AddResourceWithWrapper(&server.Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens") + + var users *server.Users if config.UsersEnabled { // Create Users handler. - users := server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime) + users = server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime) rest.AddResource(&server.Sessions{tickets, hub, users}, "/sessions/{id}/") if config.UsersAllowRegistration { rest.AddResource(users, "/users") @@ -301,6 +312,10 @@ func runner(runtime phoenix.Runtime) error { rest.AddResourceWithWrapper(&server.Stats{statsManager}, httputils.MakeGzipHandler, "/stats") log.Println("Stats are enabled!") } + if pipelinesEnabled { + rest.AddResource(&server.Pipelines{pipelineManager, channellingAPI}, "/pipelines/{id}") + log.Println("Pipelines are enabled!") + } // Add extra/static support if configured and exists. if extraFolder != "" { @@ -311,6 +326,12 @@ func runner(runtime phoenix.Runtime) error { } } + // Finally add websocket handler. + r.Handle("/ws", makeWSHandler(statsManager, sessionManager, codec, channellingAPI, users)) + + // Simple room handler. + r.HandleFunc("/{room}", httputils.MakeGzipHandler(roomHandler)) + // Map everything else to a room when it is a GET. rooms := r.PathPrefix("/").Methods("GET").Subrouter() rooms.HandleFunc("/{room:.*}", httputils.MakeGzipHandler(roomHandler))