From 2d52d933bcf2c4ce4a9b9b742a1eef69d56b4dbb Mon Sep 17 00:00:00 2001 From: Lance Cooper Date: Thu, 23 Oct 2014 13:21:52 +0200 Subject: [PATCH] Support receiving and broadcasting room update events. --- doc/CHANNELING-API.txt | 17 ++++- src/app/spreed-webrtc-server/channeling.go | 5 ++ .../spreed-webrtc-server/channelling_api.go | 7 ++ .../channelling_api_test.go | 68 +++++++++++++++--- src/app/spreed-webrtc-server/room_manager.go | 38 ++++++---- .../spreed-webrtc-server/room_manager_test.go | 69 +++++++++++++++++++ 6 files changed, 181 insertions(+), 23 deletions(-) create mode 100644 src/app/spreed-webrtc-server/room_manager_test.go diff --git a/doc/CHANNELING-API.txt b/doc/CHANNELING-API.txt index e170b865..2b6a527b 100644 --- a/doc/CHANNELING-API.txt +++ b/doc/CHANNELING-API.txt @@ -204,8 +204,21 @@ Special purpose documents for channling "Name": "room-name-here" } - The Room document is sent in responses to initial joins or when room properties - have been updated. + Clients may send a Room document in order to update all room properties + to the values given in the document. The room name must be given and match + the currently joined room. Successful updates will receive an updated Room + document as a reply, or an Error document if the update fails. + + Addtionally, the Room document is included in responses to initial joins + and broadcast when room properties are updated. + + Keys under Room: + + Name: The human readable ID of the room, currently must be globally unique. + + Error codes: + + not_in_room : Clients may only update rooms which they have joined. Peer connection documents diff --git a/src/app/spreed-webrtc-server/channeling.go b/src/app/spreed-webrtc-server/channeling.go index bf9b5ac9..dd4e8eb1 100644 --- a/src/app/spreed-webrtc-server/channeling.go +++ b/src/app/spreed-webrtc-server/channeling.go @@ -27,6 +27,10 @@ type DataError struct { Message string } +func (err *DataError) Error() string { + return err.Message +} + type DataHello struct { Version string Ua string @@ -176,6 +180,7 @@ type DataIncoming struct { Alive *DataAlive Authentication *DataAuthentication Sessions *DataSessions + Room *DataRoom Iid string `json:",omitempty"` } diff --git a/src/app/spreed-webrtc-server/channelling_api.go b/src/app/spreed-webrtc-server/channelling_api.go index fbe72cb1..2f4c6ded 100644 --- a/src/app/spreed-webrtc-server/channelling_api.go +++ b/src/app/spreed-webrtc-server/channelling_api.go @@ -212,6 +212,13 @@ func (api *channellingAPI) OnIncoming(c ResponseSender, session *Session, msg *D if users != nil { c.Reply(msg.Iid, &DataSessions{Type: "Sessions", Users: users, Sessions: msg.Sessions.Sessions}) } + case "Room": + if room, err := api.UpdateRoom(session, msg.Room); err == nil { + api.Broadcast(session, room) + c.Reply(msg.Iid, room) + } else { + c.Reply(msg.Iid, err) + } default: log.Println("OnText unhandled message type", msg.Type) } diff --git a/src/app/spreed-webrtc-server/channelling_api_test.go b/src/app/spreed-webrtc-server/channelling_api_test.go index 9594aae7..0c1dff2a 100644 --- a/src/app/spreed-webrtc-server/channelling_api_test.go +++ b/src/app/spreed-webrtc-server/channelling_api_test.go @@ -52,6 +52,8 @@ type fakeRoomManager struct { joinedID string leftID string broadcasts []interface{} + updatedRoom *DataRoom + updateError error } func (fake *fakeRoomManager) CanJoinRoom(roomID string) bool { @@ -74,6 +76,33 @@ func (fake *fakeRoomManager) Broadcast(_ *Session, msg interface{}) { fake.broadcasts = append(fake.broadcasts, msg) } +func (fake *fakeRoomManager) UpdateRoom(_ *Session, _ *DataRoom) (*DataRoom, error) { + return fake.updatedRoom, fake.updateError +} + +func assertReply(t *testing.T, client *fakeClient, iid string) interface{} { + msg, ok := client.replies[iid] + if !ok { + t.Fatalf("No response received for Iid %v", iid) + } + return msg +} + +func assertErrorReply(t *testing.T, client *fakeClient, iid, code string) { + err, ok := assertReply(t, client, iid).(*DataError) + if !ok { + t.Fatalf("Expected response message to be an Error") + } + + if err.Type != "Error" { + t.Error("Message did not have the correct type") + } + + if err.Code != code { + t.Errorf("Expected error code to be %v, but was %v", code, err.Code) + } +} + func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) { client, roomManager, session := &fakeClient{}, &fakeRoomManager{}, &Session{} return NewChannellingAPI(testAppVersion, nil, roomManager, nil, nil, nil, nil, nil, nil, roomManager, nil), client, session, roomManager @@ -176,21 +205,42 @@ func Test_ChannellingAPI_OnIncoming_HelloMessageWithAnIid_RespondsWithAnErrorIfT api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Iid: iid, Hello: &DataHello{}}) - msg, ok := client.replies[iid] + assertErrorReply(t, client, iid, "default_room_disabled") +} + +func Test_ChannellingAPI_OnIncoming_RoomMessage_RespondsWithAndBroadcastsTheUpdatedRoom(t *testing.T) { + iid, roomName := "123", "foo" + api, client, session, roomManager := NewTestChannellingAPI() + roomManager.updatedRoom = &DataRoom{Name: "FOO"} + + api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Iid: "0", Hello: &DataHello{Id: roomName}}) + api.OnIncoming(client, session, &DataIncoming{Type: "Room", Iid: iid, Room: &DataRoom{Name: roomName}}) + + room, ok := assertReply(t, client, iid).(*DataRoom) if !ok { - t.Fatalf("No response received for Iid %v", iid) + t.Fatalf("Expected response message to be a Room") } - err, ok := msg.(*DataError) - if !ok { - t.Fatalf("Expected response message %#v to be an Error", msg) + if room.Name != roomManager.updatedRoom.Name { + t.Errorf("Expected updated room with name %v, but got %#v", roomManager.updatedRoom, room) } - if err.Type != "Error" { - t.Error("Message did not have the correct type") + if broadcastCount := len(roomManager.broadcasts); broadcastCount != 2 { + t.Fatalf("Expected 1 broadcasts, but got %d", broadcastCount) } - if code := "default_room_disabled"; err.Code != code { - t.Errorf("Expected error code to be %v, but was %v", code, err.Code) + if _, ok := roomManager.broadcasts[1].(*DataRoom); !ok { + t.Fatal("Expected a room data broadcast") } } + +func Test_ChannellingAPI_OnIncoming_RoomMessage_RespondsWithAnErrorIfUpdatingTheRoomFails(t *testing.T) { + iid, roomName := "123", "foo" + api, client, session, roomManager := NewTestChannellingAPI() + roomManager.updateError = &DataError{Type: "Error", Code: "a_room_error", Message: ""} + + api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Iid: "0", Hello: &DataHello{Id: roomName}}) + api.OnIncoming(client, session, &DataIncoming{Type: "Room", Iid: iid, Room: &DataRoom{Name: roomName}}) + + assertErrorReply(t, client, iid, "a_room_error") +} diff --git a/src/app/spreed-webrtc-server/room_manager.go b/src/app/spreed-webrtc-server/room_manager.go index 874c96c9..c57d8c75 100644 --- a/src/app/spreed-webrtc-server/room_manager.go +++ b/src/app/spreed-webrtc-server/room_manager.go @@ -31,6 +31,7 @@ type RoomStatusManager interface { RoomUsers(*Session) []*DataSession JoinRoom(*Session, Sender) LeaveRoom(*Session) + UpdateRoom(*Session, *DataRoom) (*DataRoom, error) } type Broadcaster interface { @@ -70,15 +71,27 @@ func (rooms *roomManager) CanJoinRoom(id string) bool { } func (rooms *roomManager) RoomUsers(session *Session) []*DataSession { - return <-rooms.getRoomWorker(session.Roomid).GetUsers() + return <-rooms.GetOrCreate(session).GetUsers() } func (rooms *roomManager) JoinRoom(session *Session, sender Sender) { - rooms.getRoomWorker(session.Roomid).Join(session, sender) + rooms.GetOrCreate(session).Join(session, sender) } func (rooms *roomManager) LeaveRoom(session *Session) { - rooms.getRoomWorker(session.Roomid).Leave(session) + if room, ok := rooms.Get(session); ok { + room.Leave(session) + } +} + +func (rooms *roomManager) UpdateRoom(session *Session, room *DataRoom) (*DataRoom, error) { + if !session.Hello || session.Roomid != room.Name { + return nil, &DataError{Type: "Error", Code: "not_in_room", Message: "Cannot update other rooms"} + } + // XXX(lcooper): We'll process and send documents without this field + // correctly, however clients cannot not handle it currently. + room.Type = "Room" + return room, nil } func (rooms *roomManager) Broadcast(session *Session, m interface{}) { @@ -101,8 +114,7 @@ func (rooms *roomManager) Broadcast(session *Session, m interface{}) { } rooms.RUnlock() } else { - room := rooms.getRoomWorker(id) - room.Broadcast(session, message) + rooms.GetOrCreate(session).Broadcast(session, message) } message.Decref() } @@ -121,12 +133,17 @@ func (rooms *roomManager) RoomInfo(includeSessions bool) (count int, sessionInfo return } -func (rooms *roomManager) getRoomWorker(id string) RoomWorker { - +func (rooms *roomManager) Get(session *Session) (room RoomWorker, ok bool) { rooms.RLock() - room, ok := rooms.roomTable[id] + room, ok = rooms.roomTable[session.Roomid] + rooms.RUnlock() + return +} + +func (rooms *roomManager) GetOrCreate(session *Session) RoomWorker { + room, ok := rooms.Get(session) if !ok { - rooms.RUnlock() + id := session.Roomid rooms.Lock() // Need to re-check, another thread might have created the room // while we waited for the lock. @@ -147,12 +164,9 @@ func (rooms *roomManager) getRoomWorker(id string) RoomWorker { } else { rooms.Unlock() } - } else { - rooms.RUnlock() } return room - } func (rooms *roomManager) GlobalUsers() []*roomUser { diff --git a/src/app/spreed-webrtc-server/room_manager_test.go b/src/app/spreed-webrtc-server/room_manager_test.go new file mode 100644 index 00000000..f7f2d3df --- /dev/null +++ b/src/app/spreed-webrtc-server/room_manager_test.go @@ -0,0 +1,69 @@ +/* + * Spreed WebRTC. + * Copyright (C) 2013-2014 struktur AG + * + * This file is part of Spreed WebRTC. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +package main + +import ( + "testing" +) + +func NewTestRoomManager() RoomManager { + return NewRoomManager(&Config{}, nil) +} + +func assertDataError(t *testing.T, err error, code string) { + dataError, ok := err.(*DataError) + if !ok { + t.Errorf("Expected error %#v to be a *DataError", err) + return + } + + if code != dataError.Code { + t.Errorf("Expected error code to be %v, but was %v", code, dataError.Code) + } +} + +func Test_RoomManager_UpdateRoom_ReturnsAnErrorIfNoRoomHasBeenJoined(t *testing.T) { + roomManager := NewTestRoomManager() + _, err := roomManager.UpdateRoom(&Session{}, nil) + + assertDataError(t, err, "not_in_room") +} + +func Test_RoomManager_UpdateRoom_ReturnsAnErrorIfUpdatingAnUnjoinedRoom(t *testing.T) { + roomManager := NewTestRoomManager() + session := &Session{Hello: true, Roomid: "foo"} + _, err := roomManager.UpdateRoom(session, &DataRoom{Name: "bar"}) + assertDataError(t, err, "not_in_room") +} + +func Test_RoomManager_UpdateRoom_ReturnsACorrectlyTypedDocument(t *testing.T) { + roomManager := NewTestRoomManager() + session := &Session{Hello: true, Roomid: "foo"} + room, err := roomManager.UpdateRoom(session, &DataRoom{Name: session.Roomid}) + if err != nil { + t.Fatalf("Unexpected error %v updating room", err) + } + + if room.Type != "Room" { + t.Errorf("Expected document type to be Room, but was %v", room.Type) + } +}