Browse Source

Added to and from userid to sink/pipeline API.

pull/272/head
Simon Eisenmann 9 years ago
parent
commit
d86e805d94
  1. 1
      go/channelling/api/api.go
  2. 7
      go/channelling/bus.go
  3. 6
      go/channelling/bus_manager.go
  4. 1
      go/channelling/data.go
  5. 88
      go/channelling/pipeline.go
  6. 2
      go/channelling/pipeline_manager.go
  7. 2
      go/channelling/sink.go

1
go/channelling/api/api.go

@ -124,6 +124,7 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe
break break
} }
if _, ok := msg.Answer.Answer["_token"]; !ok { if _, ok := msg.Answer.Answer["_token"]; !ok {
pipeline = api.PipelineManager.GetPipeline(channelling.PipelineNamespaceCall, sender, session, msg.Answer.To)
// Trigger answer event when answer has no token. so this is // Trigger answer event when answer has no token. so this is
// not triggered for peerxfer and peerscreenshare answers. // not triggered for peerxfer and peerscreenshare answers.
api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil, pipeline) api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil, pipeline)

7
go/channelling/bus.go

@ -31,3 +31,10 @@ type DataSink struct {
SubjectOut string `json:subject_out"` SubjectOut string `json:subject_out"`
SubjectIn string `json:subject_in"` SubjectIn string `json:subject_in"`
} }
type DataSinkOutgoing struct {
Outgoing *DataOutgoing
ToUserid string
FromUserid string
Pipe string `json:",omitempty"`
}

6
go/channelling/bus_manager.go

