Browse Source

Merge pull request #271 from longsleep/go-subdir-refactor

Refactored structure of Go source code to module and binary
pull/238/merge
Simon Eisenmann 9 years ago
parent
commit
d1627c3a54
  1. 7
      .travis.yml
  2. 12
      Godeps
  3. 46
      Makefile.am
  4. 12
      dependencies.tsv
  5. 4
      go/buffercache/buffercache.go
  6. 28
      go/channelling/api.go
  7. 185
      go/channelling/api/api.go
  8. 92
      go/channelling/api/api_test.go
  9. 43
      go/channelling/api/handle_authentication.go
  10. 68
      go/channelling/api/handle_chat.go
  11. 43
      go/channelling/api/handle_conference.go
  12. 48
      go/channelling/api/handle_hello.go
  13. 35
      go/channelling/api/handle_room.go
  14. 53
      go/channelling/api/handle_self.go
  15. 60
      go/channelling/api/handle_sessions.go
  16. 36
      go/channelling/api/handle_users.go
  17. 4
      go/channelling/bus_manager.go
  18. 44
      go/channelling/client.go
  19. 26
      go/channelling/clientstats.go
  20. 20
      go/channelling/codec.go
  21. 2
      go/channelling/common_test.go
  22. 43
      go/channelling/config.go
  23. 27
      go/channelling/connection.go
  24. 4
      go/channelling/contact.go
  25. 27
      go/channelling/contact_manager.go
  26. 2
      go/channelling/context.go
  27. 2
      go/channelling/data.go
  28. 57
      go/channelling/hub.go
  29. 22
      go/channelling/imagecache.go
  30. 2
      go/channelling/nats.go
  31. 14
      go/channelling/room_manager.go
  32. 4
      go/channelling/room_manager_test.go
  33. 23
      go/channelling/roomworker.go
  34. 2
      go/channelling/roomworker_test.go
  35. 2
      go/channelling/server/api.go
  36. 63
      go/channelling/server/config.go
  37. 6
      go/channelling/server/rooms.go
  38. 19
      go/channelling/server/sessions.go
  39. 12
      go/channelling/server/stats.go
  40. 2
      go/channelling/server/tls.go
  41. 8
      go/channelling/server/tokens.go
  42. 21
      go/channelling/server/users.go
  43. 88
      go/channelling/session.go
  44. 5
      go/channelling/session_manager.go
  45. 60
      go/channelling/sessionattestation.go
  46. 26
      go/channelling/sessionstore.go
  47. 29
      go/channelling/sessiontoken.go
  48. 29
      go/channelling/sessionupdate.go
  49. 2
      go/channelling/stats_manager.go
  50. 8
      go/channelling/tickets.go
  51. 7
      go/channelling/tokenprovider.go
  52. 26
      go/channelling/turndata.go
  53. 29
      go/channelling/unicaster.go
  54. 12
      go/channelling/user.go
  55. 2
      go/randomstring/randomstring.go
  56. 348
      src/app/spreed-webrtc-server/channelling_api.go
  57. 56
      src/app/spreed-webrtc-server/handler_image.go
  58. 30
      src/app/spreed-webrtc-server/handler_main.go
  59. 91
      src/app/spreed-webrtc-server/handler_room.go
  60. 79
      src/app/spreed-webrtc-server/handler_sandbox.go
  61. 59
      src/app/spreed-webrtc-server/handler_wellknown.go
  62. 12
      src/app/spreed-webrtc-server/handler_ws.go
  63. 259
      src/app/spreed-webrtc-server/main.go
  64. 17
      src/app/spreed-webrtc-server/utils.go

7
.travis.yml

@ -10,8 +10,7 @@ go:
- tip - tip
env: env:
- GEM_HOME=/var/lib/gems/1.9.1 USE_GODEPS=0 - GEM_HOME=/var/lib/gems/1.9.1
- GEM_HOME=/var/lib/gems/1.9.1 USE_GODEPS=1
before_install: before_install:
- sudo add-apt-repository -y ppa:chris-lea/node.js - sudo add-apt-repository -y ppa:chris-lea/node.js
@ -22,19 +21,17 @@ install:
- sudo gem1.9.1 install compass - sudo gem1.9.1 install compass
- sudo gem1.9.1 install scss-lint - sudo gem1.9.1 install scss-lint
- npm install - npm install
- if [ "$USE_GODEPS" = "1" ]; then wget https://raw.githubusercontent.com/pote/gpm/v1.3.2/bin/gpm && chmod +x gpm && sudo mv gpm /usr/local/bin; fi
script: script:
- ./autogen.sh - ./autogen.sh
- ./configure - ./configure
- if [ "$USE_GODEPS" = "0" ]; then make get; fi
- if [ "$USE_GODEPS" = "1" ]; then make gpm; fi
- make styleshint - make styleshint
# TODO(fancycode): enable styleslint once all styles have been fixed # TODO(fancycode): enable styleslint once all styles have been fixed
# - make styleslint # - make styleslint
- make styles - make styles
- make jshint - make jshint
- make javascript - make javascript
- make goget
- make test - make test
- make binary - make binary
- make build-i18n - make build-i18n

12
Godeps

@ -1,12 +0,0 @@
github.com/gorilla/context 215affda49addc4c8ef7e2534915df2c8c35c6cd
github.com/gorilla/mux ba336c9cfb43552c90de6cb2ceedd3271c747558
github.com/gorilla/securecookie aeade84400a85c6875264ae51c7a56ecdcb61751
github.com/gorilla/websocket 6eb6ad425a89d9da7a5549bc6da8f79ba5c17844
github.com/longsleep/pkac 0.0.1
github.com/satori/go.uuid afe1e2ddf0f05b7c29d388a3f8e76cb15c2231ca
github.com/strukturag/goacceptlanguageparser goacceptlanguageparser_v100
github.com/strukturag/httputils httputils_v012
github.com/strukturag/phoenix phoenix_v100
github.com/strukturag/sloth v0.9.2
github.com/dlintw/goconf dcc070983490608a14480e3bf943bad464785df5
github.com/nats-io/nats v1.1.6

46
Makefile.am

