From f9906b5973948cc00fe86f81538334afd969dca0 Mon Sep 17 00:00:00 2001 From: Joachim Bauch Date: Wed, 1 Jun 2016 14:01:37 +0200 Subject: [PATCH] Evaluate room type mapping and send Conference messages for conference rooms. The messages are trigger whenever a new user joins a conference room. For such rooms, clientside Conference messages are ignored. --- go/channelling/api.go | 1 + go/channelling/api/api.go | 7 +++++++ go/channelling/api/handle_conference.go | 5 +++++ go/channelling/api/handle_hello.go | 17 +++++++++++++++++ go/channelling/client.go | 4 +++- go/channelling/pipeline.go | 4 +++- go/channelling/room_manager.go | 17 ++++++++++++++++- go/channelling/roomworker.go | 5 +++++ go/channelling/server/pipelines.go | 4 +++- 9 files changed, 60 insertions(+), 4 deletions(-) diff --git a/go/channelling/api.go b/go/channelling/api.go index 729c96ec..aa14b79b 100644 --- a/go/channelling/api.go +++ b/go/channelling/api.go @@ -25,6 +25,7 @@ type ChannellingAPI interface { OnConnect(*Client, *Session) (interface{}, error) OnDisconnect(*Client, *Session) OnIncoming(Sender, *Session, *DataIncoming) (interface{}, error) + OnIncomingProcessed(Sender, *Session, *DataIncoming, interface{}, error) } type ChannellingAPIConsumer interface { diff --git a/go/channelling/api/api.go b/go/channelling/api/api.go index f843568e..b70df1f4 100644 --- a/go/channelling/api/api.go +++ b/go/channelling/api/api.go @@ -194,3 +194,10 @@ func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channe return nil, nil } + +func (api *channellingAPI) OnIncomingProcessed(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming, reply interface{}, err error) { + switch msg.Type { + case "Hello": + api.HelloProcessed(sender, session, msg, reply, err) + } +} diff --git a/go/channelling/api/handle_conference.go b/go/channelling/api/handle_conference.go index f913d1d6..df9933a8 100644 --- a/go/channelling/api/handle_conference.go +++ b/go/channelling/api/handle_conference.go @@ -28,6 +28,11 @@ import ( ) func (api *channellingAPI) HandleConference(session *channelling.Session, conference *channelling.DataConference) { + if room, ok := api.RoomStatusManager.Get(session.Roomid); ok && room.GetType() == "Conference" { + log.Println("Refusing client-side conference update for server-managed conferences.") + return + } + // Check conference maximum size. if len(conference.Conference) > maxConferenceSize { log.Println("Refusing to create conference above limit.", len(conference.Conference)) diff --git a/go/channelling/api/handle_hello.go b/go/channelling/api/handle_hello.go index aabd1307..e1e1b66e 100644 --- a/go/channelling/api/handle_hello.go +++ b/go/channelling/api/handle_hello.go @@ -46,3 +46,20 @@ func (api *channellingAPI) HandleHello(session *channelling.Session, hello *chan Users: api.RoomStatusManager.RoomUsers(session), }, nil } + +func (api *channellingAPI) HelloProcessed(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming, reply interface{}, err error) { + // If user joined a server-managed conference room, send list of session ids to all participants. + if room, ok := api.RoomStatusManager.Get(session.Roomid); ok && room.GetType() == "Conference" { + if sessionids := room.SessionIDs(); len(sessionids) > 1 { + cid := session.Roomid + session.Broadcaster.Broadcast("", session.Roomid, &channelling.DataOutgoing{ + To: cid, + Data: &channelling.DataConference{ + Type: "Conference", + Id: cid, + Conference: sessionids, + }, + }) + } + } +} diff --git a/go/channelling/client.go b/go/channelling/client.go index f845bb37..c9b604a5 100644 --- a/go/channelling/client.go +++ b/go/channelling/client.go @@ -68,11 +68,13 @@ func (client *Client) OnText(b buffercache.Buffer) { return } - if reply, err := client.ChannellingAPI.OnIncoming(client, client.session, incoming); err != nil { + var reply interface{} + if reply, err = client.ChannellingAPI.OnIncoming(client, client.session, incoming); err != nil { client.reply(incoming.Iid, err) } else if reply != nil { client.reply(incoming.Iid, reply) } + client.ChannellingAPI.OnIncomingProcessed(client, client.session, incoming, reply, err) } func (client *Client) reply(iid string, m interface{}) { diff --git a/go/channelling/pipeline.go b/go/channelling/pipeline.go index fdcccade..6f57303c 100644 --- a/go/channelling/pipeline.go +++ b/go/channelling/pipeline.go @@ -72,11 +72,13 @@ func (pipeline *Pipeline) receive() { // TODO(longsleep): Call to ToSession() should be avoided because it locks. api := pipeline.PipelineManager.GetChannellingAPI() for data := range pipeline.recvQueue { - _, err := api.OnIncoming(nil, pipeline.ToSession(), data) + session := pipeline.ToSession() + reply, err := api.OnIncoming(nil, session, data) if err != nil { // TODO(longsleep): Handle reply and error. log.Println("Pipeline receive incoming error", err) } + api.OnIncomingProcessed(nil, session, data, reply, err) } log.Println("Pipeline receive done") } diff --git a/go/channelling/room_manager.go b/go/channelling/room_manager.go index c2063adc..da17fa60 100644 --- a/go/channelling/room_manager.go +++ b/go/channelling/room_manager.go @@ -33,6 +33,7 @@ type RoomStatusManager interface { LeaveRoom(roomID, sessionID string) UpdateRoom(*Session, *DataRoom) (*DataRoom, error) MakeRoomID(roomName, roomType string) string + Get(roomID string) (room RoomWorker, ok bool) } type Broadcaster interface { @@ -169,6 +170,10 @@ func (rooms *roomManager) GetOrCreate(roomID, roomName, roomType string, credent return room, nil } + if roomType == "" { + roomType = rooms.getConfiguredRoomType(roomName) + } + rooms.Lock() // Need to re-check, another thread might have created the room // while we waited for the lock. @@ -214,8 +219,18 @@ func (rooms *roomManager) GlobalUsers() []*roomUser { func (rooms *roomManager) MakeRoomID(roomName, roomType string) string { if roomType == "" { - roomType = rooms.RoomTypeDefault + roomType = rooms.getConfiguredRoomType(roomName) } return fmt.Sprintf("%s:%s", roomType, roomName) } + +func (rooms *roomManager) getConfiguredRoomType(roomName string) string { + for re, roomType := range rooms.RoomTypes { + if re.MatchString(roomName) { + return roomType + } + } + + return rooms.RoomTypeDefault +} diff --git a/go/channelling/roomworker.go b/go/channelling/roomworker.go index d991ca24..ef504e8d 100644 --- a/go/channelling/roomworker.go +++ b/go/channelling/roomworker.go @@ -45,6 +45,7 @@ type RoomWorker interface { Broadcast(sessionID string, buf buffercache.Buffer) Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error) Leave(sessionID string) + GetType() string } type roomWorker struct { @@ -146,6 +147,10 @@ func (r *roomWorker) Users() []*roomUser { return users } +func (r *roomWorker) GetType() string { + return r.Type +} + func (r *roomWorker) Run(f func()) bool { select { case r.workers <- f: diff --git a/go/channelling/server/pipelines.go b/go/channelling/server/pipelines.go index 869372f3..3e3ec95c 100644 --- a/go/channelling/server/pipelines.go +++ b/go/channelling/server/pipelines.go @@ -87,12 +87,14 @@ func (pipelines *Pipelines) Post(request *http.Request) (int, interface{}, http. From: pipeline.FromSession().Id, Iid: incoming.Iid, } - reply, err := pipelines.API.OnIncoming(pipeline, pipeline.ToSession(), &incoming) + session := pipeline.ToSession() + reply, err := pipelines.API.OnIncoming(pipeline, session, &incoming) if err == nil { result.Data = reply } else { result.Data = err } + pipelines.API.OnIncomingProcessed(pipeline, session, &incoming, reply, err) return http.StatusOK, result, nil }