Browse Source

Removed large loops over connections and users and implemented per room processing.

pull/3/head
Simon Eisenmann 12 years ago
parent
commit
48d21a25db
  1. 115
      src/app/spreed-speakfreely-server/hub.go
  2. 129
      src/app/spreed-speakfreely-server/roomworker.go
  3. 45
      src/app/spreed-speakfreely-server/server.go
  4. 21
      src/app/spreed-speakfreely-server/user.go

115
src/app/spreed-speakfreely-server/hub.go

@ -138,6 +138,32 @@ func (h *Hub) GetRoom(id string) *RoomWorker {
} }
func (h *Hub) GetGlobalConnections() []*Connection {
if h.config.globalRoomid == "" {
return make([]*Connection, 0)
}
h.mutex.RLock()
if room, ok := h.roomTable[h.config.globalRoomid]; ok {
h.mutex.RUnlock()
return room.GetConnections()
} else {
h.mutex.RUnlock()
}
return make([]*Connection, 0)
}
func (h *Hub) RunForAllRooms(f func(room *RoomWorker)) {
h.mutex.RLock()
for _, room := range h.roomTable {
f(room)
}
h.mutex.RUnlock()
}
func (h *Hub) isGlobalRoomid(id string) bool { func (h *Hub) isGlobalRoomid(id string) bool {
return id != "" && (id == h.config.globalRoomid) return id != "" && (id == h.config.globalRoomid)
@ -195,65 +221,6 @@ func (h *Hub) unregisterHandler(c *Connection) {
} }
func (h *Hub) broadcastHandler(m *MessageRequest) {
h.mutex.RLock()
//fmt.Println("in h.broadcast", h.userTable, h.connectionTable)
roomid := m.Id
users := make([]string, len(h.userTable))
i := 0
// TODO(longsleep): Keep a userTable per room to avoid looping all users every time.
for id, u := range h.userTable {
if id == m.From || (u.Roomid != roomid && !h.isGlobalRoomid(roomid)) {
// Skip self and users not in the correct room.
continue
}
users[i] = id
i++
}
h.mutex.RUnlock()
room := h.GetRoom(roomid)
worker := func() {
for _, id := range users {
h.mutex.RLock()
u, ok := h.userTable[id]
if !ok {
// User gone.
h.mutex.RUnlock()
continue
}
ec, ok := h.connectionTable[id]
if !ok {
// Connection gone.
h.mutex.RUnlock()
continue
}
userRoomid := u.Roomid
//fmt.Println("in h.broadcast id", id, m.From, userRoomid, roomid)
//fmt.Println("broadcasting to", id, ec.Idx, userRoomid, roomid)
h.mutex.RUnlock()
if userRoomid != roomid && !h.isGlobalRoomid(roomid) {
// Skip other rooms.
continue
}
//fmt.Printf("%s\n", m.Message)
ec.send(m.Message)
}
}
// Run worker in room.
if !room.Run(worker) {
// This handles the case that the room was cleaned up while we retrieved.
room = h.GetRoom(roomid)
room.Run(worker)
}
}
func (h *Hub) unicastHandler(m *MessageRequest) { func (h *Hub) unicastHandler(m *MessageRequest) {
h.mutex.RLock() h.mutex.RLock()
@ -267,34 +234,6 @@ func (h *Hub) unicastHandler(m *MessageRequest) {
} }
func (h *Hub) usersHandler(c *Connection) {
h.mutex.RLock()
users := &DataUsers{Type: "Users", Index: 0, Batch: 0}
usersList := users.Users
roomid := c.User.Roomid
// TODO(longsleep): Keep per room userTable to avoid looping all users.
for id, u := range h.userTable {
if u.Roomid == roomid || h.isGlobalRoomid(u.Roomid) {
user := &DataUser{Type: "Online", Id: id, Ua: u.Ua, Status: u.Status, Rev: u.UpdateRev}
usersList = append(usersList, user)
if len(usersList) >= maxUsersLength {
log.Println("Limiting users response length in channel", roomid)
break
}
}
}
h.mutex.RUnlock()
users.Users = usersList
usersJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: users})
if err != nil {
log.Println("Users error while encoding JSON", err)
return
}
c.send(usersJson)
}
func (h *Hub) aliveHandler(c *Connection, alive *DataAlive) { func (h *Hub) aliveHandler(c *Connection, alive *DataAlive) {
aliveJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: alive}) aliveJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: alive})
@ -314,9 +253,7 @@ func (h *Hub) userupdateHandler(u *UserUpdate) uint64 {
h.mutex.RUnlock() h.mutex.RUnlock()
var rev uint64 var rev uint64
if ok { if ok {
h.mutex.Lock()
rev = user.Update(u) rev = user.Update(u)
h.mutex.Unlock()
} else { } else {
log.Printf("Update data for unknown user %s\n", u.Id) log.Printf("Update data for unknown user %s\n", u.Id)
} }

129
src/app/spreed-speakfreely-server/roomworker.go

@ -21,7 +21,9 @@
package main package main
import ( import (
"encoding/json"
"log" "log"
"sync"
"time" "time"
) )
@ -30,14 +32,23 @@ const (
roomExpiryDuration = 60 * time.Second roomExpiryDuration = 60 * time.Second
) )
type RoomConnectionUpdate struct {
Id string
Userid string
Status bool
Connection *Connection
}
type RoomWorker struct { type RoomWorker struct {
// References. // References.
h *Hub h *Hub
// Data handling. // Data handling.
workers chan (func()) workers chan (func())
expired chan (bool) expired chan (bool)
timer *time.Timer connections map[string]*Connection
timer *time.Timer
mutex sync.RWMutex
// Metadata. // Metadata.
Id string Id string
@ -53,10 +64,10 @@ func NewRoomWorker(h *Hub, id string) *RoomWorker {
} }
r.workers = make(chan func(), roomMaxWorkers) r.workers = make(chan func(), roomMaxWorkers)
r.expired = make(chan bool) r.expired = make(chan bool)
r.connections = make(map[string]*Connection)
// Create expire timer. // Create expire timer.
r.timer = time.AfterFunc(roomExpiryDuration, func() { r.timer = time.AfterFunc(roomExpiryDuration, func() {
log.Printf("Room worker not in use - cleaning up '%s'\n", r.Id)
r.expired <- true r.expired <- true
}) })
@ -69,18 +80,24 @@ func (r *RoomWorker) Start() {
// Main blocking worker. // Main blocking worker.
L: L:
for { for {
r.timer.Reset(roomExpiryDuration) r.timer.Reset(roomExpiryDuration)
select { select {
case <-r.expired:
//fmt.Println("Work room expired", r.Id)
break L
case w := <-r.workers: case w := <-r.workers:
//fmt.Println("Running worker", r.Id, w) //fmt.Println("Running worker", r.Id, w)
w() w()
case <-r.expired:
//fmt.Println("Work room expired", r.Id)
//fmt.Println("Work room expired", r.Id, len(r.connections))
r.mutex.RLock()
if len(r.connections) == 0 {
// Cleanup room when it is empty.
r.mutex.RUnlock()
log.Printf("Room worker not in use - cleaning up '%s'\n", r.Id)
break L
} else {
r.mutex.RUnlock()
}
} }
} }
r.timer.Stop() r.timer.Stop()
@ -89,6 +106,18 @@ L:
} }
func (r *RoomWorker) GetConnections() []*Connection {
r.mutex.RLock()
defer r.mutex.RUnlock()
connections := make([]*Connection, 0, len(r.connections))
for _, connection := range r.connections {
connections = append(connections, connection)
}
return connections
}
func (r *RoomWorker) Run(f func()) bool { func (r *RoomWorker) Run(f func()) bool {
select { select {
@ -100,3 +129,83 @@ func (r *RoomWorker) Run(f func()) bool {
} }
} }
func (r *RoomWorker) usersHandler(c *Connection) {
worker := func() {
users := &DataUsers{Type: "Users"}
ul := users.Users
appender := func(ec *Connection) bool {
user := ec.User.Data()
user.Type = "Online"
ul = append(ul, user)
if len(ul) > maxUsersLength {
log.Println("Limiting users response length in channel", r.Id)
return false
}
return true
}
r.mutex.RLock()
// Include connections in this room.
for _, ec := range r.connections {
if !appender(ec) {
break
}
}
r.mutex.RUnlock()
// Include connections to global room.
for _, ec := range c.h.GetGlobalConnections() {
if !appender(ec) {
break
}
}
users.Users = ul
usersJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: users})
if err != nil {
log.Println("Users error while encoding JSON", err)
return
}
c.send(usersJson)
}
r.Run(worker)
}
func (r *RoomWorker) broadcastHandler(m *MessageRequest) {
worker := func() {
r.mutex.RLock()
defer r.mutex.RUnlock()
for id, ec := range r.connections {
if id == m.From {
// Skip broadcast to self.
continue
}
//fmt.Printf("%s\n", m.Message)
ec.send(m.Message)
}
}
r.Run(worker)
}
func (r *RoomWorker) connectionHandler(rcu *RoomConnectionUpdate) {
worker := func() {
r.mutex.Lock()
defer r.mutex.Unlock()
if rcu.Status {
r.connections[rcu.Userid] = rcu.Connection
} else {
if _, ok := r.connections[rcu.Userid]; ok {
delete(r.connections, rcu.Userid)
}
}
}
r.Run(worker)
}