@ -22,6 +22,8 @@ AUTOMAKE_OPTIONS = -Wno-portability
ACLOCAL_AMFLAGS = -I m4 ACLOCAL_AMFLAGS = -I m4
EXENAME := spreed-webrtc-server EXENAME := spreed-webrtc-server
GOPKG := github.com/strukturag/spreed-webrtc
GOPATH := "$(CURDIR)/vendor:$(CURDIR)"
CONFIG_FILE ?= spreed-webrtc-server.conf CONFIG_FILE ?= spreed-webrtc-server.conf
CONFIG_PATH ?= /etc CONFIG_PATH ?= /etc
@ -47,24 +49,26 @@ build: get binary assets
gopath: gopath:
@echo GOPATH=$(GOPATH) @echo GOPATH=$(GOPATH)
if READONLY_VENDOR_GOPATH goget:
export GOPATH = $(DIST):$(CURDIR) if [ -z "$(DEB_BUILDING)" ]; then GOPATH=$(GOPATH) go get github.com/rogpeppe/godeps; fi
get: $(DIST) if [ -z "$(DEB_BUILDING)" ]; then GOPATH=$(GOPATH) $(CURDIR)/vendor/bin/godeps -u dependencies.tsv; fi
ln -sf $(VENDOR_GOPATH)/src -t $(DIST) mkdir -p $(shell dirname "$(CURDIR)/vendor/src/$(GOPKG)")
else rm -f $(CURDIR)/vendor/src/$(GOPKG)
export GOPATH = $(VENDOR_GOPATH):$(CURDIR) ln -sfn $(PWD) $(CURDIR)/vendor/src/$(GOPKG)
get:
endif
$(GO) get app/...
getupdate: vendorclean get get: goget
gpm: gogetupdate: govendorclean goget
@if [ "$(GPM)" = "" ]; then echo "Command 'gpm' not found"; exit 1; fi
$(GPM) install dependencies.tsv:
set -e ;\
TMP=$$(mktemp -d) ;\
cp -r $(CURDIR)/vendor $$TMP ;\
GOPATH=$$TMP/vendor:$(CURDIR) $(CURDIR)/vendor/bin/godeps $(GOPKG)/src/app/spreed-webrtc-server ./go/... > $(CURDIR)/dependencies.tsv ;\
rm -rf $$TMP
binary: binary:
$(GO) build $(GOBUILDFLAGS) -o bin/$(EXENAME) -ldflags '$(INTERNALLDFLAGS)' app/$(EXENAME) GOPATH=$(GOPATH) $(GO) build $(GOBUILDFLAGS) -o bin/$(EXENAME) -ldflags '$(INTERNALLDFLAGS)' app/$(EXENAME)
binaryrace: GOBUILDFLAGS := $(GOBUILDFLAGS) -race binaryrace: GOBUILDFLAGS := $(GOBUILDFLAGS) -race
binaryrace: binary binaryrace: binary
@ -72,11 +76,13 @@ binaryrace: binary
binaryall: GOBUILDFLAGS := $(GOBUILDFLAGS) -a binaryall: GOBUILDFLAGS := $(GOBUILDFLAGS) -a
binaryall: binary binaryall: binary
fmt: gofmt:
$(GO) fmt app/... GOPATH=$(GOPATH) $(GO) fmt app/... ./go/...
fmt: gofmt
test: get test:
$(GO) test $(GOTESTFLAGS) app/... GOPATH=$(GOPATH) $(GO) test -v $(GOTESTFLAGS) app/... ./go/...
assets: javascript fonts assets: javascript fonts
@ -166,7 +172,7 @@ clean:
distclean: clean distclean: clean
rm -rf $(DIST) rm -rf $(DIST)
vendorclean: govendorclean:
rm -rf vendor/* rm -rf vendor/*
pristine: distclean vendorclean pristine: distclean vendorclean
@ -187,4 +193,4 @@ tarball: distclean release install
cp server.conf.in $(TARPATH)/loader cp server.conf.in $(TARPATH)/loader
tar czf $(DIST)/$(PACKAGE_NAME)-$(PACKAGE_VERSION)_$(BUILD_OS)_$(BUILD_ARCH).tar.gz -C $(DIST) $(PACKAGE_NAME)-$(PACKAGE_VERSION) tar czf $(DIST)/$(PACKAGE_NAME)-$(PACKAGE_VERSION)_$(BUILD_OS)_$(BUILD_ARCH).tar.gz -C $(DIST) $(PACKAGE_NAME)-$(PACKAGE_VERSION)
.PHONY: clean distclean vendorclean pristine get getupdate build javascript fonts styles release releasetest dist_gopath install install-binary install-assets gopath binary binaryrace binaryall tarball assets .PHONY: clean distclean govendorclean pristine goget gogetupdate build javascript fonts styles release releasetest dist_gopath install install-binary install-assets gopath binary binaryrace binaryall tarball assets dependencies.tsv

12
dependencies.tsv

@ -0,0 +1,12 @@
github.com/dlintw/goconf git dcc070983490608a14480e3bf943bad464785df5 2012-02-28T08:26:10Z
github.com/gorilla/context git 215affda49addc4c8ef7e2534915df2c8c35c6cd 2014-12-17T16:02:51Z
github.com/gorilla/mux git ba336c9cfb43552c90de6cb2ceedd3271c747558 2015-07-17T15:03:03Z
github.com/gorilla/securecookie git aeade84400a85c6875264ae51c7a56ecdcb61751 2015-07-16T23:32:44Z
github.com/gorilla/websocket git 6eb6ad425a89d9da7a5549bc6da8f79ba5c17844 2015-07-14T14:06:27Z
github.com/longsleep/pkac git 68bf8859f58dd84332ee41c07eba357fb3818ba3 2014-05-01T18:13:13Z
github.com/nats-io/nats git 355b5b97e0842dc94f1106729aa88e33e06317ca 2015-12-09T21:13:14Z
github.com/satori/go.uuid git afe1e2ddf0f05b7c29d388a3f8e76cb15c2231ca 2015-06-15T02:45:37Z
github.com/strukturag/goacceptlanguageparser git 68066e68c2940059aadc6e19661610cf428b6647 2014-02-13T13:31:23Z
github.com/strukturag/httputils git afbf05c71ac03ee7989c96d033a9571ba4ded468 2014-07-02T01:35:33Z
github.com/strukturag/phoenix git c3429c4e93588d848606263a7f96f91c90e43178 2016-03-02T12:52:52Z
github.com/strukturag/sloth git 74a8bcf67368de59baafe5d3e17aee9875564cfc 2015-04-22T08:59:42Z
1 github.com/dlintw/goconf git dcc070983490608a14480e3bf943bad464785df5 2012-02-28T08:26:10Z
2 github.com/gorilla/context git 215affda49addc4c8ef7e2534915df2c8c35c6cd 2014-12-17T16:02:51Z
3 github.com/gorilla/mux git ba336c9cfb43552c90de6cb2ceedd3271c747558 2015-07-17T15:03:03Z
4 github.com/gorilla/securecookie git aeade84400a85c6875264ae51c7a56ecdcb61751 2015-07-16T23:32:44Z
5 github.com/gorilla/websocket git 6eb6ad425a89d9da7a5549bc6da8f79ba5c17844 2015-07-14T14:06:27Z
6 github.com/longsleep/pkac git 68bf8859f58dd84332ee41c07eba357fb3818ba3 2014-05-01T18:13:13Z
7 github.com/nats-io/nats git 355b5b97e0842dc94f1106729aa88e33e06317ca 2015-12-09T21:13:14Z
8 github.com/satori/go.uuid git afe1e2ddf0f05b7c29d388a3f8e76cb15c2231ca 2015-06-15T02:45:37Z
9 github.com/strukturag/goacceptlanguageparser git 68066e68c2940059aadc6e19661610cf428b6647 2014-02-13T13:31:23Z
10 github.com/strukturag/httputils git afbf05c71ac03ee7989c96d033a9571ba4ded468 2014-07-02T01:35:33Z
11 github.com/strukturag/phoenix git c3429c4e93588d848606263a7f96f91c90e43178 2016-03-02T12:52:52Z
12 github.com/strukturag/sloth git 74a8bcf67368de59baafe5d3e17aee9875564cfc 2015-04-22T08:59:42Z

4
src/app/spreed-webrtc-server/buffercache.go → go/buffercache/buffercache.go vendored

@ -19,7 +19,7 @@
* *
*/ */
package main package buffercache
import ( import (
"bytes" "bytes"
@ -161,7 +161,7 @@ func (cache *bufferCache) Wrap(data []byte) Buffer {
return &directBuffer{refcnt: 1, cache: cache, buf: bytes.NewBuffer(data)} return &directBuffer{refcnt: 1, cache: cache, buf: bytes.NewBuffer(data)}
} }
func readAll(dest Buffer, r io.Reader) error { func ReadAll(dest Buffer, r io.Reader) error {
var err error var err error
defer func() { defer func() {
e := recover() e := recover()

28
go/channelling/api.go

@ -0,0 +1,28 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type ChannellingAPI interface {
OnConnect(*Client, *Session) (interface{}, error)
OnDisconnect(*Client, *Session)
OnIncoming(Sender, *Session, *DataIncoming) (interface{}, error)
}

185
go/channelling/api/api.go

@ -0,0 +1,185 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"log"
"github.com/strukturag/spreed-webrtc/go/channelling"
)
const (
maxConferenceSize = 100
apiVersion = 1.4 // Keep this in sync with CHANNELING-API docs.Hand
)
type channellingAPI struct {
RoomStatusManager channelling.RoomStatusManager
SessionEncoder channelling.SessionEncoder
SessionManager channelling.SessionManager
StatsCounter channelling.StatsCounter
ContactManager channelling.ContactManager
TurnDataCreator channelling.TurnDataCreator
Unicaster channelling.Unicaster
BusManager channelling.BusManager
config *channelling.Config
}
// New creates and initializes a new ChannellingAPI using
// various other services for initialization. It is intended to handle
// incoming and outgoing channeling API events from clients.
func New(config *channelling.Config,
roomStatus channelling.RoomStatusManager,
sessionEncoder channelling.SessionEncoder,
sessionManager channelling.SessionManager,
statsCounter channelling.StatsCounter,
contactManager channelling.ContactManager,
turnDataCreator channelling.TurnDataCreator,
unicaster channelling.Unicaster,
busManager channelling.BusManager) channelling.ChannellingAPI {
return &channellingAPI{
roomStatus,
sessionEncoder,
sessionManager,
statsCounter,
contactManager,
turnDataCreator,
unicaster,
busManager,
config,
}
}
func (api *channellingAPI) OnConnect(client *channelling.Client, session *channelling.Session) (interface{}, error) {
api.Unicaster.OnConnect(client, session)
self, err := api.HandleSelf(session)
if err == nil {
api.BusManager.Trigger(channelling.BusManagerConnect, session.Id, "", nil)
}
return self, err
}
func (api *channellingAPI) OnDisconnect(client *channelling.Client, session *channelling.Session) {
api.Unicaster.OnDisconnect(client, session)
api.BusManager.Trigger(channelling.BusManagerDisconnect, session.Id, "", nil)
}
func (api *channellingAPI) OnIncoming(sender channelling.Sender, session *channelling.Session, msg *channelling.DataIncoming) (interface{}, error) {
switch msg.Type {
case "Self":
return api.HandleSelf(session)
case "Hello":
if msg.Hello == nil {
return nil, channelling.NewDataError("bad_request", "message did not contain Hello")
}
return api.HandleHello(session, msg.Hello, sender)
case "Offer":
if msg.Offer == nil || msg.Offer.Offer == nil {
log.Println("Received invalid offer message.", msg)
break
}
if _, ok := msg.Offer.Offer["_token"]; !ok {
// Trigger offer event when offer has no token, so this is
// not triggered for peerxfer and peerscreenshare offers.
api.BusManager.Trigger(channelling.BusManagerOffer, session.Id, msg.Offer.To, nil)
}
session.Unicast(msg.Offer.To, msg.Offer)
case "Candidate":
if msg.Candidate == nil || msg.Candidate.Candidate == nil {
log.Println("Received invalid candidate message.", msg)
break
}
session.Unicast(msg.Candidate.To, msg.Candidate)
case "Answer":
if msg.Answer == nil || msg.Answer.Answer == nil {
log.Println("Received invalid answer message.", msg)
break
}
if _, ok := msg.Answer.Answer["_token"]; !ok {
// Trigger answer event when answer has no token. so this is
// not triggered for peerxfer and peerscreenshare answers.
api.BusManager.Trigger(channelling.BusManagerAnswer, session.Id, msg.Answer.To, nil)
}
session.Unicast(msg.Answer.To, msg.Answer)
case "Users":
return api.HandleUsers(session)
case "Authentication":
if msg.Authentication == nil || msg.Authentication.Authentication == nil {
return nil, channelling.NewDataError("bad_request", "message did not contain Authentication")
}
return api.HandleAuthentication(session, msg.Authentication.Authentication)
case "Bye":
if msg.Bye == nil {
log.Println("Received invalid bye message.", msg)
break
}
api.BusManager.Trigger(channelling.BusManagerBye, session.Id, msg.Bye.To, nil)
session.Unicast(msg.Bye.To, msg.Bye)
case "Status":
if msg.Status == nil {
log.Println("Received invalid status message.", msg)
break
}
//log.Println("Status", msg.Status)
session.Update(&channelling.SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status})
session.BroadcastStatus()
case "Chat":
if msg.Chat == nil || msg.Chat.Chat == nil {
log.Println("Received invalid chat message.", msg)
break
}
api.HandleChat(session, msg.Chat)
case "Conference":
if msg.Conference == nil {
log.Println("Received invalid conference message.", msg)
break
}
api.HandleConference(session, msg.Conference)
case "Alive":
return msg.Alive, nil
case "Sessions":
if msg.Sessions == nil || msg.Sessions.Sessions == nil {
return nil, channelling.NewDataError("bad_request", "message did not contain Sessions")
}
return api.HandleSessions(session, msg.Sessions.Sessions)
case "Room":
if msg.Room == nil {
return nil, channelling.NewDataError("bad_request", "message did not contain Room")
}
return api.HandleRoom(session, msg.Room)
default:
log.Println("OnText unhandled message type", msg.Type)
}
return nil, nil
}

92
src/app/spreed-webrtc-server/channelling_api_test.go → go/channelling/api/api_test.go

@ -19,50 +19,55 @@
* *
*/ */
package main package api
import ( import (
"errors" "errors"
"fmt" "fmt"
"testing" "testing"
"github.com/gorilla/securecookie"
"github.com/strukturag/spreed-webrtc/go/buffercache"
"github.com/strukturag/spreed-webrtc/go/channelling"
) )
type fakeClient struct { type fakeClient struct {
} }
func (fake *fakeClient) Send(_ Buffer) { func (fake *fakeClient) Send(_ buffercache.Buffer) {
} }
type fakeRoomManager struct { type fakeRoomManager struct {
joinedRoomID string joinedRoomID string
leftRoomID string leftRoomID string
roomUsers []*DataSession roomUsers []*channelling.DataSession
joinedID string joinedID string
joinError error joinError error
leftID string leftID string
broadcasts []interface{} broadcasts []interface{}
updatedRoom *DataRoom updatedRoom *channelling.DataRoom
updateError error updateError error
} }
func (fake *fakeRoomManager) RoomUsers(session *Session) []*DataSession { func (fake *fakeRoomManager) RoomUsers(session *channelling.Session) []*channelling.DataSession {
return fake.roomUsers return fake.roomUsers
} }
func (fake *fakeRoomManager) JoinRoom(id, roomName, roomType string, _ *DataRoomCredentials, session *Session, sessionAuthenticated bool, _ Sender) (*DataRoom, error) { func (fake *fakeRoomManager) JoinRoom(id, roomName, roomType string, _ *channelling.DataRoomCredentials, session *channelling.Session, sessionAuthenticated bool, _ channelling.Sender) (*channelling.DataRoom, error) {
fake.joinedID = id fake.joinedID = id
return &DataRoom{Name: roomName, Type: roomType}, fake.joinError return &channelling.DataRoom{Name: roomName, Type: roomType}, fake.joinError
} }
func (fake *fakeRoomManager) LeaveRoom(roomID, sessionID string) { func (fake *fakeRoomManager) LeaveRoom(roomID, sessionID string) {
fake.leftID = roomID fake.leftID = roomID
} }
func (fake *fakeRoomManager) Broadcast(_, _ string, outgoing *DataOutgoing) { func (fake *fakeRoomManager) Broadcast(_, _ string, outgoing *channelling.DataOutgoing) {
fake.broadcasts = append(fake.broadcasts, outgoing.Data) fake.broadcasts = append(fake.broadcasts, outgoing.Data)
} }
func (fake *fakeRoomManager) UpdateRoom(_ *Session, _ *DataRoom) (*DataRoom, error) { func (fake *fakeRoomManager) UpdateRoom(_ *channelling.Session, _ *channelling.DataRoom) (*channelling.DataRoom, error) {
return fake.updatedRoom, fake.updateError return fake.updatedRoom, fake.updateError
} }
@ -73,23 +78,19 @@ func (fake *fakeRoomManager) MakeRoomID(roomName, roomType string) string {
return fmt.Sprintf("%s:%s", roomType, roomName) return fmt.Sprintf("%s:%s", roomType, roomName)
} }
func NewTestChannellingAPI() (ChannellingAPI, *fakeClient, *Session, *fakeRoomManager) { func NewTestChannellingAPI() (channelling.ChannellingAPI, *fakeClient, *channelling.Session, *fakeRoomManager) {
client, roomManager := &fakeClient{}, &fakeRoomManager{} client, roomManager := &fakeClient{}, &fakeRoomManager{}
session := &Session{ sessionNonces := securecookie.New(securecookie.GenerateRandomKey(64), nil)
attestations: sessionNonces, session := channelling.NewSession(nil, nil, roomManager, roomManager, nil, sessionNonces, "", "")
Broadcaster: roomManager, busManager := channelling.NewBusManager("", false, "")
RoomStatusManager: roomManager, return New(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager), client, session, roomManager
}
busManager := NewBusManager("", false, "")
session.attestation = &SessionAttestation{s: session}
return NewChannellingAPI(nil, roomManager, nil, nil, nil, nil, nil, nil, busManager), client, session, roomManager
} }
func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) { func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing.T) {
roomID, roomName, ua := "Room:foobar", "foobar", "unit tests" roomID, roomName, ua := "Room:foobar", "foobar", "unit tests"
api, client, session, roomManager := NewTestChannellingAPI() api, client, session, roomManager := NewTestChannellingAPI()
api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomName, Ua: ua}}) api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Hello", Hello: &channelling.DataHello{Id: roomName, Ua: ua}})
if roomManager.joinedID != roomID { if roomManager.joinedID != roomID {
t.Errorf("Expected to have joined room %v, but got %v", roomID, roomManager.joinedID) t.Errorf("Expected to have joined room %v, but got %v", roomID, roomManager.joinedID)
@ -99,7 +100,7 @@ func Test_ChannellingAPI_OnIncoming_HelloMessage_JoinsTheSelectedRoom(t *testing
t.Fatalf("Expected 1 broadcast, but got %d", broadcastCount) t.Fatalf("Expected 1 broadcast, but got %d", broadcastCount)
} }
dataSession, ok := roomManager.broadcasts[0].(*DataSession) dataSession, ok := roomManager.broadcasts[0].(*channelling.DataSession)
if !ok { if !ok {
t.Fatal("Expected a session data broadcast") t.Fatal("Expected a session data broadcast")
} }
@ -113,8 +114,8 @@ func Test_ChannellingAPI_OnIncoming_HelloMessage_LeavesAnyPreviouslyJoinedRooms(
roomID, roomName := "Room:foobar", "foobar" roomID, roomName := "Room:foobar", "foobar"
api, client, session, roomManager := NewTestChannellingAPI() api, client, session, roomManager := NewTestChannellingAPI()
api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomName}}) api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Hello", Hello: &channelling.DataHello{Id: roomName}})
api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: "baz"}}) api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Hello", Hello: &channelling.DataHello{Id: "baz"}})
if roomManager.leftID != roomID { if roomManager.leftID != roomID {
t.Errorf("Expected to have left room %v, but got %v", roomID, roomManager.leftID) t.Errorf("Expected to have left room %v, but got %v", roomID, roomManager.leftID)
@ -124,7 +125,7 @@ func Test_ChannellingAPI_OnIncoming_HelloMessage_LeavesAnyPreviouslyJoinedRooms(
t.Fatalf("Expected 3 broadcasts, but got %d", broadcastCount) t.Fatalf("Expected 3 broadcasts, but got %d", broadcastCount)
} }
dataSession, ok := roomManager.broadcasts[1].(*DataSession) dataSession, ok := roomManager.broadcasts[1].(*channelling.DataSession)
if !ok { if !ok {
t.Fatal("Expected a session data broadcast") t.Fatal("Expected a session data broadcast")
} }
@ -138,7 +139,7 @@ func Test_ChannellingAPI_OnIncoming_HelloMessage_DoesNotJoinIfNotPermitted(t *te
api, client, session, roomManager := NewTestChannellingAPI() api, client, session, roomManager := NewTestChannellingAPI()
roomManager.joinError = errors.New("Can't enter this room") roomManager.joinError = errors.New("Can't enter this room")
api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{}}) api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Hello", Hello: &channelling.DataHello{}})
if broadcastCount := len(roomManager.broadcasts); broadcastCount != 0 { if broadcastCount := len(roomManager.broadcasts); broadcastCount != 0 {
t.Fatalf("Expected no broadcasts, but got %d", broadcastCount) t.Fatalf("Expected no broadcasts, but got %d", broadcastCount)
@ -148,14 +149,14 @@ func Test_ChannellingAPI_OnIncoming_HelloMessage_DoesNotJoinIfNotPermitted(t *te
func Test_ChannellingAPI_OnIncoming_HelloMessage_RespondsWithAWelcome(t *testing.T) { func Test_ChannellingAPI_OnIncoming_HelloMessage_RespondsWithAWelcome(t *testing.T) {
roomID := "a-room" roomID := "a-room"
api, client, session, roomManager := NewTestChannellingAPI() api, client, session, roomManager := NewTestChannellingAPI()
roomManager.roomUsers = []*DataSession{&DataSession{}} roomManager.roomUsers = []*channelling.DataSession{&channelling.DataSession{}}
reply, err := api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomID}}) reply, err := api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Hello", Hello: &channelling.DataHello{Id: roomID}})
if err != nil { if err != nil {
t.Fatalf("Unexpected error %v", err) t.Fatalf("Unexpected error %v", err)
} }
welcome, ok := reply.(*DataWelcome) welcome, ok := reply.(*channelling.DataWelcome)
if !ok { if !ok {
t.Fatalf("Expected response %#v to be a Welcome", reply) t.Fatalf("Expected response %#v to be a Welcome", reply)
} }
@ -175,9 +176,9 @@ func Test_ChannellingAPI_OnIncoming_HelloMessage_RespondsWithAWelcome(t *testing
func Test_ChannellingAPI_OnIncoming_HelloMessage_RespondsWithAnErrorIfTheRoomCannotBeJoined(t *testing.T) { func Test_ChannellingAPI_OnIncoming_HelloMessage_RespondsWithAnErrorIfTheRoomCannotBeJoined(t *testing.T) {
api, client, session, roomManager := NewTestChannellingAPI() api, client, session, roomManager := NewTestChannellingAPI()
roomManager.joinError = NewDataError("bad_join", "") roomManager.joinError = channelling.NewDataError("bad_join", "")
_, err := api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{}}) _, err := api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Hello", Hello: &channelling.DataHello{}})
assertDataError(t, err, "bad_join") assertDataError(t, err, "bad_join")
} }
@ -185,19 +186,19 @@ func Test_ChannellingAPI_OnIncoming_HelloMessage_RespondsWithAnErrorIfTheRoomCan
func Test_ChannellingAPI_OnIncoming_RoomMessage_RespondsWithAndBroadcastsTheUpdatedRoom(t *testing.T) { func Test_ChannellingAPI_OnIncoming_RoomMessage_RespondsWithAndBroadcastsTheUpdatedRoom(t *testing.T) {
roomName := "foo" roomName := "foo"
api, client, session, roomManager := NewTestChannellingAPI() api, client, session, roomManager := NewTestChannellingAPI()
roomManager.updatedRoom = &DataRoom{Name: "FOO"} roomManager.updatedRoom = &channelling.DataRoom{Name: "FOO"}
_, err := api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomName}}) _, err := api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Hello", Hello: &channelling.DataHello{Id: roomName}})
if err != nil { if err != nil {
t.Fatalf("Unexpected error %v", err) t.Fatalf("Unexpected error %v", err)
} }
reply, err := api.OnIncoming(client, session, &DataIncoming{Type: "Room", Room: &DataRoom{Name: roomName}}) reply, err := api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Room", Room: &channelling.DataRoom{Name: roomName}})
if err != nil { if err != nil {
t.Fatalf("Unexpected error %v", err) t.Fatalf("Unexpected error %v", err)
} }
room, ok := reply.(*DataRoom) room, ok := reply.(*channelling.DataRoom)
if !ok { if !ok {
t.Fatalf("Expected response message to be a Room") t.Fatalf("Expected response message to be a Room")
} }
@ -210,7 +211,7 @@ func Test_ChannellingAPI_OnIncoming_RoomMessage_RespondsWithAndBroadcastsTheUpda
t.Fatalf("Expected 1 broadcasts, but got %d", broadcastCount) t.Fatalf("Expected 1 broadcasts, but got %d", broadcastCount)
} }
if _, ok := roomManager.broadcasts[1].(*DataRoom); !ok { if _, ok := roomManager.broadcasts[1].(*channelling.DataRoom); !ok {
t.Fatal("Expected a room data broadcast") t.Fatal("Expected a room data broadcast")
} }
} }
@ -218,13 +219,30 @@ func Test_ChannellingAPI_OnIncoming_RoomMessage_RespondsWithAndBroadcastsTheUpda
func Test_ChannellingAPI_OnIncoming_RoomMessage_RespondsWithAnErrorIfUpdatingTheRoomFails(t *testing.T) { func Test_ChannellingAPI_OnIncoming_RoomMessage_RespondsWithAnErrorIfUpdatingTheRoomFails(t *testing.T) {
roomName := "foo" roomName := "foo"
api, client, session, roomManager := NewTestChannellingAPI() api, client, session, roomManager := NewTestChannellingAPI()
roomManager.updateError = NewDataError("a_room_error", "") roomManager.updateError = channelling.NewDataError("a_room_error", "")
_, err := api.OnIncoming(client, session, &DataIncoming{Type: "Hello", Hello: &DataHello{Id: roomName}}) _, err := api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Hello", Hello: &channelling.DataHello{Id: roomName}})
if err != nil { if err != nil {
t.Fatalf("Unexpected error %v", err) t.Fatalf("Unexpected error %v", err)
} }
_, err = api.OnIncoming(client, session, &DataIncoming{Type: "Room", Room: &DataRoom{Name: roomName}}) _, err = api.OnIncoming(client, session, &channelling.DataIncoming{Type: "Room", Room: &channelling.DataRoom{Name: roomName}})
assertDataError(t, err, "a_room_error") assertDataError(t, err, "a_room_error")
} }
func assertDataError(t *testing.T, err error, code string) {
if err == nil {
t.Error("Expected an error, but none was returned")
return
}
dataError, ok := err.(*channelling.DataError)
if !ok {
t.Errorf("Expected error %#v to be a *DataError", err)
return
}
if code != dataError.Code {
t.Errorf("Expected error code to be %v, but was %v", code, dataError.Code)
}
}

