diff --git a/.gitignore b/.gitignore
index 26c3fc79..65082f81 100644
--- a/.gitignore
+++ b/.gitignore
@@ -4,7 +4,7 @@
/src/github.com
/src/golang.struktur.de
/*.pprof
-/server.conf
+/server.conf*
/*.log
/changelog*.txt
/src/styles/.sass-cache
diff --git a/go/channelling/bus.go b/go/channelling/bus.go
new file mode 100644
index 00000000..47cc0c5d
--- /dev/null
+++ b/go/channelling/bus.go
@@ -0,0 +1,28 @@
+/*
+ * Spreed WebRTC.
+ * Copyright (C) 2013-2015 struktur AG
+ *
+ * This file is part of Spreed WebRTC.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package channelling
+
+type SessionCreateRequest struct {
+ Id string
+ Session *DataSession
+ Room *DataRoom
+}
diff --git a/go/channelling/pipeline.go b/go/channelling/pipeline.go
index 611849ba..fdf70e44 100644
--- a/go/channelling/pipeline.go
+++ b/go/channelling/pipeline.go
@@ -83,7 +83,13 @@ func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline {
}
func (pipeline *Pipeline) Send(b buffercache.Buffer) {
- // noop for now
+ pipeline.mutex.RLock()
+ sink := pipeline.sink
+ pipeline.mutex.RUnlock()
+ if sink != nil {
+ // Send it through sink.
+ sink.Send(b)
+ }
}
func (pipeline *Pipeline) Index() uint64 {
diff --git a/go/channelling/pipeline_manager.go b/go/channelling/pipeline_manager.go
index 00f17539..a078de4e 100644
--- a/go/channelling/pipeline_manager.go
+++ b/go/channelling/pipeline_manager.go
@@ -36,6 +36,7 @@ type PipelineManager interface {
BusManager
SessionStore
UserStore
+ SessionCreator
GetPipelineByID(id string) (pipeline *Pipeline, ok bool)
GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline
}
@@ -44,29 +45,37 @@ type pipelineManager struct {
BusManager
SessionStore
UserStore
- mutex sync.RWMutex
- pipelines map[string]*Pipeline
- duration time.Duration
+ SessionCreator
+ mutex sync.RWMutex
+ pipelineTable map[string]*Pipeline
+ sessionTable map[string]*Session
+ duration time.Duration
}
-func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore) PipelineManager {
+func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager {
plm := &pipelineManager{
- BusManager: busManager,
- SessionStore: sessionStore,
- UserStore: userStore,
- pipelines: make(map[string]*Pipeline),
- duration: 30 * time.Minute,
+ BusManager: busManager,
+ SessionStore: sessionStore,
+ UserStore: userStore,
+ SessionCreator: sessionCreator,
+ pipelineTable: make(map[string]*Pipeline),
+ sessionTable: make(map[string]*Session),
+ duration: 30 * time.Minute,
}
plm.start()
+
+ plm.Subscribe("channelling.session.create", plm.sessionCreate)
+ plm.Subscribe("channelling.session.close", plm.sessionClose)
+
return plm
}
func (plm *pipelineManager) cleanup() {
plm.mutex.Lock()
- for id, pipeline := range plm.pipelines {
+ for id, pipeline := range plm.pipelineTable {
if pipeline.Expired() {
pipeline.Close()
- delete(plm.pipelines, id)
+ delete(plm.pipelineTable, id)
}
}
plm.mutex.Unlock()
@@ -81,12 +90,57 @@ func (plm *pipelineManager) start() {
}()
}
+func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCreateRequest) {
+ log.Println("sessionCreate via NATS", subject, reply, msg)
+
+ if msg.Session == nil || msg.Id == "" {
+ return
+ }
+
+ plm.mutex.Lock()
+ session, ok := plm.sessionTable[msg.Id]
+ if ok {
+ // Remove existing session with same ID.
+ session.Close()
+ }
+ session = plm.CreateSession(nil, "")
+ plm.sessionTable[msg.Id] = session
+ plm.mutex.Unlock()
+
+ session.Status = msg.Session.Status
+ session.SetUseridFake(msg.Session.Userid)
+ pipeline := plm.GetPipeline("", nil, session, "")
+
+ if msg.Room != nil {
+ room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, pipeline)
+ log.Println("Joined NATS session to room", room, err)
+ }
+
+ session.BroadcastStatus()
+}
+
+func (plm *pipelineManager) sessionClose(subject, reply string, id string) {
+ log.Println("sessionClose via NATS", subject, reply, id)
+
+ if id == "" {
+ return
+ }
+
+ plm.mutex.Lock()
+ session, ok := plm.sessionTable[id]
+ plm.mutex.Unlock()
+
+ if ok {
+ session.Close()
+ }
+}
+
func (plm *pipelineManager) GetPipelineByID(id string) (*Pipeline, bool) {
plm.mutex.RLock()
- pipeline, ok := plm.pipelines[id]
+ pipeline, ok := plm.pipelineTable[id]
if !ok {
// XXX(longsleep): Hack for development
- for _, pipeline = range plm.pipelines {
+ for _, pipeline = range plm.pipelineTable {
ok = true
break
}
@@ -103,7 +157,7 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session
id := plm.PipelineID(namespace, sender, session, to)
plm.mutex.Lock()
- pipeline, ok := plm.pipelines[id]
+ pipeline, ok := plm.pipelineTable[id]
if ok {
// Refresh. We do not care if the pipeline is expired.
pipeline.Refresh(plm.duration)
@@ -113,7 +167,7 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session
log.Println("Creating pipeline", namespace, id)
pipeline = NewPipeline(plm, namespace, id, session, plm.duration)
- plm.pipelines[id] = pipeline
+ plm.pipelineTable[id] = pipeline
plm.mutex.Unlock()
return pipeline
diff --git a/go/channelling/session_manager.go b/go/channelling/session_manager.go
index 78a1c874..e65be9a7 100644
--- a/go/channelling/session_manager.go
+++ b/go/channelling/session_manager.go
@@ -37,7 +37,7 @@ type SessionManager interface {
UserStats
SessionStore
UserStore
- CreateSession(st *SessionToken, userid string) *Session
+ SessionCreator
DestroySession(sessionID, userID string)
Authenticate(*Session, *SessionToken, string) error
GetUserSessions(session *Session, id string) []*DataSession
@@ -115,6 +115,9 @@ func (sessionManager *sessionManager) GetUser(id string) (*User, bool) {
}
func (sessionManager *sessionManager) CreateSession(st *SessionToken, userid string) *Session {
+ if st == nil {
+ st = sessionManager.DecodeSessionToken("")
+ }
session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
if userid != "" {
diff --git a/go/channelling/sessioncreator.go b/go/channelling/sessioncreator.go
new file mode 100644
index 00000000..baa28478
--- /dev/null
+++ b/go/channelling/sessioncreator.go
@@ -0,0 +1,26 @@
+/*
+ * Spreed WebRTC.
+ * Copyright (C) 2013-2015 struktur AG
+ *
+ * This file is part of Spreed WebRTC.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ *
+ */
+
+package channelling
+
+type SessionCreator interface {
+ CreateSession(st *SessionToken, userid string) *Session
+}
diff --git a/go/channelling/sink.go b/go/channelling/sink.go
index c2258ff8..0da8611a 100644
--- a/go/channelling/sink.go
+++ b/go/channelling/sink.go
@@ -21,12 +21,15 @@
package channelling
-import ()
+import (
+ "github.com/strukturag/spreed-webrtc/go/buffercache"
+)
// Sink connects a Pipeline with end points in both directions by
// getting attached to a Pipeline.
type Sink interface {
// Write sends outgoing data on the sink from the
Write(interface{})
+ Send(buffercache.Buffer)
Close()
}
diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go
index cd5c00a9..2149a8a0 100644
--- a/src/app/spreed-webrtc-server/main.go
+++ b/src/app/spreed-webrtc-server/main.go
@@ -266,7 +266,7 @@ func runner(runtime phoenix.Runtime) error {
sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager)
busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
- pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager)
+ pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager)
channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)
// Add handlers.