diff --git a/go/channelling/room_manager.go b/go/channelling/room_manager.go index da17fa60..d72a8834 100644 --- a/go/channelling/room_manager.go +++ b/go/channelling/room_manager.go @@ -25,6 +25,8 @@ import ( "fmt" "log" "sync" + + "github.com/nats-io/nats" ) type RoomStatusManager interface { @@ -48,15 +50,24 @@ type RoomManager interface { RoomStatusManager Broadcaster RoomStats + SetBusManager(bus BusManager) error } type roomManager struct { sync.RWMutex *Config OutgoingEncoder - roomTable map[string]RoomWorker - globalRoomID string - defaultRoomID string + BusManager + roomTypeSubscription *nats.Subscription + roomTable map[string]RoomWorker + roomTypes map[string]string + globalRoomID string + defaultRoomID string +} + +type roomTypeMessage struct { + Path string `json:"path"` + Type string `json:"type"` } func NewRoomManager(config *Config, encoder OutgoingEncoder) RoomManager { @@ -65,6 +76,7 @@ func NewRoomManager(config *Config, encoder OutgoingEncoder) RoomManager { Config: config, OutgoingEncoder: encoder, roomTable: make(map[string]RoomWorker), + roomTypes: make(map[string]string), } if config.GlobalRoomID != "" { rm.globalRoomID = rm.MakeRoomID(config.GlobalRoomID, "") @@ -73,6 +85,36 @@ func NewRoomManager(config *Config, encoder OutgoingEncoder) RoomManager { return rm } +func (rooms *roomManager) SetBusManager(BusManager BusManager) error { + if rooms.roomTypeSubscription != nil { + rooms.roomTypeSubscription.Unsubscribe() + rooms.roomTypeSubscription = nil + } + rooms.BusManager = BusManager + if rooms.BusManager != nil { + sub, err := rooms.Subscribe("channelling.config.roomtype", rooms.setNatsRoomType) + if err != nil { + return err + } + rooms.roomTypeSubscription = sub + } + return nil +} + +func (rooms *roomManager) setNatsRoomType(msg *roomTypeMessage) { + if msg == nil { + return + } + + if msg.Type != "" { + log.Printf("Setting room type for %s to %s\n", msg.Path, msg.Type) + rooms.roomTypes[msg.Path] = msg.Type + } else { + log.Printf("Clearing room type for %s\n", msg.Path) + delete(rooms.roomTypes, msg.Path) + } +} + func (rooms *roomManager) RoomUsers(session *Session) []*DataSession { if room, ok := rooms.Get(session.Roomid); ok { return room.GetUsers() @@ -226,6 +268,11 @@ func (rooms *roomManager) MakeRoomID(roomName, roomType string) string { } func (rooms *roomManager) getConfiguredRoomType(roomName string) string { + if roomType, found := rooms.roomTypes[roomName]; found { + // Type of this room was overwritten through NATS. + return roomType + } + for re, roomType := range rooms.RoomTypes { if re.MatchString(roomName) { return roomType diff --git a/go/channelling/room_manager_test.go b/go/channelling/room_manager_test.go index 28ac9e69..579b33fd 100644 --- a/go/channelling/room_manager_test.go +++ b/go/channelling/room_manager_test.go @@ -100,3 +100,19 @@ func Test_RoomManager_UpdateRoom_ReturnsACorrectlyTypedDocument(t *testing.T) { t.Errorf("Expected document type to be %s, but was %v", channelling.RoomTypeRoom, room.Type) } } + +func Test_RoomManager_TypeThroughNats(t *testing.T) { + theRoomManager, _ := NewTestRoomManager() + rm := theRoomManager.(*roomManager) + if rt := rm.getConfiguredRoomType("foo"); rt != channelling.RoomTypeRoom { + t.Errorf("Expected room type to be %s, but was %v", channelling.RoomTypeRoom, rt) + } + rm.setNatsRoomType(&roomTypeMessage{Path: "foo", Type: "Conference"}) + if rt := rm.getConfiguredRoomType("foo"); rt != "Conference" { + t.Errorf("Expected room type to be %s, but was %v", "Conference", rt) + } + rm.setNatsRoomType(&roomTypeMessage{Path: "foo", Type: ""}) + if rt := rm.getConfiguredRoomType("foo"); rt != channelling.RoomTypeRoom { + t.Errorf("Expected room type to be %s, but was %v", channelling.RoomTypeRoom, rt) + } +} diff --git a/src/app/spreed-webrtc-server/main.go b/src/app/spreed-webrtc-server/main.go index 07286e9f..4051bfba 100644 --- a/src/app/spreed-webrtc-server/main.go +++ b/src/app/spreed-webrtc-server/main.go @@ -276,6 +276,9 @@ func runner(runtime phoenix.Runtime) error { statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager) busManager := channelling.NewBusManager(apiConsumer, natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) pipelineManager := channelling.NewPipelineManager(busManager, sessionManager, sessionManager, sessionManager) + if err := roomManager.SetBusManager(busManager); err != nil { + return err + } // Create API. channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager, pipelineManager)