From 9ff675bca9826b11f1922ef6f25e322bbe6f5752 Mon Sep 17 00:00:00 2001 From: Simon Eisenmann Date: Mon, 4 Apr 2016 14:58:17 +0200 Subject: [PATCH] Implement sessionCreate via NATS. --- .gitignore | 2 +- go/channelling/bus.go | 28 ++++++++++ go/channelling/pipeline.go | 8 ++- go/channelling/pipeline_manager.go | 84 +++++++++++++++++++++++----- go/channelling/session_manager.go | 5 +- go/channelling/sessioncreator.go | 26 +++++++++ go/channelling/sink.go | 5 +- src/app/spreed-webrtc-server/main.go | 2 +- 8 files changed, 140 insertions(+), 20 deletions(-) create mode 100644 go/channelling/bus.go create mode 100644 go/channelling/sessioncreator.go diff --git a/.gitignore b/.gitignore index 26c3fc79..65082f81 100644 --- a/.gitignore +++ b/.gitignore @@ -4,7 +4,7 @@ /src/github.com /src/golang.struktur.de /*.pprof -/server.conf +/server.conf* /*.log /changelog*.txt /src/styles/.sass-cache diff --git a/go/channelling/bus.go b/go/channelling/bus.go new file mode 100644 index 00000000..47cc0c5d --- /dev/null +++ b/go/channelling/bus.go @@ -0,0 +1,28 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package channelling + +type SessionCreateRequest struct { + Id string + Session *DataSession + Room *DataRoom +} diff --git a/go/channelling/pipeline.go b/go/channelling/pipeline.go index 611849ba..fdf70e44 100644 --- a/go/channelling/pipeline.go +++ b/go/channelling/pipeline.go @@ -83,7 +83,13 @@ func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline { } func (pipeline *Pipeline) Send(b buffercache.Buffer) { - // noop for now + pipeline.mutex.RLock() + sink := pipeline.sink + pipeline.mutex.RUnlock() + if sink != nil { + // Send it through sink. + sink.Send(b) + } } func (pipeline *Pipeline) Index() uint64 { diff --git a/go/channelling/pipeline_manager.go b/go/channelling/pipeline_manager.go index 00f17539..a078de4e 100644 --- a/go/channelling/pipeline_manager.go +++ b/go/channelling/pipeline_manager.go @@ -36,6 +36,7 @@ type PipelineManager interface { BusManager SessionStore UserStore + SessionCreator GetPipelineByID(id string) (pipeline *Pipeline, ok bool) GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline } @@ -44,29 +45,37 @@ type pipelineManager struct { BusManager SessionStore UserStore - mutex sync.RWMutex - pipelines map[string]*Pipeline - duration time.Duration + SessionCreator + mutex sync.RWMutex + pipelineTable map[string]*Pipeline + sessionTable map[string]*Session + duration time.Duration } -func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore) PipelineManager { +func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager { plm := &pipelineManager{ - BusManager: busManager, - SessionStore: sessionStore, - UserStore: userStore, - pipelines: make(map[string]*Pipeline), - duration: 30 * time.Minute, + BusManager: busManager, + SessionStore: sessionStore, + UserStore: userStore, + SessionCreator: sessionCreator, + pipelineTable: make(map[string]*Pipeline), + sessionTable: make(map[string]*Session), + duration: 30 * time.Minute, } plm.start() + + plm.Subscribe("channelling.session.create", plm.sessionCreate) + plm.Subscribe("channelling.session.close", plm.sessionClose) + return plm } func (plm *pipelineManager) cleanup() { plm.mutex.Lock() - for id, pipeline := range plm.pipelines { + for id, pipeline := range plm.pipelineTable { if pipeline.Expired() { pipeline.Close() - delete(plm.pipelines, id) + delete(plm.pipelineTable, id) } } plm.mutex.Unlock() @@ -81,12 +90,57 @@ func (plm *pipelineManager) start() { }() } +func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCreateRequest) { + log.Println("sessionCreate via NATS", subject, reply, msg) + + if msg.Session == nil || msg.Id == "" { + return + } + + plm.mutex.Lock() + session, ok := plm.sessionTable[msg.Id] + if ok { + // Remove existing session with same ID. + session.Close() + } + session = plm.CreateSession(nil, "") + plm.sessionTable[msg.Id] = session + plm.mutex.Unlock() + + session.Status = msg.Session.Status + session.SetUseridFake(msg.Session.Userid) + pipeline := plm.GetPipeline("", nil, session, "") + + if msg.Room != nil { + room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, pipeline) + log.Println("Joined NATS session to room", room, err) + } + + session.BroadcastStatus() +} + +func (plm *pipelineManager) sessionClose(subject, reply string, id string) { + log.Println("sessionClose via NATS", subject, reply, id) + + if id == "" { + return + } + + plm.mutex.Lock() + session, ok := plm.sessionTable[id] + plm.mutex.Unlock() + + if ok { + session.Close() + } +} + func (plm *pipelineManager) GetPipelineByID(id string) (*Pipeline, bool) { plm.mutex.RLock() - pipeline, ok := plm.pipelines[id] + pipeline, ok := plm.pipelineTable[id] if !ok { // XXX(longsleep): Hack for development - for _, pipeline = range plm.pipelines { + for _, pipeline = range plm.pipelineTable { ok = true break } @@ -103,7 +157,7 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session id := plm.PipelineID(namespace, sender, session, to) plm.mutex.Lock() - pipeline, ok := plm.pipelines[id] + pipeline, ok := plm.pipelineTable[id] if ok { // Refresh. We do not care if the pipeline is expired. pipeline.Refresh(plm.duration) @@ -113,7 +167,7 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session log.Println("Creating pipeline", namespace, id) pipeline = NewPipeline(plm, namespace, id, session, plm.duration) - plm.pipelines[id] = pipeline + plm.pipelineTable[id] = pipeline plm.mutex.Unlock() return pipeline diff --git a/go/channelling/session_manager.go b/go/channelling/session_manager.go index 78a1c874..e65be9a7 100644 --- a/go/channelling/session_manager.go +++ b/go/channelling/session_manager.go @@ -37,7 +37,7 @@ type SessionManager interface { UserStats SessionStore UserStore - CreateSession(st *SessionToken, userid string) *Session + SessionCreator DestroySession(sessionID, userID string) Authenticate(*Session, *SessionToken, string) error GetUserSessions(session *Session, id string) []*DataSession @@ -115,6 +115,9 @@ func (sessionManager *sessionManager) GetUser(id string) (*User, bool) { } func (sessionManager *sessionManager) CreateSession(st *SessionToken, userid string) *Session { + if st == nil { + st = sessionManager.DecodeSessionToken("") + } session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid) if userid != "" { diff --git a/go/channelling/sessioncreator.go b/go/channelling/sessioncreator.go new file mode 100644 index 00000000..baa28478 --- /dev/null +++ b/go/channelling/sessioncreator.go @@ -0,0 +1,26 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2015 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package channelling + +type SessionCreator interface { + CreateSession(st *SessionToken, userid string) *Session +} diff --git a/go/channelling/sink.go b/go/channelling/sink.go index c2258ff8..0da8611a 100644 --- a/go/channelling/sink.go +++ b/go/channelling/sink.go @@ -21,12 +21,15 @@ package channelling -import () +import ( + "github.com/strukturag/spreed-webrtc/go/buffercache" +) // Sink connects a Pipeline with end points in both directions by // getting attached to a Pipeline. type Sink interface { // Write sends outgoing data on the sink from the Write(interface{}) + Send(buffercache.Buffer) Close() } diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index cd5c00a9..2149a8a0 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -266,7 +266,7 @@ func runner(runtime phoenix.Runtime) error { sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager) busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) - pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager) + pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager) channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager) // Add handlers.