43
go/channelling/api/handle_authentication.go

@ -0,0 +1,43 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"log"
"github.com/strukturag/spreed-webrtc/go/channelling"
)
func (api *channellingAPI) HandleAuthentication(session *channelling.Session, st *channelling.SessionToken) (*channelling.DataSelf, error) {
if err := api.SessionManager.Authenticate(session, st, ""); err != nil {
log.Println("Authentication failed", err, st.Userid, st.Nonce)
return nil, err
}
log.Println("Authentication success", session.Userid())
self, err := api.HandleSelf(session)
if err == nil {
session.BroadcastStatus()
}
return self, err
}

68
go/channelling/api/handle_chat.go

@ -0,0 +1,68 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"log"
"time"
"github.com/strukturag/spreed-webrtc/go/channelling"
)
func (api *channellingAPI) HandleChat(session *channelling.Session, chat *channelling.DataChat) {
// TODO(longsleep): Limit sent chat messages per incoming connection.
msg := chat.Chat
to := chat.To
if !msg.NoEcho {
session.Unicast(session.Id, chat)
}
msg.Time = time.Now().Format(time.RFC3339)
if to == "" {
// TODO(longsleep): Check if chat broadcast is allowed.
if session.Hello {
api.StatsCounter.CountBroadcastChat()
session.Broadcast(chat)
}
} else {
if msg.Status != nil {
if msg.Status.ContactRequest != nil {
if !api.config.WithModule("contacts") {
return
}
if err := api.ContactManager.ContactrequestHandler(session, to, msg.Status.ContactRequest); err != nil {
log.Println("Ignoring invalid contact request.", err)
return
}
msg.Status.ContactRequest.Userid = session.Userid()
}
} else {
api.StatsCounter.CountUnicastChat()
}
session.Unicast(to, chat)
if msg.Mid != "" {
// Send out delivery confirmation status chat message.
session.Unicast(session.Id, &channelling.DataChat{To: to, Type: "Chat", Chat: &channelling.DataChatMessage{Mid: msg.Mid, Status: &channelling.DataChatStatus{State: "sent"}}})
}
}
}

43
go/channelling/api/handle_conference.go

@ -0,0 +1,43 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"log"
"github.com/strukturag/spreed-webrtc/go/channelling"
)
func (api *channellingAPI) HandleConference(session *channelling.Session, conference *channelling.DataConference) {
// Check conference maximum size.
if len(conference.Conference) > maxConferenceSize {
log.Println("Refusing to create conference above limit.", len(conference.Conference))
return
}
// Send conference update to anyone.
for _, id := range conference.Conference {
if id != session.Id {
session.Unicast(id, conference)
}
}
}

48
go/channelling/api/handle_hello.go

@ -0,0 +1,48 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"github.com/strukturag/spreed-webrtc/go/channelling"
)
func (api *channellingAPI) HandleHello(session *channelling.Session, hello *channelling.DataHello, sender channelling.Sender) (*channelling.DataWelcome, error) {
// TODO(longsleep): Filter room id and user agent.
session.Update(&channelling.SessionUpdate{Types: []string{"Ua"}, Ua: hello.Ua})
// Compatibily for old clients.
roomName := hello.Name
if roomName == "" {
roomName = hello.Id
}
room, err := session.JoinRoom(roomName, hello.Type, hello.Credentials, sender)
if err != nil {
return nil, err
}
return &channelling.DataWelcome{
Type: "Welcome",
Room: room,
Users: api.RoomStatusManager.RoomUsers(session),
}, nil
}

35
go/channelling/api/handle_room.go

@ -0,0 +1,35 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"github.com/strukturag/spreed-webrtc/go/channelling"
)
func (api *channellingAPI) HandleRoom(session *channelling.Session, room *channelling.DataRoom) (*channelling.DataRoom, error) {
room, err := api.RoomStatusManager.UpdateRoom(session, room)
if err == nil {
session.Broadcast(room)
}
return room, err
}

53
go/channelling/api/handle_self.go

@ -0,0 +1,53 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"log"
"github.com/strukturag/spreed-webrtc/go/channelling"
)
func (api *channellingAPI) HandleSelf(session *channelling.Session) (*channelling.DataSelf, error) {
token, err := api.SessionEncoder.EncodeSessionToken(session)
if err != nil {
log.Println("Error in OnRegister", err)
return nil, err
}
log.Println("Created new session token", len(token), token)
self := &channelling.DataSelf{
Type: "Self",
Id: session.Id,
Sid: session.Sid,
Userid: session.Userid(),
Suserid: api.SessionEncoder.EncodeSessionUserID(session),
Token: token,
Version: api.config.Version,
ApiVersion: apiVersion,
Turn: api.TurnDataCreator.CreateTurnData(session),
Stun: api.config.StunURIs,
}
api.BusManager.Trigger(channelling.BusManagerSession, session.Id, session.Userid(), nil)
return self, nil
}

60
go/channelling/api/handle_sessions.go

@ -0,0 +1,60 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"github.com/strukturag/spreed-webrtc/go/channelling"
)
func (api *channellingAPI) HandleSessions(session *channelling.Session, sessions *channelling.DataSessionsRequest) (*channelling.DataSessions, error) {
switch sessions.Type {
case "contact":
if !api.config.WithModule("contacts") {
return nil, channelling.NewDataError("contacts_not_enabled", "incoming contacts session request with contacts disabled")
}
userID, err := api.ContactManager.GetContactID(session, sessions.Token)
if err != nil {
return nil, err
}
return &channelling.DataSessions{
Type: "Sessions",
Users: api.SessionManager.GetUserSessions(session, userID),
Sessions: sessions,
}, nil
case "session":
id, err := session.DecodeAttestation(sessions.Token)
if err != nil {
return nil, channelling.NewDataError("bad_attestation", err.Error())
}
session, ok := api.Unicaster.GetSession(id)
if !ok {
return nil, channelling.NewDataError("no_such_session", "cannot retrieve session")
}
return &channelling.DataSessions{
Type: "Sessions",
Users: []*channelling.DataSession{session.Data()},
Sessions: sessions,
}, nil
default:
return nil, channelling.NewDataError("bad_request", "unknown sessions request type")
}
}

36
go/channelling/api/handle_users.go

@ -0,0 +1,36 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package api
import (
"github.com/strukturag/spreed-webrtc/go/channelling"
)
func (api *channellingAPI) HandleUsers(session *channelling.Session) (sessions *channelling.DataSessions, err error) {
if session.Hello {
sessions = &channelling.DataSessions{Type: "Users", Users: api.RoomStatusManager.RoomUsers(session)}
} else {
err = channelling.NewDataError("not_in_room", "Cannot list users without a current room")
}
return
}

4
src/app/spreed-webrtc-server/bus_manager.go → go/channelling/bus_manager.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"errors" "errors"
@ -114,6 +114,7 @@ func newNatsBus(id, prefix string) (*natsBus, error) {
triggerQueue := make(chan *busQueueEntry, 50) triggerQueue := make(chan *busQueueEntry, 50)
// Start go routine to process outbount NATS publishing. // Start go routine to process outbount NATS publishing.
go chPublish(ec, triggerQueue) go chPublish(ec, triggerQueue)
return &natsBus{id, prefix, ec, triggerQueue}, nil return &natsBus{id, prefix, ec, triggerQueue}, nil
} }
@ -135,6 +136,7 @@ func (bus *natsBus) Trigger(name, from, payload string, data interface{}) (err e
err = errors.New("NATS trigger queue full") err = errors.New("NATS trigger queue full")
} }
} }
return err return err
} }

44
src/app/spreed-webrtc-server/client.go → go/channelling/client.go