45
src/app/spreed-speakfreely-server/server.go

@ -46,6 +46,7 @@ func (s *Server) OnRegister(c *Connection) {
func (s *Server) OnUnregister(c *Connection) { func (s *Server) OnUnregister(c *Connection) {
//log.Println("OnUnregister", c.id) //log.Println("OnUnregister", c.id)
if c.Hello { if c.Hello {
s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid})
s.Broadcast(c, &DataUser{Type: "Left", Id: c.Id, Status: "hard"}) s.Broadcast(c, &DataUser{Type: "Left", Id: c.Id, Status: "hard"})
} else { } else {
//log.Println("Ingoring OnUnregister because of no Hello", c.Idx) //log.Println("Ingoring OnUnregister because of no Hello", c.Idx)
@ -72,11 +73,13 @@ func (s *Server) OnText(c *Connection, b []byte) {
s.UpdateUser(c, &UserUpdate{Types: []string{"Roomid", "Ua"}, Roomid: msg.Hello.Id, Ua: msg.Hello.Ua}) s.UpdateUser(c, &UserUpdate{Types: []string{"Roomid", "Ua"}, Roomid: msg.Hello.Id, Ua: msg.Hello.Ua})
if c.Hello && c.Roomid != msg.Hello.Id { if c.Hello && c.Roomid != msg.Hello.Id {
// Room changed. // Room changed.
s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid})
s.Broadcast(c, &DataUser{Type: "Left", Id: c.Id, Status: "soft"}) s.Broadcast(c, &DataUser{Type: "Left", Id: c.Id, Status: "soft"})
} }
c.Roomid = msg.Hello.Id c.Roomid = msg.Hello.Id
if c.h.config.defaultRoomEnabled || !c.h.isDefaultRoomid(c.Roomid) { if c.h.config.defaultRoomEnabled || !c.h.isDefaultRoomid(c.Roomid) {
c.Hello = true c.Hello = true
s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid, Status: true})
s.Broadcast(c, &DataUser{Type: "Joined", Id: c.Id, Ua: msg.Hello.Ua}) s.Broadcast(c, &DataUser{Type: "Joined", Id: c.Id, Ua: msg.Hello.Ua})
} else { } else {
c.Hello = false c.Hello = false
@ -155,6 +158,19 @@ func (s *Server) Unicast(c *Connection, to string, m interface{}) {
} }
func (s *Server) Alive(c *Connection, alive *DataAlive) {
c.h.aliveHandler(c, alive)
}
func (s *Server) UpdateUser(c *Connection, userupdate *UserUpdate) uint64 {
userupdate.Id = c.Id
return c.h.userupdateHandler(userupdate)
}
func (s *Server) Broadcast(c *Connection, m interface{}) { func (s *Server) Broadcast(c *Connection, m interface{}) {
b, err := json.Marshal(&DataOutgoing{From: c.Id, Data: m}) b, err := json.Marshal(&DataOutgoing{From: c.Id, Data: m})
@ -163,26 +179,31 @@ func (s *Server) Broadcast(c *Connection, m interface{}) {
return return
} }
var msg = &MessageRequest{From: c.Id, Message: b, Id: c.Roomid} if c.h.isGlobalRoomid(c.Roomid) {
c.h.broadcastHandler(msg) c.h.RunForAllRooms(func(room *RoomWorker) {
var msg = &MessageRequest{From: c.Id, Message: b, Id: room.Id}
room.broadcastHandler(msg)
})
} else {
var msg = &MessageRequest{From: c.Id, Message: b, Id: c.Roomid}
room := c.h.GetRoom(c.Roomid)
room.broadcastHandler(msg)
}
} }
func (s *Server) Users(c *Connection) { func (s *Server) Users(c *Connection) {
c.h.usersHandler(c) room := c.h.GetRoom(c.Roomid)
room.usersHandler(c)
}
func (s *Server) Alive(c *Connection, alive *DataAlive) {
c.h.aliveHandler(c, alive)
} }
func (s *Server) UpdateUser(c *Connection, userupdate *UserUpdate) uint64 { func (s *Server) UpdateRoomConnection(c *Connection, rcu *RoomConnectionUpdate) {
userupdate.Id = c.Id rcu.Userid = c.Id
return c.h.userupdateHandler(userupdate) rcu.Connection = c
room := c.h.GetRoom(c.Roomid)
room.connectionHandler(rcu)
} }

21
src/app/spreed-speakfreely-server/user.go

@ -20,7 +20,9 @@
*/ */
package main package main
import () import (
"sync"
)
type User struct { type User struct {
Id string Id string
@ -28,11 +30,14 @@ type User struct {
Ua string Ua string
UpdateRev uint64 UpdateRev uint64
Status interface{} Status interface{}
mutex sync.RWMutex
} }
func (u *User) Update(update *UserUpdate) uint64 { func (u *User) Update(update *UserUpdate) uint64 {
//user := reflect.ValueOf(&u).Elem() //user := reflect.ValueOf(&u).Elem()
u.mutex.Lock()
defer u.mutex.Unlock()
for _, key := range update.Types { for _, key := range update.Types {
@ -53,6 +58,20 @@ func (u *User) Update(update *UserUpdate) uint64 {
} }
func (u *User) Data() *DataUser {
u.mutex.RLock()
defer u.mutex.RUnlock()
return &DataUser{
Id: u.Id,
Ua: u.Ua,
Status: u.Status,
Rev: u.UpdateRev,
}
}
type UserUpdate struct { type UserUpdate struct {
Id string Id string
Types []string Types []string

Loading…
Cancel
Save