@ -238,7 +238,7 @@ type natsSink struct {
SubjectOut string SubjectOut string
SubjectIn string SubjectIn string
sub *nats.Subscription sub *nats.Subscription
sendQueue chan *DataOutgoing sendQueue chan *DataSinkOutgoing
} }
func newNatsSink(bm BusManager, id string) *natsSink { func newNatsSink(bm BusManager, id string) *natsSink {
@ -249,13 +249,13 @@ func newNatsSink(bm BusManager, id string) *natsSink {
SubjectIn: bm.PrefixSubject(fmt.Sprintf("sink.%s.in", id)), SubjectIn: bm.PrefixSubject(fmt.Sprintf("sink.%s.in", id)),
} }
sink.sendQueue = make(chan *DataOutgoing, 100) sink.sendQueue = make(chan *DataSinkOutgoing, 100)
bm.BindSendChan(sink.SubjectOut, sink.sendQueue) bm.BindSendChan(sink.SubjectOut, sink.sendQueue)
return sink return sink
} }
func (sink *natsSink) Write(outgoing *DataOutgoing) (err error) { func (sink *natsSink) Write(outgoing *DataSinkOutgoing) (err error) {
if sink.Enabled() { if sink.Enabled() {
log.Println("Sending via NATS sink", sink.SubjectOut, outgoing) log.Println("Sending via NATS sink", sink.SubjectOut, outgoing)
sink.sendQueue <- outgoing sink.sendQueue <- outgoing

1
go/channelling/data.go

@ -203,7 +203,6 @@ type DataOutgoing struct {
To string `json:",omitempty"` To string `json:",omitempty"`
Iid string `json:",omitempty"` Iid string `json:",omitempty"`
A string `json:",omitempty"` A string `json:",omitempty"`
Pipe string `json:",omitempty"`
} }
type DataSessions struct { type DataSessions struct {

88
go/channelling/pipeline.go

@ -45,9 +45,10 @@ type Pipeline struct {
from *Session from *Session
to *Session to *Session
expires *time.Time expires *time.Time
data []*DataOutgoing data []*DataSinkOutgoing
sink Sink sink Sink
recvQueue chan *DataIncoming recvQueue chan *DataIncoming
closed bool
} }
func NewPipeline(manager PipelineManager, func NewPipeline(manager PipelineManager,
@ -68,8 +69,7 @@ func NewPipeline(manager PipelineManager,
} }
func (pipeline *Pipeline) receive() { func (pipeline *Pipeline) receive() {
// XXX(longsleep): Make sure this does not leak go routines. // TODO(longsleep): Call to ToSession() should be avoided because it locks.
// XXX(longsleep): Call to ToSession() should be avoided because it locks.
api := pipeline.PipelineManager.GetChannellingAPI() api := pipeline.PipelineManager.GetChannellingAPI()
for data := range pipeline.recvQueue { for data := range pipeline.recvQueue {
_, err := api.OnIncoming(nil, pipeline.ToSession(), data) _, err := api.OnIncoming(nil, pipeline.ToSession(), data)
@ -78,7 +78,7 @@ func (pipeline *Pipeline) receive() {
log.Println("Pipeline receive incoming error", err) log.Println("Pipeline receive incoming error", err)
} }
} }
log.Println("Pipline receive exit") log.Println("Pipeline receive done")
} }
func (pipeline *Pipeline) GetID() string { func (pipeline *Pipeline) GetID() string {
@ -87,15 +87,20 @@ func (pipeline *Pipeline) GetID() string {
func (pipeline *Pipeline) Refresh(duration time.Duration) { func (pipeline *Pipeline) Refresh(duration time.Duration) {
pipeline.mutex.Lock() pipeline.mutex.Lock()
pipeline.refresh(duration)
pipeline.mutex.Unlock()
}
func (pipeline *Pipeline) refresh(duration time.Duration) {
expiration := time.Now().Add(duration) expiration := time.Now().Add(duration)
pipeline.expires = &expiration pipeline.expires = &expiration
pipeline.mutex.Unlock()
} }
func (pipeline *Pipeline) Add(msg *DataOutgoing) *Pipeline { func (pipeline *Pipeline) Add(msg *DataSinkOutgoing) *Pipeline {
msg.Pipe = pipeline.id msg.Pipe = pipeline.id
pipeline.mutex.Lock() pipeline.mutex.Lock()
pipeline.data = append(pipeline.data, msg) pipeline.data = append(pipeline.data, msg)
pipeline.refresh(30 * time.Second)
pipeline.mutex.Unlock() pipeline.mutex.Unlock()
return pipeline return pipeline
@ -111,9 +116,14 @@ func (pipeline *Pipeline) Index() uint64 {
func (pipeline *Pipeline) Close() { func (pipeline *Pipeline) Close() {
pipeline.mutex.Lock() pipeline.mutex.Lock()
pipeline.expires = nil if !pipeline.closed {
if pipeline.sink != nil { pipeline.expires = nil
pipeline.sink = nil if pipeline.sink != nil {
pipeline.sink = nil
}
close(pipeline.recvQueue)
pipeline.closed = true
log.Println("Closed pipeline")
} }
pipeline.mutex.Unlock() pipeline.mutex.Unlock()
} }
@ -154,7 +164,7 @@ func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
for seq, msg := range data { for seq, msg := range data {
line = &PipelineFeedLine{ line = &PipelineFeedLine{
Seq: seq + since, Seq: seq + since,
Msg: msg, Msg: msg.Outgoing,
} }
lineRaw, err = json.Marshal(line) lineRaw, err = json.Marshal(line)
if err != nil { if err != nil {
@ -176,33 +186,55 @@ func (pipeline *Pipeline) JSONFeed(since, limit int) ([]byte, error) {
func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool { func (pipeline *Pipeline) FlushOutgoing(hub Hub, client *Client, to string, outgoing *DataOutgoing) bool {
//log.Println("Flush outgoing via pipeline", to, client == nil) //log.Println("Flush outgoing via pipeline", to, client == nil)
if client == nil { if client == nil {
pipeline.Add(outgoing) sinkOutgoing := &DataSinkOutgoing{
Outgoing: outgoing,
}
pipeline.mutex.Lock() pipeline.mutex.Lock()
sink := pipeline.sink sink := pipeline.sink
if sink != nil && sink.Enabled() { toSession := pipeline.to
// Sink it. fromSession := pipeline.from
pipeline.mutex.Unlock()
sink.Write(outgoing)
return true
}
var session *Session for {
sink, session = pipeline.PipelineManager.FindSinkAndSession(to) if sink != nil && sink.Enabled() {
if sink != nil { // Sink it.
pipeline.to = session
err := pipeline.attach(sink)
if err == nil {
pipeline.mutex.Unlock() pipeline.mutex.Unlock()
break
}
// Create incoming receiver. sink, toSession = pipeline.PipelineManager.FindSinkAndSession(to)
sink.BindRecvChan(pipeline.recvQueue) if sink != nil {
pipeline.to = toSession
err := pipeline.attach(sink)
if err == nil {
pipeline.mutex.Unlock()
sink.Write(outgoing) // Create incoming receiver.
return true sink.BindRecvChan(pipeline.recvQueue)
// Sink it.
break
}
} }
// Not pipelined, do nothing.
pipeline.mutex.Unlock()
break
}
if fromSession != nil {
sinkOutgoing.FromUserid = fromSession.Userid()
}
if toSession != nil {
sinkOutgoing.ToUserid = toSession.Userid()
}
pipeline.Add(sinkOutgoing)
if sink != nil {
// Pipelined, sink data.
sink.Write(sinkOutgoing)
return true
} }
pipeline.mutex.Unlock()
} }
return false return false

2
go/channelling/pipeline_manager.go

@ -65,7 +65,7 @@ func NewPipelineManager(busManager BusManager, sessionStore SessionStore, userSt
sessionTable: make(map[string]*Session), sessionTable: make(map[string]*Session),
sessionByBusIDTable: make(map[string]*Session), sessionByBusIDTable: make(map[string]*Session),
sessionSinkTable: make(map[string]Sink), sessionSinkTable: make(map[string]Sink),
duration: 30 * time.Minute, duration: 60 * time.Second,
} }
plm.start() plm.start()

2
go/channelling/sink.go

@ -29,7 +29,7 @@ import (
// getting attached to a Pipeline. // getting attached to a Pipeline.
type Sink interface { type Sink interface {
// Write sends outgoing data on the sink // Write sends outgoing data on the sink
Write(*DataOutgoing) error Write(*DataSinkOutgoing) error
Enabled() bool Enabled() bool
Close() Close()
Export() *DataSink Export() *DataSink

Loading…
Cancel
Save