@ -19,36 +19,34 @@
* *
*/ */
package main package channelling
import ( import (
"log" "log"
"github.com/strukturag/spreed-webrtc/go/buffercache"
) )
type Sender interface { type Sender interface {
Send(Buffer) Send(buffercache.Buffer)
}
type Client interface {
Sender
Session() *Session
Index() uint64
Close()
ReplaceAndClose(Client)
} }
type client struct { type Client struct {
Codec Codec
ChannellingAPI ChannellingAPI
Connection Connection
session *Session session *Session
} }
func NewClient(codec Codec, api ChannellingAPI, session *Session) *client { func NewClient(codec Codec, api ChannellingAPI, session *Session) *Client {
return &client{codec, api, nil, session} return &Client{
Codec: codec,
ChannellingAPI: api,
session: session,
}
} }
func (client *client) OnConnect(conn Connection) { func (client *Client) OnConnect(conn Connection) {
client.Connection = conn client.Connection = conn
if reply, err := client.ChannellingAPI.OnConnect(client, client.session); err == nil { if reply, err := client.ChannellingAPI.OnConnect(client, client.session); err == nil {
client.reply("", reply) client.reply("", reply)
@ -57,38 +55,38 @@ func (client *client) OnConnect(conn Connection) {
} }
} }
func (client *client) OnDisconnect() { func (client *Client) OnDisconnect() {
client.session.Close() client.session.Close()
client.ChannellingAPI.OnDisconnect(client, client.session) client.ChannellingAPI.OnDisconnect(client, client.session)
} }
func (client *client) OnText(b Buffer) { func (client *Client) OnText(b buffercache.Buffer) {
incoming, err := client.DecodeIncoming(b) incoming, err := client.Codec.DecodeIncoming(b)
if err != nil { if err != nil {
log.Println("OnText error while processing incoming message", err) log.Println("OnText error while processing incoming message", err)
return return
} }
if reply, err := client.OnIncoming(client, client.session, incoming); err != nil { if reply, err := client.ChannellingAPI.OnIncoming(client, client.session, incoming); err != nil {
client.reply(incoming.Iid, err) client.reply(incoming.Iid, err)
} else if reply != nil { } else if reply != nil {
client.reply(incoming.Iid, reply) client.reply(incoming.Iid, reply)
} }
} }
func (client *client) reply(iid string, m interface{}) { func (client *Client) reply(iid string, m interface{}) {
outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m} outgoing := &DataOutgoing{From: client.session.Id, Iid: iid, Data: m}
if b, err := client.EncodeOutgoing(outgoing); err == nil { if b, err := client.Codec.EncodeOutgoing(outgoing); err == nil {
client.Send(b) client.Connection.Send(b)
b.Decref() b.Decref()
} }
} }
func (client *client) Session() *Session { func (client *Client) Session() *Session {
return client.session return client.session
} }
func (client *client) ReplaceAndClose(oldClient Client) { func (client *Client) ReplaceAndClose(oldClient *Client) {
oldSession := oldClient.Session() oldSession := oldClient.Session()
client.session.Replace(oldSession) client.session.Replace(oldSession)
go func() { go func() {

26
go/channelling/clientstats.go

@ -0,0 +1,26 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type ClientStats interface {
ClientInfo(details bool) (int, map[string]*DataSession, map[string]string)
}

20
src/app/spreed-webrtc-server/incoming_codec.go → go/channelling/codec.go

@ -19,43 +19,45 @@
* *
*/ */
package main package channelling
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"errors" "errors"
"log" "log"
"github.com/strukturag/spreed-webrtc/go/buffercache"
) )
type IncomingDecoder interface { type IncomingDecoder interface {
DecodeIncoming(Buffer) (*DataIncoming, error) DecodeIncoming(buffercache.Buffer) (*DataIncoming, error)
} }
type OutgoingEncoder interface { type OutgoingEncoder interface {
EncodeOutgoing(*DataOutgoing) (Buffer, error) EncodeOutgoing(*DataOutgoing) (buffercache.Buffer, error)
} }
type Codec interface { type Codec interface {
NewBuffer() Buffer NewBuffer() buffercache.Buffer
IncomingDecoder IncomingDecoder
OutgoingEncoder OutgoingEncoder
} }
type incomingCodec struct { type incomingCodec struct {
buffers BufferCache buffers buffercache.BufferCache
incomingLimit int incomingLimit int
} }
func NewCodec(incomingLimit int) Codec { func NewCodec(incomingLimit int) Codec {
return &incomingCodec{NewBufferCache(1024, bytes.MinRead), incomingLimit} return &incomingCodec{buffercache.NewBufferCache(1024, bytes.MinRead), incomingLimit}
} }
func (codec incomingCodec) NewBuffer() Buffer { func (codec incomingCodec) NewBuffer() buffercache.Buffer {
return codec.buffers.New() return codec.buffers.New()
} }
func (codec incomingCodec) DecodeIncoming(b Buffer) (*DataIncoming, error) { func (codec incomingCodec) DecodeIncoming(b buffercache.Buffer) (*DataIncoming, error) {
length := b.GetBuffer().Len() length := b.GetBuffer().Len()
if length > codec.incomingLimit { if length > codec.incomingLimit {
return nil, errors.New("Incoming message size limit exceeded") return nil, errors.New("Incoming message size limit exceeded")
@ -64,7 +66,7 @@ func (codec incomingCodec) DecodeIncoming(b Buffer) (*DataIncoming, error) {
return incoming, json.Unmarshal(b.Bytes(), incoming) return incoming, json.Unmarshal(b.Bytes(), incoming)
} }
func (codec incomingCodec) EncodeOutgoing(outgoing *DataOutgoing) (Buffer, error) { func (codec incomingCodec) EncodeOutgoing(outgoing *DataOutgoing) (buffercache.Buffer, error) {
b := codec.NewBuffer() b := codec.NewBuffer()
if err := json.NewEncoder(b).Encode(outgoing); err != nil { if err := json.NewEncoder(b).Encode(outgoing); err != nil {
log.Println("Error while encoding JSON", err) log.Println("Error while encoding JSON", err)

2
src/app/spreed-webrtc-server/common_test.go → go/channelling/common_test.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"testing" "testing"

43
go/channelling/config.go

@ -0,0 +1,43 @@
package channelling
import (
"net/http"
)
type Config struct {
Title string // Title
Ver string `json:"-"` // Version (not exported to Javascript)
S string // Static URL prefix with version
B string // Base URL
Token string // Server token
Renegotiation bool // Renegotiation flag
StunURIs []string // STUN server URIs
TurnURIs []string // TURN server URIs
Tokens bool // True when we got a tokens file
Version string // Server version number
UsersEnabled bool // Flag if users are enabled
UsersAllowRegistration bool // Flag if users can register
UsersMode string // Users mode string
DefaultRoomEnabled bool // Flag if default room ("") is enabled
Plugin string // Plugin to load
AuthorizeRoomCreation bool // Whether a user account is required to create rooms
AuthorizeRoomJoin bool // Whether a user account is required to join rooms
Modules []string // List of enabled modules
ModulesTable map[string]bool `json:"-"` // Map of enabled modules
GlobalRoomID string `json:"-"` // Id of the global room (not exported to Javascript)
ContentSecurityPolicy string `json:"-"` // HTML content security policy
ContentSecurityPolicyReportOnly string `json:"-"` // HTML content security policy in report only mode
RoomTypeDefault string `json:"-"` // New rooms default to this type
}
func (config *Config) WithModule(m string) bool {
if val, ok := config.ModulesTable[m]; ok && val {
return true
}
return false
}
func (config *Config) Get(request *http.Request) (int, interface{}, http.Header) {
return 200, config, http.Header{"Content-Type": {"application/json; charset=utf-8"}}
}

27
src/app/spreed-webrtc-server/connection.go → go/channelling/connection.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"container/list" "container/list"
@ -28,6 +28,8 @@ import (
"sync" "sync"
"time" "time"
"github.com/strukturag/spreed-webrtc/go/buffercache"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -55,17 +57,17 @@ const (
type Connection interface { type Connection interface {
Index() uint64 Index() uint64
Send(Buffer) Send(buffercache.Buffer)
Close() Close()
readPump() ReadPump()
writePump() WritePump()
} }
type ConnectionHandler interface { type ConnectionHandler interface {
NewBuffer() Buffer NewBuffer() buffercache.Buffer
OnConnect(Connection) OnConnect(Connection)
OnDisconnect() OnDisconnect()
OnText(Buffer) OnText(buffercache.Buffer)
} }
type connection struct { type connection struct {
@ -116,7 +118,7 @@ func (c *connection) Close() {
break break
} }
c.queue.Remove(head) c.queue.Remove(head)
message := head.Value.(Buffer) message := head.Value.(buffercache.Buffer)
message.Decref() message.Decref()
} }
c.condition.Signal() c.condition.Signal()
@ -124,7 +126,7 @@ func (c *connection) Close() {
} }
// readPump pumps messages from the websocket connection to the hub. // readPump pumps messages from the websocket connection to the hub.
func (c *connection) readPump() { func (c *connection) ReadPump() {
c.ws.SetReadLimit(maxMessageSize) c.ws.SetReadLimit(maxMessageSize)
c.ws.SetReadDeadline(time.Now().Add(pongWait)) c.ws.SetReadDeadline(time.Now().Add(pongWait))
c.ws.SetPongHandler(func(string) error { c.ws.SetPongHandler(func(string) error {
@ -161,7 +163,7 @@ func (c *connection) readPump() {
times.PushBack(now) times.PushBack(now)
message := c.handler.NewBuffer() message := c.handler.NewBuffer()
err = readAll(message, r) err = buffercache.ReadAll(message, r)
if err != nil { if err != nil {
message.Decref() message.Decref()
break break
@ -176,7 +178,7 @@ func (c *connection) readPump() {
} }
// Write message to outbound queue. // Write message to outbound queue.
func (c *connection) Send(message Buffer) { func (c *connection) Send(message buffercache.Buffer) {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
if c.isClosed { if c.isClosed {
@ -190,11 +192,10 @@ func (c *connection) Send(message Buffer) {
message.Incref() message.Incref()
c.queue.PushBack(message) c.queue.PushBack(message)
c.condition.Signal() c.condition.Signal()
} }
// writePump pumps messages from the queue to the websocket connection. // writePump pumps messages from the queue to the websocket connection.
func (c *connection) writePump() { func (c *connection) WritePump() {
var timer *time.Timer var timer *time.Timer
ping := false ping := false
@ -232,7 +233,7 @@ func (c *connection) writePump() {
break break
} }
c.queue.Remove(head) c.queue.Remove(head)
message := head.Value.(Buffer) message := head.Value.(buffercache.Buffer)
if ping { if ping {
// Send ping. // Send ping.
ping = false ping = false

4
src/app/spreed-webrtc-server/contact.go → go/channelling/contact.go

@ -19,9 +19,7 @@
* *
*/ */
package main package channelling
import ()
type Contact struct { type Contact struct {
A string A string

27
go/channelling/contact_manager.go

@ -0,0 +1,27 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type ContactManager interface {
ContactrequestHandler(*Session, string, *DataContactRequest) error
GetContactID(*Session, string) (string, error)
}

2
src/app/spreed-webrtc-server/context.go → go/channelling/context.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
type Context struct { type Context struct {
App string // Main client script App string // Main client script

2
src/app/spreed-webrtc-server/channelling.go → go/channelling/data.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
type DataError struct { type DataError struct {
Type string Type string

57
src/app/spreed-webrtc-server/hub.go → go/channelling/hub.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"crypto/aes" "crypto/aes"
@ -36,35 +36,9 @@ import (
) )
const ( const (
turnTTL = 3600 // XXX(longsleep): Add to config file. turnTTL = 3600 // XXX(longsleep): Add to config file.
maxBroadcastPerSecond = 1000
maxUsersLength = 5000
) )
type SessionStore interface {
GetSession(id string) (session *Session, ok bool)
}
type Unicaster interface {
SessionStore
OnConnect(Client, *Session)
OnDisconnect(Client, *Session)
Unicast(to string, outgoing *DataOutgoing)
}
type ContactManager interface {
contactrequestHandler(*Session, string, *DataContactRequest) error
getContactID(*Session, string) (string, error)
}
type TurnDataCreator interface {
CreateTurnData(*Session) *DataTurn
}
type ClientStats interface {
ClientInfo(details bool) (int, map[string]*DataSession, map[string]string)
}
type Hub interface { type Hub interface {
ClientStats ClientStats
Unicaster Unicaster
@ -74,7 +48,7 @@ type Hub interface {
type hub struct { type hub struct {
OutgoingEncoder OutgoingEncoder
clients map[string]Client clients map[string]*Client
config *Config config *Config
turnSecret []byte turnSecret []byte
mutex sync.RWMutex mutex sync.RWMutex
@ -82,10 +56,9 @@ type hub struct {
} }
func NewHub(config *Config, sessionSecret, encryptionSecret, turnSecret []byte, encoder OutgoingEncoder) Hub { func NewHub(config *Config, sessionSecret, encryptionSecret, turnSecret []byte, encoder OutgoingEncoder) Hub {
h := &hub{ h := &hub{
OutgoingEncoder: encoder, OutgoingEncoder: encoder,
clients: make(map[string]Client), clients: make(map[string]*Client),
config: config, config: config,
turnSecret: turnSecret, turnSecret: turnSecret,
} }
@ -94,8 +67,8 @@ func NewHub(config *Config, sessionSecret, encryptionSecret, turnSecret []byte,
h.contacts.MaxAge(0) // Forever h.contacts.MaxAge(0) // Forever
h.contacts.HashFunc(sha256.New) h.contacts.HashFunc(sha256.New)
h.contacts.BlockFunc(aes.NewCipher) h.contacts.BlockFunc(aes.NewCipher)
return h
return h
} }
func (h *hub) ClientInfo(details bool) (clientCount int, sessions map[string]*DataSession, connections map[string]string) { func (h *hub) ClientInfo(details bool) (clientCount int, sessions map[string]*DataSession, connections map[string]string) {
@ -119,7 +92,6 @@ func (h *hub) ClientInfo(details bool) (clientCount int, sessions map[string]*Da
} }
func (h *hub) CreateTurnData(session *Session) *DataTurn { func (h *hub) CreateTurnData(session *Session) *DataTurn {
// Create turn data credentials for shared secret auth with TURN // Create turn data credentials for shared secret auth with TURN
// server. See http://tools.ietf.org/html/draft-uberti-behave-turn-rest-00 // server. See http://tools.ietf.org/html/draft-uberti-behave-turn-rest-00
// and https://code.google.com/p/rfc5766-turn-server/ REST API auth // and https://code.google.com/p/rfc5766-turn-server/ REST API auth
@ -136,20 +108,21 @@ func (h *hub) CreateTurnData(session *Session) *DataTurn {
user := fmt.Sprintf("%d:%s", expiration, id) user := fmt.Sprintf("%d:%s", expiration, id)
foo.Write([]byte(user)) foo.Write([]byte(user))
password := base64.StdEncoding.EncodeToString(foo.Sum(nil)) password := base64.StdEncoding.EncodeToString(foo.Sum(nil))
return &DataTurn{user, password, turnTTL, h.config.TurnURIs}
return &DataTurn{user, password, turnTTL, h.config.TurnURIs}
} }
func (h *hub) GetSession(id string) (session *Session, ok bool) { func (h *hub) GetSession(id string) (session *Session, ok bool) {
var client Client var client *Client
client, ok = h.GetClient(id) client, ok = h.GetClient(id)
if ok { if ok {
session = client.Session() session = client.Session()
} }
return return
} }
func (h *hub) OnConnect(client Client, session *Session) { func (h *hub) OnConnect(client *Client, session *Session) {
h.mutex.Lock() h.mutex.Lock()
log.Printf("Created client %d with id %s\n", client.Index(), session.Id) log.Printf("Created client %d with id %s\n", client.Index(), session.Id)
// Register connection or replace existing one. // Register connection or replace existing one.
@ -161,7 +134,7 @@ func (h *hub) OnConnect(client Client, session *Session) {
h.mutex.Unlock() h.mutex.Unlock()
} }
func (h *hub) OnDisconnect(client Client, session *Session) { func (h *hub) OnDisconnect(client *Client, session *Session) {
h.mutex.Lock() h.mutex.Lock()
if ec, ok := h.clients[session.Id]; ok { if ec, ok := h.clients[session.Id]; ok {
if ec == client { if ec == client {
@ -174,10 +147,11 @@ func (h *hub) OnDisconnect(client Client, session *Session) {
h.mutex.Unlock() h.mutex.Unlock()
} }
func (h *hub) GetClient(sessionID string) (client Client, ok bool) { func (h *hub) GetClient(sessionID string) (client *Client, ok bool) {
h.mutex.RLock() h.mutex.RLock()
client, ok = h.clients[sessionID] client, ok = h.clients[sessionID]
h.mutex.RUnlock() h.mutex.RUnlock()
return return
} }
@ -193,7 +167,7 @@ func (h *hub) Unicast(to string, outgoing *DataOutgoing) {
} }
} }
func (h *hub) getContactID(session *Session, token string) (userid string, err error) { func (h *hub) GetContactID(session *Session, token string) (userid string, err error) {
contact := &Contact{} contact := &Contact{}
err = h.contacts.Decode("contact", token, contact) err = h.contacts.Decode("contact", token, contact)
if err != nil { if err != nil {
@ -210,11 +184,11 @@ func (h *hub) getContactID(session *Session, token string) (userid string, err e
if userid == "" { if userid == "" {
err = fmt.Errorf("Ignoring foreign contact token", contact.A, contact.B) err = fmt.Errorf("Ignoring foreign contact token", contact.A, contact.B)
} }
return return
} }
func (h *hub) contactrequestHandler(session *Session, to string, cr *DataContactRequest) error { func (h *hub) ContactrequestHandler(session *Session, to string, cr *DataContactRequest) error {
var err error var err error
if cr.Success { if cr.Success {
@ -274,5 +248,4 @@ func (h *hub) contactrequestHandler(session *Session, to string, cr *DataContact
} }
return err return err
} }

22
src/app/spreed-webrtc-server/images.go → go/channelling/imagecache.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"bytes" "bytes"
@ -43,11 +43,25 @@ type Image struct {
data []byte data []byte
} }
func (img *Image) LastChangeID() string {
return img.lastChangeId
}
func (img *Image) LastChange() time.Time {
return img.lastChange
}
func (img *Image) MimeType() string {
return img.mimetype
}
func (img *Image) Reader() *bytes.Reader {
return bytes.NewReader(img.data)
}
type ImageCache interface { type ImageCache interface {
Update(sessionId string, image string) string Update(sessionId string, image string) string
Get(imageId string) *Image Get(imageId string) *Image
Delete(sessionId string) Delete(sessionId string)
} }
@ -136,6 +150,7 @@ func (self *imageCache) Update(sessionId string, image string) string {
if ok { if ok {
result += "/" + filename result += "/" + filename
} }
return result return result
} }
@ -143,6 +158,7 @@ func (self *imageCache) Get(imageId string) *Image {
self.mutex.RLock() self.mutex.RLock()
image := self.images[imageId] image := self.images[imageId]
self.mutex.RUnlock() self.mutex.RUnlock()
return image return image
} }

2
src/app/spreed-webrtc-server/nats.go → go/channelling/nats.go

@ -1,4 +1,4 @@
package main package channelling
import ( import (
"errors" "errors"

14
src/app/spreed-webrtc-server/room_manager.go → go/channelling/room_manager.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"fmt" "fmt"
@ -65,8 +65,8 @@ func NewRoomManager(config *Config, encoder OutgoingEncoder) RoomManager {
OutgoingEncoder: encoder, OutgoingEncoder: encoder,
roomTable: make(map[string]RoomWorker), roomTable: make(map[string]RoomWorker),
} }
if config.globalRoomID != "" { if config.GlobalRoomID != "" {
rm.globalRoomID = rm.MakeRoomID(config.globalRoomID, "") rm.globalRoomID = rm.MakeRoomID(config.GlobalRoomID, "")
} }
rm.defaultRoomID = rm.MakeRoomID("", "") rm.defaultRoomID = rm.MakeRoomID("", "")
return rm return rm
@ -111,7 +111,8 @@ func (rooms *roomManager) UpdateRoom(session *Session, room *DataRoom) (*DataRoo
return room, roomWorker.Update(room) return room, roomWorker.Update(room)
} }
// Set default room type if room was not found. // Set default room type if room was not found.
room.Type = rooms.roomTypeDefault room.Type = rooms.RoomTypeDefault
// TODO(lcooper): We should almost certainly return an error in this case. // TODO(lcooper): We should almost certainly return an error in this case.
return room, nil return room, nil
} }
@ -147,6 +148,7 @@ func (rooms *roomManager) RoomInfo(includeSessions bool) (count int, sessionInfo
sessionInfo[roomid] = room.SessionIDs() sessionInfo[roomid] = room.SessionIDs()
} }
} }
return return
} }
@ -154,6 +156,7 @@ func (rooms *roomManager) Get(roomID string) (room RoomWorker, ok bool) {
rooms.RLock() rooms.RLock()
room, ok = rooms.roomTable[roomID] room, ok = rooms.roomTable[roomID]
rooms.RUnlock() rooms.RUnlock()
return return
} }
@ -211,7 +214,8 @@ func (rooms *roomManager) GlobalUsers() []*roomUser {
func (rooms *roomManager) MakeRoomID(roomName, roomType string) string { func (rooms *roomManager) MakeRoomID(roomName, roomType string) string {
if roomType == "" { if roomType == "" {
roomType = rooms.roomTypeDefault roomType = rooms.RoomTypeDefault
} }
return fmt.Sprintf("%s:%s", roomType, roomName) return fmt.Sprintf("%s:%s", roomType, roomName)
} }

4
src/app/spreed-webrtc-server/room_manager_test.go → go/channelling/room_manager_test.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"testing" "testing"
@ -27,7 +27,7 @@ import (
func NewTestRoomManager() (RoomManager, *Config) { func NewTestRoomManager() (RoomManager, *Config) {
config := &Config{ config := &Config{
roomTypeDefault: "Room", RoomTypeDefault: "Room",
} }
return NewRoomManager(config, nil), config return NewRoomManager(config, nil), config
} }

23
src/app/spreed-webrtc-server/roomworker.go → go/channelling/roomworker.go

@ -19,18 +19,21 @@
* *
*/ */
package main package channelling
import ( import (
"crypto/subtle" "crypto/subtle"
"log" "log"
"sync" "sync"
"time" "time"
"github.com/strukturag/spreed-webrtc/go/buffercache"
) )
const ( const (
roomMaxWorkers = 10000 roomMaxWorkers = 10000
roomExpiryDuration = 60 * time.Second roomExpiryDuration = 60 * time.Second
maxUsersLength = 5000
) )
type RoomWorker interface { type RoomWorker interface {
@ -39,7 +42,7 @@ type RoomWorker interface {
Users() []*roomUser Users() []*roomUser
Update(*DataRoom) error Update(*DataRoom) error
GetUsers() []*DataSession GetUsers() []*DataSession
Broadcast(sessionID string, buf Buffer) Broadcast(sessionID string, buf buffercache.Buffer)
Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error) Join(*DataRoomCredentials, *Session, Sender) (*DataRoom, error)
Leave(sessionID string) Leave(sessionID string)
} }
@ -68,7 +71,6 @@ type roomUser struct {
} }
func NewRoomWorker(manager *roomManager, roomID, roomName, roomType string, credentials *DataRoomCredentials) RoomWorker { func NewRoomWorker(manager *roomManager, roomID, roomName, roomType string, credentials *DataRoomCredentials) RoomWorker {
log.Printf("Creating worker for room '%s'\n", roomID) log.Printf("Creating worker for room '%s'\n", roomID)
r := &roomWorker{ r := &roomWorker{
@ -91,11 +93,9 @@ func NewRoomWorker(manager *roomManager, roomID, roomName, roomType string, cred
}) })
return r return r
} }
func (r *roomWorker) Start() { func (r *roomWorker) Start() {
// Main blocking worker. // Main blocking worker.
L: L:
for { for {
@ -122,7 +122,6 @@ L:
r.timer.Stop() r.timer.Stop()
close(r.workers) close(r.workers)
//fmt.Println("Exit worker", r.Id) //fmt.Println("Exit worker", r.Id)
} }
func (r *roomWorker) SessionIDs() []string { func (r *roomWorker) SessionIDs() []string {
@ -132,23 +131,22 @@ func (r *roomWorker) SessionIDs() []string {
for id := range r.users { for id := range r.users {
sessions = append(sessions, id) sessions = append(sessions, id)
} }
return sessions return sessions
} }
func (r *roomWorker) Users() []*roomUser { func (r *roomWorker) Users() []*roomUser {
r.mutex.RLock() r.mutex.RLock()
defer r.mutex.RUnlock() defer r.mutex.RUnlock()
users := make([]*roomUser, 0, len(r.users)) users := make([]*roomUser, 0, len(r.users))
for _, user := range r.users { for _, user := range r.users {
users = append(users, user) users = append(users, user)
} }
return users
return users
} }
func (r *roomWorker) Run(f func()) bool { func (r *roomWorker) Run(f func()) bool {
select { select {
case r.workers <- f: case r.workers <- f:
return true return true
@ -156,7 +154,6 @@ func (r *roomWorker) Run(f func()) bool {
log.Printf("Room worker channel full or closed '%s'\n", r.id) log.Printf("Room worker channel full or closed '%s'\n", r.id)
return false return false
} }
} }
func (r *roomWorker) Update(room *DataRoom) error { func (r *roomWorker) Update(room *DataRoom) error {
@ -178,6 +175,7 @@ func (r *roomWorker) Update(room *DataRoom) error {
fault <- nil fault <- nil
} }
r.Run(worker) r.Run(worker)
return <-fault return <-fault
} }
@ -223,8 +221,7 @@ func (r *roomWorker) GetUsers() []*DataSession {
return <-out return <-out
} }
func (r *roomWorker) Broadcast(sessionID string, message Buffer) { func (r *roomWorker) Broadcast(sessionID string, message buffercache.Buffer) {
worker := func() { worker := func() {
r.mutex.RLock() r.mutex.RLock()
for id, user := range r.users { for id, user := range r.users {
@ -241,7 +238,6 @@ func (r *roomWorker) Broadcast(sessionID string, message Buffer) {
message.Incref() message.Incref()
r.Run(worker) r.Run(worker)
} }
type joinResult struct { type joinResult struct {
@ -280,6 +276,7 @@ func (r *roomWorker) Join(credentials *DataRoomCredentials, session *Session, se
} }
r.Run(worker) r.Run(worker)
result := <-results result := <-results
return result.DataRoom, result.error return result.DataRoom, result.error
} }

2
src/app/spreed-webrtc-server/roomworker_test.go → go/channelling/roomworker_test.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"testing" "testing"

2
src/app/spreed-webrtc-server/api.go → go/channelling/server/api.go

@ -19,7 +19,7 @@
* *
*/ */
package main package server
import () import ()

63
src/app/spreed-webrtc-server/config.go → go/channelling/server/config.go

@ -19,44 +19,20 @@
* *
*/ */
package main package server
import ( import (
"fmt" "fmt"
"github.com/strukturag/phoenix"
"log" "log"
"net/http"
"strings" "strings"
"time" "time"
)
type Config struct { "github.com/strukturag/spreed-webrtc/go/channelling"
Title string // Title
ver string // Version (not exported to Javascript)
S string // Static URL prefix with version
B string // Base URL
Token string // Server token
Renegotiation bool // Renegotiation flag
StunURIs []string // STUN server URIs
TurnURIs []string // TURN server URIs
Tokens bool // True when we got a tokens file
Version string // Server version number
UsersEnabled bool // Flag if users are enabled
UsersAllowRegistration bool // Flag if users can register
UsersMode string // Users mode string
DefaultRoomEnabled bool // Flag if default room ("") is enabled
Plugin string // Plugin to load
AuthorizeRoomCreation bool // Whether a user account is required to create rooms
AuthorizeRoomJoin bool // Whether a user account is required to join rooms
Modules []string // List of enabled modules
modulesTable map[string]bool // Map of enabled modules
globalRoomID string // Id of the global room (not exported to Javascript)
contentSecurityPolicy string // HTML content security policy
contentSecurityPolicyReportOnly string // HTML content security policy in report only mode
roomTypeDefault string // New rooms default to this type
}
func NewConfig(container phoenix.Container, tokens bool) *Config { "github.com/strukturag/phoenix"
)
func NewConfig(container phoenix.Container, tokens bool) *channelling.Config {
ver := container.GetStringDefault("app", "ver", "") ver := container.GetStringDefault("app", "ver", "")
version := container.Version() version := container.Version()
@ -107,9 +83,9 @@ func NewConfig(container phoenix.Container, tokens bool) *Config {
} }
log.Println("Enabled modules:", modules) log.Println("Enabled modules:", modules)
return &Config{ return &channelling.Config{
Title: container.GetStringDefault("app", "title", "Spreed WebRTC"), Title: container.GetStringDefault("app", "title", "Spreed WebRTC"),
ver: ver, Ver: ver,
S: fmt.Sprintf("static/ver=%s", ver), S: fmt.Sprintf("static/ver=%s", ver),
B: basePath, B: basePath,
Token: serverToken, Token: serverToken,
@ -126,27 +102,14 @@ func NewConfig(container phoenix.Container, tokens bool) *Config {
AuthorizeRoomCreation: container.GetBoolDefault("app", "authorizeRoomCreation", false), AuthorizeRoomCreation: container.GetBoolDefault("app", "authorizeRoomCreation", false),
AuthorizeRoomJoin: container.GetBoolDefault("app", "authorizeRoomJoin", false), AuthorizeRoomJoin: container.GetBoolDefault("app", "authorizeRoomJoin", false),
Modules: modules, Modules: modules,
modulesTable: modulesTable, ModulesTable: modulesTable,
globalRoomID: container.GetStringDefault("app", "globalRoom", ""), GlobalRoomID: container.GetStringDefault("app", "globalRoom", ""),
contentSecurityPolicy: container.GetStringDefault("app", "contentSecurityPolicy", ""), ContentSecurityPolicy: container.GetStringDefault("app", "contentSecurityPolicy", ""),
contentSecurityPolicyReportOnly: container.GetStringDefault("app", "contentSecurityPolicyReportOnly", ""), ContentSecurityPolicyReportOnly: container.GetStringDefault("app", "contentSecurityPolicyReportOnly", ""),
roomTypeDefault: "Room", RoomTypeDefault: "Room",
} }
} }
func (config *Config) Get(request *http.Request) (int, interface{}, http.Header) {
return 200, config, http.Header{"Content-Type": {"application/json; charset=utf-8"}}
}
func (config *Config) WithModule(m string) bool {
if val, ok := config.modulesTable[m]; ok && val {
return true
}
return false
}
// Helper function to clean up string arrays. // Helper function to clean up string arrays.
func trimAndRemoveDuplicates(data *[]string) { func trimAndRemoveDuplicates(data *[]string) {
found := make(map[string]bool) found := make(map[string]bool)

6
src/app/spreed-webrtc-server/rooms.go → go/channelling/server/rooms.go

@ -19,11 +19,13 @@
* *
*/ */
package main package server
import ( import (
"fmt" "fmt"
"net/http" "net/http"
"github.com/strukturag/spreed-webrtc/go/randomstring"
) )
type Room struct { type Room struct {
@ -36,7 +38,7 @@ type Rooms struct {
func (rooms *Rooms) Post(request *http.Request) (int, interface{}, http.Header) { func (rooms *Rooms) Post(request *http.Request) (int, interface{}, http.Header) {
name := NewRandomString(11) name := randomstring.NewRandomString(11)
return 200, &Room{name, fmt.Sprintf("/%s", name)}, http.Header{"Content-Type": {"application/json"}} return 200, &Room{name, fmt.Sprintf("/%s", name)}, http.Header{"Content-Type": {"application/json"}}
} }

19
src/app/spreed-webrtc-server/sessions.go → go/channelling/server/sessions.go

@ -19,14 +19,17 @@
* *
*/ */
package main package server
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"github.com/gorilla/mux"
"log" "log"
"net/http" "net/http"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/gorilla/mux"
) )
type SessionNonce struct { type SessionNonce struct {
@ -43,9 +46,9 @@ type SessionNonceRequest struct {
} }
type Sessions struct { type Sessions struct {
SessionValidator channelling.SessionValidator
SessionStore channelling.SessionStore
users *Users Users *Users
} }
// Patch is used to add a userid to a given session (login). // Patch is used to add a userid to a given session (login).
@ -87,8 +90,8 @@ func (sessions *Sessions) Patch(request *http.Request) (int, interface{}, http.H
var userid string var userid string
// Validate with users handler. // Validate with users handler.
if sessions.users.handler != nil { if sessions.Users.handler != nil {
userid, err = sessions.users.handler.Validate(&snr, request) userid, err = sessions.Users.handler.Validate(&snr, request)
if err != nil { if err != nil {
error = true error = true
log.Println("Session patch failed - users validation failed.", err) log.Println("Session patch failed - users validation failed.", err)
@ -107,7 +110,7 @@ func (sessions *Sessions) Patch(request *http.Request) (int, interface{}, http.H
if !error { if !error {
// FIXME(longsleep): Not running this might reveal error state with a timing attack. // FIXME(longsleep): Not running this might reveal error state with a timing attack.
if session, ok := sessions.GetSession(snr.Id); ok { if session, ok := sessions.GetSession(snr.Id); ok {
nonce, err = session.Authorize(sessions.Realm(), &SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) nonce, err = session.Authorize(sessions.Realm(), &channelling.SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid})
} else { } else {
err = errors.New("no such session") err = errors.New("no such session")
} }

12
src/app/spreed-webrtc-server/stats.go → go/channelling/server/stats.go

@ -19,21 +19,23 @@
* *
*/ */
package main package server
import ( import (
"net/http" "net/http"
"runtime" "runtime"
"time" "time"
"github.com/strukturag/spreed-webrtc/go/channelling"
) )
type Stat struct { type Stat struct {
details bool details bool
Runtime *RuntimeStat `json:"runtime"` Runtime *RuntimeStat `json:"runtime"`
Hub *HubStat `json:"hub"` Hub *channelling.HubStat `json:"hub"`
} }
func NewStat(details bool, statsGenerator StatsGenerator) *Stat { func NewStat(details bool, statsGenerator channelling.StatsGenerator) *Stat {
stat := &Stat{ stat := &Stat{
details: details, details: details,
Runtime: &RuntimeStat{}, Runtime: &RuntimeStat{},
@ -69,7 +71,7 @@ func (stat *RuntimeStat) Read() {
} }
type Stats struct { type Stats struct {
StatsGenerator channelling.StatsGenerator
} }
func (stats *Stats) Get(request *http.Request) (int, interface{}, http.Header) { func (stats *Stats) Get(request *http.Request) (int, interface{}, http.Header) {

2
src/app/spreed-webrtc-server/tls.go → go/channelling/server/tls.go

@ -32,7 +32,7 @@
* *
*/ */
package main package server
import ( import (
"crypto" "crypto"

8
src/app/spreed-webrtc-server/tokens.go → go/channelling/server/tokens.go

@ -19,12 +19,14 @@
* *
*/ */
package main package server
import ( import (
"log" "log"
"net/http" "net/http"
"strings" "strings"
"github.com/strukturag/spreed-webrtc/go/channelling"
) )
type Token struct { type Token struct {
@ -33,7 +35,7 @@ type Token struct {
} }
type Tokens struct { type Tokens struct {
provider TokenProvider Provider channelling.TokenProvider
} }
func (tokens Tokens) Post(request *http.Request) (int, interface{}, http.Header) { func (tokens Tokens) Post(request *http.Request) (int, interface{}, http.Header) {
@ -44,7 +46,7 @@ func (tokens Tokens) Post(request *http.Request) (int, interface{}, http.Header)
return 413, NewApiError("auth_too_large", "Auth too large"), http.Header{"Content-Type": {"application/json"}} return 413, NewApiError("auth_too_large", "Auth too large"), http.Header{"Content-Type": {"application/json"}}
} }
valid := tokens.provider(strings.ToLower(auth)) valid := tokens.Provider(strings.ToLower(auth))
if valid != "" { if valid != "" {
log.Printf("Good incoming token request: %s\n", auth) log.Printf("Good incoming token request: %s\n", auth)

21
src/app/spreed-webrtc-server/users.go → go/channelling/server/users.go

@ -19,7 +19,7 @@
* *
*/ */
package main package server
import ( import (
"crypto" "crypto"
@ -34,15 +34,18 @@ import (
"encoding/pem" "encoding/pem"
"errors" "errors"
"fmt" "fmt"
"github.com/longsleep/pkac"
"github.com/satori/go.uuid"
"github.com/strukturag/phoenix"
"log" "log"
"math/big" "math/big"
"net/http" "net/http"
"strconv" "strconv"
"strings" "strings"
"time" "time"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/longsleep/pkac"
"github.com/satori/go.uuid"
"github.com/strukturag/phoenix"
) )
var ( var (
@ -290,14 +293,14 @@ func (un *UserNonce) Response() (int, interface{}, http.Header) {
} }
type Users struct { type Users struct {
SessionValidator channelling.SessionValidator
SessionManager channelling.SessionManager
SessionStore channelling.SessionStore
realm string realm string
handler UsersHandler handler UsersHandler
} }
func NewUsers(sessionStore SessionStore, sessionValidator SessionValidator, sessionManager SessionManager, mode, realm string, runtime phoenix.Runtime) *Users { func NewUsers(sessionStore channelling.SessionStore, sessionValidator channelling.SessionValidator, sessionManager channelling.SessionManager, mode, realm string, runtime phoenix.Runtime) *Users {
var users = &Users{ var users = &Users{
sessionValidator, sessionValidator,
@ -463,7 +466,7 @@ func (users *Users) Post(request *http.Request) (int, interface{}, http.Header)
err error err error
) )
if session, ok := users.GetSession(snr.Id); ok { if session, ok := users.GetSession(snr.Id); ok {
nonce, err = session.Authorize(users.Realm(), &SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid}) nonce, err = session.Authorize(users.Realm(), &channelling.SessionToken{Id: snr.Id, Sid: snr.Sid, Userid: userid})
} else { } else {
err = errors.New("no such session") err = errors.New("no such session")
} }

88
src/app/spreed-webrtc-server/session.go → go/channelling/session.go

@ -19,14 +19,15 @@
* *
*/ */
package main package channelling
import ( import (
"fmt" "fmt"
"github.com/gorilla/securecookie"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/gorilla/securecookie"
) )
var sessionNonces *securecookie.SecureCookie var sessionNonces *securecookie.SecureCookie
@ -58,8 +59,14 @@ type Session struct {
replaced bool replaced bool
} }
func NewSession(manager SessionManager, unicaster Unicaster, broadcaster Broadcaster, rooms RoomStatusManager, buddyImages ImageCache, attestations *securecookie.SecureCookie, id, sid string) *Session { func NewSession(manager SessionManager,
unicaster Unicaster,
broadcaster Broadcaster,
rooms RoomStatusManager,
buddyImages ImageCache,
attestations *securecookie.SecureCookie,
id,
sid string) *Session {
session := &Session{ session := &Session{
SessionManager: manager, SessionManager: manager,
Unicaster: unicaster, Unicaster: unicaster,
@ -75,8 +82,8 @@ func NewSession(manager SessionManager, unicaster Unicaster, broadcaster Broadca
subscribers: make(map[string]*Session), subscribers: make(map[string]*Session),
} }
session.NewAttestation() session.NewAttestation()
return session
return session
} }
func (s *Session) authenticated() (authenticated bool) { func (s *Session) authenticated() (authenticated bool) {
@ -152,6 +159,7 @@ func (s *Session) JoinRoom(roomName, roomType string, credentials *DataRoomCrede
} else { } else {
s.Hello = false s.Hello = false
} }
return room, err return room, err
} }
@ -248,7 +256,6 @@ func (s *Session) Close() {
} }
func (s *Session) Replace(oldSession *Session) { func (s *Session) Replace(oldSession *Session) {
oldSession.mutex.Lock() oldSession.mutex.Lock()
if oldSession.disconnected { if oldSession.disconnected {
oldSession.mutex.Unlock() oldSession.mutex.Unlock()
@ -265,7 +272,6 @@ func (s *Session) Replace(oldSession *Session) {
// Mark old session as replaced. // Mark old session as replaced.
oldSession.replaced = true oldSession.replaced = true
oldSession.mutex.Unlock() oldSession.mutex.Unlock()
} }
func (s *Session) Update(update *SessionUpdate) uint64 { func (s *Session) Update(update *SessionUpdate) uint64 {
@ -301,11 +307,9 @@ func (s *Session) Update(update *SessionUpdate) uint64 {
s.UpdateRev++ s.UpdateRev++
return s.UpdateRev return s.UpdateRev
} }
func (s *Session) Authorize(realm string, st *SessionToken) (string, error) { func (s *Session) Authorize(realm string, st *SessionToken) (string, error) {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -324,11 +328,9 @@ func (s *Session) Authorize(realm string, st *SessionToken) (string, error) {
} }
return s.Nonce, err return s.Nonce, err
} }
func (s *Session) Authenticate(realm string, st *SessionToken, userid string) error { func (s *Session) Authenticate(realm string, st *SessionToken, userid string) error {
s.mutex.Lock() s.mutex.Lock()
defer s.mutex.Unlock() defer s.mutex.Unlock()
@ -352,12 +354,11 @@ func (s *Session) Authenticate(realm string, st *SessionToken, userid string) er
s.userid = userid s.userid = userid
s.stamp = time.Now().Unix() s.stamp = time.Now().Unix()
s.UpdateRev++ s.UpdateRev++
return nil
return nil
} }
func (s *Session) Token() *SessionToken { func (s *Session) Token() *SessionToken {
s.mutex.RLock() s.mutex.RLock()
defer s.mutex.RUnlock() defer s.mutex.RUnlock()
@ -365,7 +366,6 @@ func (s *Session) Token() *SessionToken {
} }
func (s *Session) Data() *DataSession { func (s *Session) Data() *DataSession {
s.mutex.RLock() s.mutex.RLock()
defer s.mutex.RUnlock() defer s.mutex.RUnlock()
@ -378,25 +378,21 @@ func (s *Session) Data() *DataSession {
Prio: s.Prio, Prio: s.Prio,
stamp: s.stamp, stamp: s.stamp,
} }
} }
func (s *Session) Userid() (userid string) { func (s *Session) Userid() (userid string) {
s.mutex.RLock() s.mutex.RLock()
userid = s.userid userid = s.userid
s.mutex.RUnlock() s.mutex.RUnlock()
return
return
} }
func (s *Session) SetUseridFake(userid string) { func (s *Session) SetUseridFake(userid string) {
s.mutex.Lock() s.mutex.Lock()
s.userid = userid s.userid = userid
s.fake = true s.fake = true
s.mutex.Unlock() s.mutex.Unlock()
} }
func (s *Session) NewAttestation() { func (s *Session) NewAttestation() {
@ -406,58 +402,14 @@ func (s *Session) NewAttestation() {
s.attestation.Update() s.attestation.Update()
} }
func (s *Session) UpdateAttestation() { func (s *Session) UpdateAttestation() (string, error) {
s.mutex.Lock() s.mutex.Lock()
s.attestation.Update() defer s.mutex.Unlock()
s.mutex.Unlock() return s.attestation.Update()
}
type SessionUpdate struct {
Types []string
Ua string
Prio int
Status interface{}
}
type SessionToken struct {
Id string // Public session id.
Sid string // Secret session id.
Userid string // Public user id.
Nonce string `json:"Nonce,omitempty"` // User autentication nonce.
}
type SessionAttestation struct {
refresh int64
token string
s *Session
}
func (sa *SessionAttestation) Update() (string, error) {
token, err := sa.Encode()
if err == nil {
sa.token = token
sa.refresh = time.Now().Unix() + 180 // expires after 3 minutes
}
return token, err
}
func (sa *SessionAttestation) Token() (token string) {
if sa.refresh < time.Now().Unix() {
token, _ = sa.Update()
} else {
token = sa.token
}
return
}
func (sa *SessionAttestation) Encode() (string, error) {
return sa.s.attestations.Encode("attestation", sa.s.Id)
} }
func (sa *SessionAttestation) Decode(token string) (string, error) { func (s *Session) DecodeAttestation(token string) (string, error) {
var id string return s.attestation.Decode(token)
err := sa.s.attestations.Decode("attestation", token, &id)
return id, err
} }
func init() { func init() {

5
src/app/spreed-webrtc-server/session_manager.go → go/channelling/session_manager.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"crypto/sha256" "crypto/sha256"
@ -89,6 +89,7 @@ func (sessionManager *sessionManager) UserInfo(details bool) (userCount int, use
users[userid] = user.Data() users[userid] = user.Data()
} }
} }
return return
} }
@ -149,6 +150,7 @@ func (sessionManager *sessionManager) Authenticate(session *Session, st *Session
} }
sessionManager.Unlock() sessionManager.Unlock()
user.AddSession(session) user.AddSession(session)
return nil return nil
} }
@ -177,5 +179,6 @@ func (sessionManager *sessionManager) GetUserSessions(session *Session, userid s
// Add sessions for foreign user. // Add sessions for foreign user.
users = user.SubscribeSessions(session) users = user.SubscribeSessions(session)
} }
return return
} }

60
go/channelling/sessionattestation.go

@ -0,0 +1,60 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
import (
"time"
)
type SessionAttestation struct {
refresh int64
token string
s *Session
}
func (sa *SessionAttestation) Update() (string, error) {
token, err := sa.Encode()
if err == nil {
sa.token = token
sa.refresh = time.Now().Unix() + 180 // expires after 3 minutes
}
return token, err
}
func (sa *SessionAttestation) Token() (token string) {
if sa.refresh < time.Now().Unix() {
token, _ = sa.Update()
} else {
token = sa.token
}
return
}
func (sa *SessionAttestation) Encode() (string, error) {
return sa.s.attestations.Encode("attestation", sa.s.Id)
}
func (sa *SessionAttestation) Decode(token string) (string, error) {
var id string
err := sa.s.attestations.Decode("attestation", token, &id)
return id, err
}

26
go/channelling/sessionstore.go

@ -0,0 +1,26 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type SessionStore interface {
GetSession(id string) (session *Session, ok bool)
}

29
go/channelling/sessiontoken.go

@ -0,0 +1,29 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type SessionToken struct {
Id string // Public session id.
Sid string // Secret session id.
Userid string // Public user id.
Nonce string `json:"Nonce,omitempty"` // User autentication nonce.
}

29
go/channelling/sessionupdate.go

@ -0,0 +1,29 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type SessionUpdate struct {
Types []string
Ua string
Prio int
Status interface{}
}

2
src/app/spreed-webrtc-server/stats_manager.go → go/channelling/stats_manager.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"sync/atomic" "sync/atomic"

8
src/app/spreed-webrtc-server/tickets.go → go/channelling/tickets.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"crypto/aes" "crypto/aes"
@ -29,6 +29,8 @@ import (
"fmt" "fmt"
"log" "log"
"github.com/strukturag/spreed-webrtc/go/randomstring"
"github.com/gorilla/securecookie" "github.com/gorilla/securecookie"
) )
@ -86,7 +88,7 @@ func (tickets *tickets) DecodeSessionToken(token string) (st *SessionToken) {
} }
if st == nil || err != nil { if st == nil || err != nil {
sid := NewRandomString(32) sid := randomstring.NewRandomString(32)
id, _ := tickets.Encode("id", sid) id, _ := tickets.Encode("id", sid)
st = &SessionToken{Id: id, Sid: sid} st = &SessionToken{Id: id, Sid: sid}
log.Println("Created new session id", id) log.Println("Created new session id", id)
@ -96,7 +98,7 @@ func (tickets *tickets) DecodeSessionToken(token string) (st *SessionToken) {
func (tickets *tickets) FakeSessionToken(userid string) *SessionToken { func (tickets *tickets) FakeSessionToken(userid string) *SessionToken {
st := &SessionToken{} st := &SessionToken{}
st.Sid = fmt.Sprintf("fake-%s", NewRandomString(27)) st.Sid = fmt.Sprintf("fake-%s", randomstring.NewRandomString(27))
st.Id, _ = tickets.Encode("id", st.Sid) st.Id, _ = tickets.Encode("id", st.Sid)
st.Userid = userid st.Userid = userid
log.Println("Created new fake session id", st.Id) log.Println("Created new fake session id", st.Id)

7
src/app/spreed-webrtc-server/tokenprovider.go → go/channelling/tokenprovider.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"encoding/csv" "encoding/csv"
@ -47,11 +47,11 @@ func (tf *TokenFile) ReloadIfModified() error {
tf.Info = info tf.Info = info
tf.Reload() tf.Reload()
} }
return nil return nil
} }
func reloadRokens(tf *TokenFile) { func reloadRokens(tf *TokenFile) {
r, err := os.Open(tf.Path) r, err := os.Open(tf.Path)
if err != nil { if err != nil {
panic(err) panic(err)
@ -71,11 +71,9 @@ func reloadRokens(tf *TokenFile) {
for _, record := range records { for _, record := range records {
tf.Tokens[strings.ToLower(record[0])] = true tf.Tokens[strings.ToLower(record[0])] = true
} }
} }
func TokenFileProvider(filename string) TokenProvider { func TokenFileProvider(filename string) TokenProvider {
tf := &TokenFile{Path: filename} tf := &TokenFile{Path: filename}
tf.Reload = func() { reloadRokens(tf) } tf.Reload = func() { reloadRokens(tf) }
return func(token string) string { return func(token string) string {
@ -86,5 +84,4 @@ func TokenFileProvider(filename string) TokenProvider {
} }
return token return token
} }
} }

26
go/channelling/turndata.go

@ -0,0 +1,26 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type TurnDataCreator interface {
CreateTurnData(*Session) *DataTurn
}

29
go/channelling/unicaster.go

@ -0,0 +1,29 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package channelling
type Unicaster interface {
SessionStore
OnConnect(*Client, *Session)
OnDisconnect(*Client, *Session)
Unicast(to string, outgoing *DataOutgoing)
}

12
src/app/spreed-webrtc-server/user.go → go/channelling/user.go

@ -19,7 +19,7 @@
* *
*/ */
package main package channelling
import ( import (
"log" "log"
@ -34,13 +34,12 @@ type User struct {
} }
func NewUser(id string) *User { func NewUser(id string) *User {
user := &User{ user := &User{
Id: id, Id: id,
sessionTable: make(map[string]*Session), sessionTable: make(map[string]*Session),
} }
return user
return user
} }
// AddSession adds a session to the session table and returns true if // AddSession adds a session to the session table and returns true if
@ -54,6 +53,7 @@ func (u *User) AddSession(s *Session) bool {
first = true first = true
} }
u.mutex.Unlock() u.mutex.Unlock()
return first return first
} }
@ -68,12 +68,14 @@ func (u *User) RemoveSession(sessionID string) bool {
last = true last = true
} }
u.mutex.Unlock() u.mutex.Unlock()
return last return last
} }
func (u *User) Data() *DataUser { func (u *User) Data() *DataUser {
u.mutex.RLock() u.mutex.RLock()
defer u.mutex.RUnlock() defer u.mutex.RUnlock()
return &DataUser{ return &DataUser{
Id: u.Id, Id: u.Id,
Sessions: len(u.sessionTable), Sessions: len(u.sessionTable),
@ -81,7 +83,6 @@ func (u *User) Data() *DataUser {
} }
func (u *User) SubscribeSessions(from *Session) []*DataSession { func (u *User) SubscribeSessions(from *Session) []*DataSession {
sessions := make([]*DataSession, 0, len(u.sessionTable)) sessions := make([]*DataSession, 0, len(u.sessionTable))
u.mutex.RLock() u.mutex.RLock()
defer u.mutex.RUnlock() defer u.mutex.RUnlock()
@ -91,8 +92,8 @@ func (u *User) SubscribeSessions(from *Session) []*DataSession {
sessions = append(sessions, session.Data()) sessions = append(sessions, session.Data())
} }
sort.Sort(ByPrioAndStamp(sessions)) sort.Sort(ByPrioAndStamp(sessions))
return sessions
return sessions
} }
type ByPrioAndStamp []*DataSession type ByPrioAndStamp []*DataSession
@ -112,5 +113,6 @@ func (a ByPrioAndStamp) Less(i, j int) bool {
if a[i].Prio == a[j].Prio { if a[i].Prio == a[j].Prio {
return a[i].stamp < a[j].stamp return a[i].stamp < a[j].stamp
} }
return false return false
} }

2
src/app/spreed-webrtc-server/random.go → go/randomstring/randomstring.go

@ -19,7 +19,7 @@
* *
*/ */
package main package randomstring
import ( import (
"crypto/rand" "crypto/rand"

348
src/app/spreed-webrtc-server/channelling_api.go

@ -1,348 +0,0 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"log"
"time"
)
const (
maxConferenceSize = 100
apiVersion = 1.4 // Keep this in sync with CHANNELING-API docs.Hand
)
type ChannellingAPI interface {
OnConnect(Client, *Session) (interface{}, error)
OnDisconnect(Client, *Session)
OnIncoming(Sender, *Session, *DataIncoming) (interface{}, error)
}
type channellingAPI struct {
*Config
RoomStatusManager
SessionEncoder
SessionManager
StatsCounter
ContactManager
TurnDataCreator
Unicaster
BusManager
}
// NewChannellingAPI creates and initializes a new ChannellingAPI using
// various other services for initialization. It is intended to handle
// incoming and outgoing channeling API events from clients.
func NewChannellingAPI(config *Config, roomStatus RoomStatusManager, sessionEncoder SessionEncoder, sessionManager SessionManager, statsCounter StatsCounter, contactManager ContactManager, turnDataCreator TurnDataCreator, unicaster Unicaster, busManager BusManager) ChannellingAPI {
return &channellingAPI{
config,
roomStatus,
sessionEncoder,
sessionManager,
statsCounter,
contactManager,
turnDataCreator,
unicaster,
busManager,
}
}
func (api *channellingAPI) OnConnect(client Client, session *Session) (interface{}, error) {
api.Unicaster.OnConnect(client, session)
self, err := api.HandleSelf(session)
if err == nil {
api.Trigger(BusManagerConnect, session.Id, "", nil)
}
return self, err
}
func (api *channellingAPI) OnDisconnect(client Client, session *Session) {
api.Unicaster.OnDisconnect(client, session)
api.Trigger(BusManagerDisconnect, session.Id, "", nil)
}
func (api *channellingAPI) OnIncoming(sender Sender, session *Session, msg *DataIncoming) (interface{}, error) {
switch msg.Type {
case "Self":
return api.HandleSelf(session)
case "Hello":
if msg.Hello == nil {
return nil, NewDataError("bad_request", "message did not contain Hello")
}
return api.HandleHello(session, msg.Hello, sender)
case "Offer":
if msg.Offer == nil || msg.Offer.Offer == nil {
log.Println("Received invalid offer message.", msg)
break
}
if _, ok := msg.Offer.Offer["_token"]; !ok {
// Trigger offer event when offer has no token, so this is
// not triggered for peerxfer and peerscreenshare offers.
api.Trigger(BusManagerOffer, session.Id, msg.Offer.To, nil)
}
session.Unicast(msg.Offer.To, msg.Offer)
case "Candidate":
if msg.Candidate == nil || msg.Candidate.Candidate == nil {
log.Println("Received invalid candidate message.", msg)
break
}
session.Unicast(msg.Candidate.To, msg.Candidate)
case "Answer":
if msg.Answer == nil || msg.Answer.Answer == nil {
log.Println("Received invalid answer message.", msg)
break
}
if _, ok := msg.Answer.Answer["_token"]; !ok {
// Trigger answer event when answer has no token. so this is
// not triggered for peerxfer and peerscreenshare answers.
api.Trigger(BusManagerAnswer, session.Id, msg.Answer.To, nil)
}
session.Unicast(msg.Answer.To, msg.Answer)
case "Users":
return api.HandleUsers(session)
case "Authentication":
if msg.Authentication == nil || msg.Authentication.Authentication == nil {
return nil, NewDataError("bad_request", "message did not contain Authentication")
}
return api.HandleAuthentication(session, msg.Authentication.Authentication)
case "Bye":
if msg.Bye == nil {
log.Println("Received invalid bye message.", msg)
break
}
api.Trigger(BusManagerBye, session.Id, msg.Bye.To, nil)
session.Unicast(msg.Bye.To, msg.Bye)
case "Status":
if msg.Status == nil {
log.Println("Received invalid status message.", msg)
break
}
//log.Println("Status", msg.Status)
session.Update(&SessionUpdate{Types: []string{"Status"}, Status: msg.Status.Status})
session.BroadcastStatus()
case "Chat":
if msg.Chat == nil || msg.Chat.Chat == nil {
log.Println("Received invalid chat message.", msg)
break
}
api.HandleChat(session, msg.Chat)
case "Conference":
if msg.Conference == nil {
log.Println("Received invalid conference message.", msg)
break
}
api.HandleConference(session, msg.Conference)
case "Alive":
return msg.Alive, nil
case "Sessions":
if msg.Sessions == nil || msg.Sessions.Sessions == nil {
return nil, NewDataError("bad_request", "message did not contain Sessions")
}
return api.HandleSessions(session, msg.Sessions.Sessions)
case "Room":
if msg.Room == nil {
return nil, NewDataError("bad_request", "message did not contain Room")
}
return api.HandleRoom(session, msg.Room)
default:
log.Println("OnText unhandled message type", msg.Type)
}
return nil, nil
}
func (api *channellingAPI) HandleSelf(session *Session) (*DataSelf, error) {
token, err := api.EncodeSessionToken(session)
if err != nil {
log.Println("Error in OnRegister", err)
return nil, err
}
log.Println("Created new session token", len(token), token)
self := &DataSelf{
Type: "Self",
Id: session.Id,
Sid: session.Sid,
Userid: session.Userid(),
Suserid: api.EncodeSessionUserID(session),
Token: token,
Version: api.Version,
ApiVersion: apiVersion,
Turn: api.CreateTurnData(session),
Stun: api.StunURIs,
}
api.Trigger(BusManagerSession, session.Id, session.Userid(), nil)
return self, nil
}
func (api *channellingAPI) HandleHello(session *Session, hello *DataHello, sender Sender) (*DataWelcome, error) {
// TODO(longsleep): Filter room id and user agent.
session.Update(&SessionUpdate{Types: []string{"Ua"}, Ua: hello.Ua})
// Compatibily for old clients.
roomName := hello.Name
if roomName == "" {
roomName = hello.Id
}
room, err := session.JoinRoom(roomName, hello.Type, hello.Credentials, sender)
if err != nil {
return nil, err
}
return &DataWelcome{
Type: "Welcome",
Room: room,
Users: api.RoomUsers(session),
}, nil
}
func (api *channellingAPI) HandleUsers(session *Session) (sessions *DataSessions, err error) {
if session.Hello {
sessions = &DataSessions{Type: "Users", Users: api.RoomUsers(session)}
} else {
err = NewDataError("not_in_room", "Cannot list users without a current room")
}
return
}
func (api *channellingAPI) HandleAuthentication(session *Session, st *SessionToken) (*DataSelf, error) {
if err := api.Authenticate(session, st, ""); err != nil {
log.Println("Authentication failed", err, st.Userid, st.Nonce)
return nil, err
}
log.Println("Authentication success", session.Userid())
self, err := api.HandleSelf(session)
if err == nil {
session.BroadcastStatus()
}
return self, err
}
func (api *channellingAPI) HandleChat(session *Session, chat *DataChat) {
// TODO(longsleep): Limit sent chat messages per incoming connection.
msg := chat.Chat
to := chat.To
if !msg.NoEcho {
session.Unicast(session.Id, chat)
}
msg.Time = time.Now().Format(time.RFC3339)
if to == "" {
// TODO(longsleep): Check if chat broadcast is allowed.
if session.Hello {
api.CountBroadcastChat()
session.Broadcast(chat)
}
} else {
if msg.Status != nil {
if msg.Status.ContactRequest != nil {
if !api.Config.WithModule("contacts") {
return
}
if err := api.contactrequestHandler(session, to, msg.Status.ContactRequest); err != nil {
log.Println("Ignoring invalid contact request.", err)
return
}
msg.Status.ContactRequest.Userid = session.Userid()
}
} else {
api.CountUnicastChat()
}
session.Unicast(to, chat)
if msg.Mid != "" {
// Send out delivery confirmation status chat message.
session.Unicast(session.Id, &DataChat{To: to, Type: "Chat", Chat: &DataChatMessage{Mid: msg.Mid, Status: &DataChatStatus{State: "sent"}}})
}
}
}
func (api *channellingAPI) HandleConference(session *Session, conference *DataConference) {
// Check conference maximum size.
if len(conference.Conference) > maxConferenceSize {
log.Println("Refusing to create conference above limit.", len(conference.Conference))
return
}
// Send conference update to anyone.
for _, id := range conference.Conference {
if id != session.Id {
session.Unicast(id, conference)
}
}
}
func (api *channellingAPI) HandleSessions(session *Session, sessions *DataSessionsRequest) (*DataSessions, error) {
switch sessions.Type {
case "contact":
if !api.Config.WithModule("contacts") {
return nil, NewDataError("contacts_not_enabled", "incoming contacts session request with contacts disabled")
}
userID, err := api.getContactID(session, sessions.Token)
if err != nil {
return nil, err
}
return &DataSessions{
Type: "Sessions",
Users: api.GetUserSessions(session, userID),
Sessions: sessions,
}, nil
case "session":
id, err := session.attestation.Decode(sessions.Token)
if err != nil {
return nil, NewDataError("bad_attestation", err.Error())
}
session, ok := api.GetSession(id)
if !ok {
return nil, NewDataError("no_such_session", "cannot retrieve session")
}
return &DataSessions{
Type: "Sessions",
Users: []*DataSession{session.Data()},
Sessions: sessions,
}, nil
default:
return nil, NewDataError("bad_request", "unknown sessions request type")
}
}
func (api *channellingAPI) HandleRoom(session *Session, room *DataRoom) (*DataRoom, error) {
room, err := api.UpdateRoom(session, room)
if err == nil {
session.Broadcast(room)
}
return room, err
}

56
src/app/spreed-webrtc-server/handler_image.go

@ -0,0 +1,56 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"net/http"
"strconv"
"time"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/gorilla/mux"
)
func makeImageHandler(buddyImages channelling.ImageCache, expires time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
image := buddyImages.Get(vars["imageid"])
if image == nil {
http.Error(w, "Unknown image", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", image.MimeType())
w.Header().Set("ETag", image.LastChangeID())
age := time.Now().Sub(image.LastChange())
if age >= time.Second {
w.Header().Set("Age", strconv.Itoa(int(age.Seconds())))
}
if expires >= time.Second {
w.Header().Set("Expires", time.Now().Add(expires).Format(time.RFC1123))
w.Header().Set("Cache-Control", "public, no-transform, max-age="+strconv.Itoa(int(expires.Seconds())))
}
http.ServeContent(w, r, "", image.LastChange(), image.Reader())
}
}

30
src/app/spreed-webrtc-server/handler_main.go

@ -0,0 +1,30 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"net/http"
)
func mainHandler(w http.ResponseWriter, r *http.Request) {
handleRoomView("", w, r)
}

91
src/app/spreed-webrtc-server/handler_room.go

@ -0,0 +1,91 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"net/http"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/gorilla/mux"
)
func roomHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
handleRoomView(vars["room"], w, r)
}
func handleRoomView(room string, w http.ResponseWriter, r *http.Request) {
var err error
w.Header().Set("Content-Type", "text/html; charset=UTF-8")
w.Header().Set("Expires", "-1")
w.Header().Set("Cache-Control", "private, max-age=0")
csp := false
if config.ContentSecurityPolicy != "" {
w.Header().Set("Content-Security-Policy", config.ContentSecurityPolicy)
csp = true
}
if config.ContentSecurityPolicyReportOnly != "" {
w.Header().Set("Content-Security-Policy-Report-Only", config.ContentSecurityPolicyReportOnly)
csp = true
}
scheme := "http"
// Detect if the request was made with SSL.
ssl := r.TLS != nil
proto, ok := r.Header["X-Forwarded-Proto"]
if ok {
ssl = proto[0] == "https"
scheme = "https"
}
// Get languages from request.
langs := getRequestLanguages(r, []string{})
if len(langs) == 0 {
langs = append(langs, "en")
}
// Prepare context to deliver to HTML..
context := &channelling.Context{Cfg: config, App: "main", Host: r.Host, Scheme: scheme, Ssl: ssl, Csp: csp, Languages: langs, Room: room}
// Get URL parameters.
r.ParseForm()
// Check if incoming request is a crawler which supports AJAX crawling.
// See https://developers.google.com/webmasters/ajax-crawling/docs/getting-started for details.
if _, ok := r.Form["_escaped_fragment_"]; ok {
// Render crawlerPage template..
err = templates.ExecuteTemplate(w, "crawlerPage", &context)
} else {
// Render mainPage template.
err = templates.ExecuteTemplate(w, "mainPage", &context)
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

79
src/app/spreed-webrtc-server/handler_sandbox.go

@ -0,0 +1,79 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"fmt"
"net/http"
"net/url"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/gorilla/mux"
)
func sandboxHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
// NOTE(longsleep): origin_scheme is window.location.protocol (eg. https:, http:).
originURL, err := url.Parse(fmt.Sprintf("%s//%s", vars["origin_scheme"], vars["origin_host"]))
if err != nil || originURL.Scheme == "" || originURL.Host == "" {
http.Error(w, "Invalid origin path", http.StatusBadRequest)
return
}
origin := fmt.Sprintf("%s://%s", originURL.Scheme, originURL.Host)
handleSandboxView(vars["sandbox"], origin, w, r)
}
func handleSandboxView(sandbox string, origin string, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=UTF-8")
w.Header().Set("Expires", "-1")
w.Header().Set("Cache-Control", "private, max-age=0")
sandboxTemplateName := fmt.Sprintf("%s_sandbox.html", sandbox)
// Prepare context to deliver to HTML..
if t := templates.Lookup(sandboxTemplateName); t != nil {
// CSP support for sandboxes.
var csp string
switch sandbox {
case "odfcanvas":
csp = fmt.Sprintf("default-src 'none'; script-src %s; img-src data: blob:; style-src 'unsafe-inline'", origin)
case "pdfcanvas":
csp = fmt.Sprintf("default-src 'none'; script-src %s 'unsafe-eval'; img-src 'self' data: blob:; style-src 'unsafe-inline'", origin)
default:
csp = "default-src 'none'"
}
w.Header().Set("Content-Security-Policy", csp)
// Prepare context to deliver to HTML..
context := &channelling.Context{Cfg: config, Origin: origin, Csp: true}
err := t.Execute(w, &context)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.Error(w, "404 Unknown Sandbox", http.StatusNotFound)
}
}

59
src/app/spreed-webrtc-server/handler_wellknown.go

@ -0,0 +1,59 @@
/*
* Spreed WebRTC.
* Copyright (C) 2013-2015 struktur AG
*
* This file is part of Spreed WebRTC.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/
package main
import (
"encoding/json"
"net/http"
"net/url"
"strings"
)
func wellKnownHandler(w http.ResponseWriter, r *http.Request) {
// Detect if the request was made with SSL.
ssl := r.TLS != nil
scheme := "http"
proto, ok := r.Header["X-Forwarded-Proto"]
if ok {
ssl = proto[0] == "https"
}
if ssl {
scheme = "https"
}
// Construct our URL.
url := url.URL{
Scheme: scheme,
Host: r.Host,
Path: strings.TrimSuffix(config.B, "/"),
}
doc := &map[string]string{
"spreed-webrtc_endpoint": url.String(),
}
data, err := json.MarshalIndent(doc, "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
w.Header().Set("Content-Type", "application/json")
w.Write(data)
}

12
src/app/spreed-webrtc-server/ws.go → src/app/spreed-webrtc-server/handler_ws.go

@ -25,6 +25,8 @@ import (
"log" "log"
"net/http" "net/http"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
@ -50,7 +52,7 @@ var (
} }
) )
func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionManager, codec Codec, channellingAPI ChannellingAPI) http.HandlerFunc { func makeWSHandler(connectionCounter channelling.ConnectionCounter, sessionManager channelling.SessionManager, codec channelling.Codec, channellingAPI channelling.ChannellingAPI) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) { return func(w http.ResponseWriter, r *http.Request) {
// Validate incoming request. // Validate incoming request.
if r.Method != "GET" { if r.Method != "GET" {
@ -69,11 +71,11 @@ func makeWSHandler(connectionCounter ConnectionCounter, sessionManager SessionMa
// Create a new connection instance. // Create a new connection instance.
session := sessionManager.CreateSession(r) session := sessionManager.CreateSession(r)
client := NewClient(codec, channellingAPI, session) client := channelling.NewClient(codec, channellingAPI, session)
conn := NewConnection(connectionCounter.CountConnection(), ws, client) conn := channelling.NewConnection(connectionCounter.CountConnection(), ws, client)
// Start pumps (readPump blocks). // Start pumps (readPump blocks).
go conn.writePump() go conn.WritePump()
conn.readPump() conn.ReadPump()
} }
} }

259
src/app/spreed-webrtc-server/main.go

@ -22,230 +22,39 @@
package main package main
import ( import (
"bytes"
"crypto/rand" "crypto/rand"
"encoding/hex" "encoding/hex"
"encoding/json"
"flag" "flag"
"fmt" "fmt"
"github.com/gorilla/mux"
"github.com/strukturag/goacceptlanguageparser"
"github.com/strukturag/httputils"
"github.com/strukturag/phoenix"
"github.com/strukturag/sloth"
"html/template" "html/template"
"log" "log"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"net/url"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
goruntime "runtime" goruntime "runtime"
"strconv"
"strings" "strings"
"syscall" "syscall"
"time" "time"
"github.com/strukturag/spreed-webrtc/go/channelling"
"github.com/strukturag/spreed-webrtc/go/channelling/api"
"github.com/strukturag/spreed-webrtc/go/channelling/server"
"github.com/gorilla/mux"
"github.com/strukturag/httputils"
"github.com/strukturag/phoenix"
"github.com/strukturag/sloth"
) )
var version = "unreleased" var version = "unreleased"
var defaultConfig = "./server.conf" var defaultConfig = "./server.conf"
var templates *template.Template var templates *template.Template
var config *Config var config *channelling.Config
// Helper to retrieve languages from request.
func getRequestLanguages(r *http.Request, supportedLanguages []string) []string {
acceptLanguageHeader, ok := r.Header["Accept-Language"]
var langs []string
if ok {
langs = goacceptlanguageparser.ParseAcceptLanguage(acceptLanguageHeader[0], supportedLanguages)
}
return langs
}
func mainHandler(w http.ResponseWriter, r *http.Request) {
handleRoomView("", w, r)
}
func roomHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
handleRoomView(vars["room"], w, r)
}
func sandboxHandler(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
// NOTE(longsleep): origin_scheme is window.location.protocol (eg. https:, http:).
originURL, err := url.Parse(fmt.Sprintf("%s//%s", vars["origin_scheme"], vars["origin_host"]))
if err != nil || originURL.Scheme == "" || originURL.Host == "" {
http.Error(w, "Invalid origin path", http.StatusBadRequest)
return
}
origin := fmt.Sprintf("%s://%s", originURL.Scheme, originURL.Host)
handleSandboxView(vars["sandbox"], origin, w, r)
}
func wellKnownHandler(w http.ResponseWriter, r *http.Request) {
// Detect if the request was made with SSL.
ssl := r.TLS != nil
scheme := "http"
proto, ok := r.Header["X-Forwarded-Proto"]
if ok {
ssl = proto[0] == "https"
}
if ssl {
scheme = "https"
}
// Construct our URL.
url := url.URL{
Scheme: scheme,
Host: r.Host,
Path: strings.TrimSuffix(config.B, "/"),
}
doc := &map[string]string{
"spreed-webrtc_endpoint": url.String(),
}
data, err := json.MarshalIndent(doc, "", " ")
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
w.Header().Set("Content-Type", "application/json")
w.Write(data)
}
func makeImageHandler(buddyImages ImageCache, expires time.Duration) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
image := buddyImages.Get(vars["imageid"])
if image == nil {
http.Error(w, "Unknown image", http.StatusNotFound)
return
}
w.Header().Set("Content-Type", image.mimetype)
w.Header().Set("ETag", image.lastChangeId)
age := time.Now().Sub(image.lastChange)
if age >= time.Second {
w.Header().Set("Age", strconv.Itoa(int(age.Seconds())))
}
if expires >= time.Second {
w.Header().Set("Expires", time.Now().Add(expires).Format(time.RFC1123))
w.Header().Set("Cache-Control", "public, no-transform, max-age="+strconv.Itoa(int(expires.Seconds())))
}
http.ServeContent(w, r, "", image.lastChange, bytes.NewReader(image.data))
}
}
func handleRoomView(room string, w http.ResponseWriter, r *http.Request) {
var err error
w.Header().Set("Content-Type", "text/html; charset=UTF-8")
w.Header().Set("Expires", "-1")
w.Header().Set("Cache-Control", "private, max-age=0")
csp := false
if config.contentSecurityPolicy != "" {
w.Header().Set("Content-Security-Policy", config.contentSecurityPolicy)
csp = true
}
if config.contentSecurityPolicyReportOnly != "" {
w.Header().Set("Content-Security-Policy-Report-Only", config.contentSecurityPolicyReportOnly)
csp = true
}
scheme := "http"
// Detect if the request was made with SSL.
ssl := r.TLS != nil
proto, ok := r.Header["X-Forwarded-Proto"]
if ok {
ssl = proto[0] == "https"
scheme = "https"
}
// Get languages from request.
langs := getRequestLanguages(r, []string{})
if len(langs) == 0 {
langs = append(langs, "en")
}
// Prepare context to deliver to HTML..
context := &Context{Cfg: config, App: "main", Host: r.Host, Scheme: scheme, Ssl: ssl, Csp: csp, Languages: langs, Room: room}
// Get URL parameters.
r.ParseForm()
// Check if incoming request is a crawler which supports AJAX crawling.
// See https://developers.google.com/webmasters/ajax-crawling/docs/getting-started for details.
if _, ok := r.Form["_escaped_fragment_"]; ok {
// Render crawlerPage template..
err = templates.ExecuteTemplate(w, "crawlerPage", &context)
} else {
// Render mainPage template.
err = templates.ExecuteTemplate(w, "mainPage", &context)
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func handleSandboxView(sandbox string, origin string, w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "text/html; charset=UTF-8")
w.Header().Set("Expires", "-1")
w.Header().Set("Cache-Control", "private, max-age=0")
sandboxTemplateName := fmt.Sprintf("%s_sandbox.html", sandbox)
// Prepare context to deliver to HTML..
if t := templates.Lookup(sandboxTemplateName); t != nil {
// CSP support for sandboxes.
var csp string
switch sandbox {
case "odfcanvas":
csp = fmt.Sprintf("default-src 'none'; script-src %s; img-src data: blob:; style-src 'unsafe-inline'", origin)
case "pdfcanvas":
csp = fmt.Sprintf("default-src 'none'; script-src %s 'unsafe-eval'; img-src 'self' data: blob:; style-src 'unsafe-inline'", origin)
default:
csp = "default-src 'none'"
}
w.Header().Set("Content-Security-Policy", csp)
// Prepare context to deliver to HTML..
context := &Context{Cfg: config, Origin: origin, Csp: true}
err := t.Execute(w, &context)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
} else {
http.Error(w, "404 Unknown Sandbox", http.StatusNotFound)
}
}
func runner(runtime phoenix.Runtime) error { func runner(runtime phoenix.Runtime) error {
log.SetFlags(log.LstdFlags | log.Lmicroseconds) log.SetFlags(log.LstdFlags | log.Lmicroseconds)
rootFolder, err := runtime.GetString("http", "root") rootFolder, err := runtime.GetString("http", "root")
@ -333,10 +142,10 @@ func runner(runtime phoenix.Runtime) error {
} }
} }
var tokenProvider TokenProvider var tokenProvider channelling.TokenProvider
if tokenFile != "" { if tokenFile != "" {
log.Printf("Using token authorization from %s\n", tokenFile) log.Printf("Using token authorization from %s\n", tokenFile)
tokenProvider = TokenFileProvider(tokenFile) tokenProvider = channelling.TokenFileProvider(tokenFile)
} }
// Nats pub/sub supports. // Nats pub/sub supports.
@ -344,18 +153,18 @@ func runner(runtime phoenix.Runtime) error {
natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject") natsChannellingTriggerSubject, _ := runtime.GetString("nats", "channelling_trigger_subject")
if natsURL, err := runtime.GetString("nats", "url"); err == nil { if natsURL, err := runtime.GetString("nats", "url"); err == nil {
if natsURL != "" { if natsURL != "" {
DefaultNatsURL = natsURL channelling.DefaultNatsURL = natsURL
} }
} }
if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil { if natsEstablishTimeout, err := runtime.GetInt("nats", "establishTimeout"); err == nil {
if natsEstablishTimeout != 0 { if natsEstablishTimeout != 0 {
DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second channelling.DefaultNatsEstablishTimeout = time.Duration(natsEstablishTimeout) * time.Second
} }
} }
natsClientId, _ := runtime.GetString("nats", "client_id") natsClientId, _ := runtime.GetString("nats", "client_id")
// Load remaining configuration items. // Load remaining configuration items.
config = NewConfig(runtime, tokenProvider != nil) config = server.NewConfig(runtime, tokenProvider != nil)
// Load templates. // Load templates.
templates = template.New("") templates = template.New("")
@ -448,15 +257,15 @@ func runner(runtime phoenix.Runtime) error {
} }
// Prepare services. // Prepare services.
buddyImages := NewImageCache() buddyImages := channelling.NewImageCache()
codec := NewCodec(incomingCodecLimit) codec := channelling.NewCodec(incomingCodecLimit)
roomManager := NewRoomManager(config, codec) roomManager := channelling.NewRoomManager(config, codec)
hub := NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec) hub := channelling.NewHub(config, sessionSecret, encryptionSecret, turnSecret, codec)
tickets := NewTickets(sessionSecret, encryptionSecret, computedRealm) tickets := channelling.NewTickets(sessionSecret, encryptionSecret, computedRealm)
sessionManager := NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret) sessionManager := channelling.NewSessionManager(config, tickets, hub, roomManager, roomManager, buddyImages, sessionSecret)
statsManager := NewStatsManager(hub, roomManager, sessionManager) statsManager := channelling.NewStatsManager(hub, roomManager, sessionManager)
busManager := NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject) busManager := channelling.NewBusManager(natsClientId, natsChannellingTrigger, natsChannellingTriggerSubject)
channellingAPI := NewChannellingAPI(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager) channellingAPI := api.New(config, roomManager, tickets, sessionManager, statsManager, hub, hub, hub, busManager)
// Add handlers. // Add handlers.
r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler)) r.HandleFunc("/", httputils.MakeGzipHandler(mainHandler))
@ -473,22 +282,22 @@ func runner(runtime phoenix.Runtime) error {
// Sandbox handler. // Sandbox handler.
r.HandleFunc("/sandbox/{origin_scheme}/{origin_host}/{sandbox}.html", httputils.MakeGzipHandler(sandboxHandler)) r.HandleFunc("/sandbox/{origin_scheme}/{origin_host}/{sandbox}.html", httputils.MakeGzipHandler(sandboxHandler))
// Add API end points. // Add RESTful API end points.
api := sloth.NewAPI() rest := sloth.NewAPI()
api.SetMux(r.PathPrefix("/api/v1/").Subrouter()) rest.SetMux(r.PathPrefix("/api/v1/").Subrouter())
api.AddResource(&Rooms{}, "/rooms") rest.AddResource(&server.Rooms{}, "/rooms")
api.AddResource(config, "/config") rest.AddResource(config, "/config")
api.AddResourceWithWrapper(&Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens") rest.AddResourceWithWrapper(&server.Tokens{tokenProvider}, httputils.MakeGzipHandler, "/tokens")
if config.UsersEnabled { if config.UsersEnabled {
// Create Users handler. // Create Users handler.
users := NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime) users := server.NewUsers(hub, tickets, sessionManager, config.UsersMode, serverRealm, runtime)
api.AddResource(&Sessions{tickets, hub, users}, "/sessions/{id}/") rest.AddResource(&server.Sessions{tickets, hub, users}, "/sessions/{id}/")
if config.UsersAllowRegistration { if config.UsersAllowRegistration {
api.AddResource(users, "/users") rest.AddResource(users, "/users")
} }
} }
if statsEnabled { if statsEnabled {
api.AddResourceWithWrapper(&Stats{statsManager}, httputils.MakeGzipHandler, "/stats") rest.AddResourceWithWrapper(&server.Stats{statsManager}, httputils.MakeGzipHandler, "/stats")
log.Println("Stats are enabled!") log.Println("Stats are enabled!")
} }

17
src/app/spreed-webrtc-server/utils.go

@ -0,0 +1,17 @@
package main
import (
"net/http"
"github.com/strukturag/goacceptlanguageparser"
)
// Helper to retrieve languages from request.
func getRequestLanguages(r *http.Request, supportedLanguages []string) []string {
acceptLanguageHeader, ok := r.Header["Accept-Language"]
var langs []string
if ok {
langs = goacceptlanguageparser.ParseAcceptLanguage(acceptLanguageHeader[0], supportedLanguages)
}
return langs
}
Loading…
Cancel
Save