Browse Source

Implement sessionCreate via NATS.

pull/272/head
Simon Eisenmann 10 years ago
parent
commit
9ff675bca9
  1. 2
      .gitignore
  2. 28
      go/channelling/bus.go
  3. 8
      go/channelling/pipeline.go
  4. 84
      go/channelling/pipeline_manager.go
  5. 5
      go/channelling/session_manager.go
  6. 26
      go/channelling/sessioncreator.go
  7. 5
      go/channelling/sink.go
  8. 2
      src/app/spreed-webrtc-server/main.go

2
.gitignore vendored

@ -4,7 +4,7 @@ @@ -4,7 +4,7 @@
/src/github.com
/src/golang.struktur.de
/*.pprof
/server.conf
/server.conf*
/*.log
/changelog*.txt
/src/styles/.sass-cache

28
go/channelling/bus.go

@ -0,0 +1,28 @@ @@ -0,0 +1,28 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type SessionCreateRequest struct {
Id string
Session *DataSession
Room *DataRoom
}

8
go/channelling/pipeline.go

@ -83,7 +83,13 @@ func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline { @@ -83,7 +83,13 @@ func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline {
}
func (pipeline *Pipeline) Send(b buffercache.Buffer) {
// noop for now
pipeline.mutex.RLock()
sink := pipeline.sink
pipeline.mutex.RUnlock()
if sink != nil {
// Send it through sink.
sink.Send(b)
}
}
func (pipeline *Pipeline) Index() uint64 {

84
go/channelling/pipeline_manager.go

@ -36,6 +36,7 @@ type PipelineManager interface { @@ -36,6 +36,7 @@ type PipelineManager interface {
BusManager
SessionStore
UserStore
SessionCreator
GetPipelineByID(id string) (pipeline *Pipeline, ok bool)
GetPipeline(namespace string, sender Sender, session *Session, to string) *Pipeline
}
@ -44,29 +45,37 @@ type pipelineManager struct { @@ -44,29 +45,37 @@ type pipelineManager struct {
BusManager
SessionStore
UserStore
mutex sync.RWMutex
pipelines map[string]*Pipeline
duration time.Duration
SessionCreator
mutex sync.RWMutex
pipelineTable map[string]*Pipeline
sessionTable map[string]*Session
duration time.Duration
}
func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore) PipelineManager {
func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userStore UserStore, sessionCreator SessionCreator) PipelineManager {
plm := &pipelineManager{
BusManager: busManager,
SessionStore: sessionStore,
UserStore: userStore,
pipelines: make(map[string]*Pipeline),
duration: 30 * time.Minute,
BusManager: busManager,
SessionStore: sessionStore,
UserStore: userStore,
SessionCreator: sessionCreator,
pipelineTable: make(map[string]*Pipeline),
sessionTable: make(map[string]*Session),
duration: 30 * time.Minute,
}
plm.start()
plm.Subscribe("channelling.session.create", plm.sessionCreate)
plm.Subscribe("channelling.session.close", plm.sessionClose)
return plm
}
func (plm *pipelineManager) cleanup() {
plm.mutex.Lock()
for id, pipeline := range plm.pipelines {
for id, pipeline := range plm.pipelineTable {
if pipeline.Expired() {
pipeline.Close()
delete(plm.pipelines, id)
delete(plm.pipelineTable, id)
}
}
plm.mutex.Unlock()
@ -81,12 +90,57 @@ func (plm *pipelineManager) start() { @@ -81,12 +90,57 @@ func (plm *pipelineManager) start() {
}()
}
func (plm *pipelineManager) sessionCreate(subject, reply string, msg *SessionCreateRequest) {
log.Println("sessionCreate via NATS", subject, reply, msg)
if msg.Session == nil || msg.Id == "" {
return
}
plm.mutex.Lock()
session, ok := plm.sessionTable[msg.Id]
if ok {
// Remove existing session with same ID.
session.Close()
}
session = plm.CreateSession(nil, "")
plm.sessionTable[msg.Id] = session
plm.mutex.Unlock()
session.Status = msg.Session.Status
session.SetUseridFake(msg.Session.Userid)
pipeline := plm.GetPipeline("", nil, session, "")
if msg.Room != nil {
room, err := session.JoinRoom(msg.Room.Name, msg.Room.Type, msg.Room.Credentials, pipeline)
log.Println("Joined NATS session to room", room, err)
}
session.BroadcastStatus()
}
func (plm *pipelineManager) sessionClose(subject, reply string, id string) {
log.Println("sessionClose via NATS", subject, reply, id)
if id == "" {
return
}
plm.mutex.Lock()
session, ok := plm.sessionTable[id]
plm.mutex.Unlock()
if ok {
session.Close()
}
}
func (plm *pipelineManager) GetPipelineByID(id string) (*Pipeline, bool) {
plm.mutex.RLock()
pipeline, ok := plm.pipelines[id]
pipeline, ok := plm.pipelineTable[id]
if !ok {
// XXX(longsleep): Hack for development
for _, pipeline = range plm.pipelines {
for _, pipeline = range plm.pipelineTable {
ok = true
break
}
@ -103,7 +157,7 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session @@ -103,7 +157,7 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session
id := plm.PipelineID(namespace, sender, session, to)
plm.mutex.Lock()
pipeline, ok := plm.pipelines[id]
pipeline, ok := plm.pipelineTable[id]
if ok {
// Refresh. We do not care if the pipeline is expired.
pipeline.Refresh(plm.duration)
@ -113,7 +167,7 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session @@ -113,7 +167,7 @@ func (plm *pipelineManager) GetPipeline(namespace string, sender Sender, session
log.Println("Creating pipeline", namespace, id)
pipeline = NewPipeline(plm, namespace, id, session, plm.duration)
plm.pipelines[id] = pipeline
plm.pipelineTable[id] = pipeline
plm.mutex.Unlock()
return pipeline

5
go/channelling/session_manager.go

@ -37,7 +37,7 @@ type SessionManager interface { @@ -37,7 +37,7 @@ type SessionManager interface {
UserStats
SessionStore
UserStore
CreateSession(st *SessionToken, userid string) *Session
SessionCreator
DestroySession(sessionID, userID string)
Authenticate(*Session, *SessionToken, string) error
GetUserSessions(session *Session, id string) []*DataSession
@ -115,6 +115,9 @@ func (sessionManager *sessionManager) GetUser(id string) (*User, bool) { @@ -115,6 +115,9 @@ func (sessionManager *sessionManager) GetUser(id string) (*User, bool) {
}
func (sessionManager *sessionManager) CreateSession(st *SessionToken, userid string) *Session {
if st == nil {
st = sessionManager.DecodeSessionToken("")
}
session := NewSession(sessionManager, sessionManager.Unicaster, sessionManager.Broadcaster, sessionManager.RoomStatusManager, sessionManager.buddyImages, sessionManager.attestations, st.Id, st.Sid)
if userid != "" {

26
go/channelling/sessioncreator.go

@ -0,0 +1,26 @@ @@ -0,0 +1,26 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type SessionCreator interface {
CreateSession(st *SessionToken, userid string) *Session
}

5
go/channelling/sink.go

@ -21,12 +21,15 @@ @@ -21,12 +21,15 @@
package channelling
import ()
import (
"github.com/strukturag/spreed-webrtc/go/buffercache"
)
// Sink connects a Pipeline with end points in both directions by
// getting attached to a Pipeline.
type Sink interface {
// Write sends outgoing data on the sink from the
Write(interface{})
Send(buffercache.Buffer)
Close()
}

2
src/app/spreed-webrtc-server/main.go

@ -266,7 +266,7 @@ func runner(runtime phoenix.Runtime) error { @@ -266,7 +266,7 @@ func runner(runtime phoenix.Runtime) error {
sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager)
busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager)
pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager)
channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)
// Add handlers.

Loading…
Cancel
Save