From 48d21a25db2a06c383bdcfb0cbd54b6ca8d16a4e Mon Sep 17 00:00:00 2001 From: Simon Eisenmann Date: Mon, 10 Mar 2014 16:22:26 +0100 Subject: [PATCH] Removed large loops over connections and users and implemented per room processing. --- src/app/spreed-speakfreely-server/hub.go | 115 ++++------------ .../spreed-speakfreely-server/roomworker.go | 129 ++++++++++++++++-- src/app/spreed-speakfreely-server/server.go | 45 ++++-- src/app/spreed-speakfreely-server/user.go | 21 ++- 4 files changed, 198 insertions(+), 112 deletions(-) diff --git a/src/app/spreed-speakfreely-server/hub.go b/src/app/spreed-speakfreely-server/hub.go index e29ab947..adf082c3 100644 --- a/src/app/spreed-speakfreely-server/hub.go +++ b/src/app/spreed-speakfreely-server/hub.go @@ -138,6 +138,32 @@ func (h *Hub) GetRoom(id string) *RoomWorker { } +func (h *Hub) GetGlobalConnections() []*Connection { + + if h.config.globalRoomid == "" { + return make([]*Connection, 0) + } + h.mutex.RLock() + if room, ok := h.roomTable[h.config.globalRoomid]; ok { + h.mutex.RUnlock() + return room.GetConnections() + } else { + h.mutex.RUnlock() + } + return make([]*Connection, 0) + +} + +func (h *Hub) RunForAllRooms(f func(room *RoomWorker)) { + + h.mutex.RLock() + for _, room := range h.roomTable { + f(room) + } + h.mutex.RUnlock() + +} + func (h *Hub) isGlobalRoomid(id string) bool { return id != "" && (id == h.config.globalRoomid) @@ -195,65 +221,6 @@ func (h *Hub) unregisterHandler(c *Connection) { } -func (h *Hub) broadcastHandler(m *MessageRequest) { - - h.mutex.RLock() - - //fmt.Println("in h.broadcast", h.userTable, h.connectionTable) - roomid := m.Id - users := make([]string, len(h.userTable)) - i := 0 - // TODO(longsleep): Keep a userTable per room to avoid looping all users every time. - for id, u := range h.userTable { - if id == m.From || (u.Roomid != roomid && !h.isGlobalRoomid(roomid)) { - // Skip self and users not in the correct room. - continue - } - users[i] = id - i++ - } - h.mutex.RUnlock() - - room := h.GetRoom(roomid) - worker := func() { - - for _, id := range users { - h.mutex.RLock() - u, ok := h.userTable[id] - if !ok { - // User gone. - h.mutex.RUnlock() - continue - } - ec, ok := h.connectionTable[id] - if !ok { - // Connection gone. - h.mutex.RUnlock() - continue - } - userRoomid := u.Roomid - //fmt.Println("in h.broadcast id", id, m.From, userRoomid, roomid) - //fmt.Println("broadcasting to", id, ec.Idx, userRoomid, roomid) - h.mutex.RUnlock() - if userRoomid != roomid && !h.isGlobalRoomid(roomid) { - // Skip other rooms. - continue - } - //fmt.Printf("%s\n", m.Message) - ec.send(m.Message) - } - - } - - // Run worker in room. - if !room.Run(worker) { - // This handles the case that the room was cleaned up while we retrieved. - room = h.GetRoom(roomid) - room.Run(worker) - } - -} - func (h *Hub) unicastHandler(m *MessageRequest) { h.mutex.RLock() @@ -267,34 +234,6 @@ func (h *Hub) unicastHandler(m *MessageRequest) { } -func (h *Hub) usersHandler(c *Connection) { - - h.mutex.RLock() - users := &DataUsers{Type: "Users", Index: 0, Batch: 0} - usersList := users.Users - roomid := c.User.Roomid - // TODO(longsleep): Keep per room userTable to avoid looping all users. - for id, u := range h.userTable { - if u.Roomid == roomid || h.isGlobalRoomid(u.Roomid) { - user := &DataUser{Type: "Online", Id: id, Ua: u.Ua, Status: u.Status, Rev: u.UpdateRev} - usersList = append(usersList, user) - if len(usersList) >= maxUsersLength { - log.Println("Limiting users response length in channel", roomid) - break - } - } - } - h.mutex.RUnlock() - users.Users = usersList - usersJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: users}) - if err != nil { - log.Println("Users error while encoding JSON", err) - return - } - c.send(usersJson) - -} - func (h *Hub) aliveHandler(c *Connection, alive *DataAlive) { aliveJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: alive}) @@ -314,9 +253,7 @@ func (h *Hub) userupdateHandler(u *UserUpdate) uint64 { h.mutex.RUnlock() var rev uint64 if ok { - h.mutex.Lock() rev = user.Update(u) - h.mutex.Unlock() } else { log.Printf("Update data for unknown user %s\n", u.Id) } diff --git a/src/app/spreed-speakfreely-server/roomworker.go b/src/app/spreed-speakfreely-server/roomworker.go index 139f8761..6aa7b4af 100644 --- a/src/app/spreed-speakfreely-server/roomworker.go +++ b/src/app/spreed-speakfreely-server/roomworker.go @@ -21,7 +21,9 @@ package main import ( + "encoding/json" "log" + "sync" "time" ) @@ -30,14 +32,23 @@ const ( roomExpiryDuration = 60 * time.Second ) +type RoomConnectionUpdate struct { + Id string + Userid string + Status bool + Connection *Connection +} + type RoomWorker struct { // References. h *Hub // Data handling. - workers chan (func()) - expired chan (bool) - timer *time.Timer + workers chan (func()) + expired chan (bool) + connections map[string]*Connection + timer *time.Timer + mutex sync.RWMutex // Metadata. Id string @@ -53,10 +64,10 @@ func NewRoomWorker(h *Hub, id string) *RoomWorker { } r.workers = make(chan func(), roomMaxWorkers) r.expired = make(chan bool) + r.connections = make(map[string]*Connection) // Create expire timer. r.timer = time.AfterFunc(roomExpiryDuration, func() { - log.Printf("Room worker not in use - cleaning up '%s'\n", r.Id) r.expired <- true }) @@ -69,18 +80,24 @@ func (r *RoomWorker) Start() { // Main blocking worker. L: for { - r.timer.Reset(roomExpiryDuration) - select { - case <-r.expired: - //fmt.Println("Work room expired", r.Id) - break L case w := <-r.workers: //fmt.Println("Running worker", r.Id, w) w() + case <-r.expired: + //fmt.Println("Work room expired", r.Id) + //fmt.Println("Work room expired", r.Id, len(r.connections)) + r.mutex.RLock() + if len(r.connections) == 0 { + // Cleanup room when it is empty. + r.mutex.RUnlock() + log.Printf("Room worker not in use - cleaning up '%s'\n", r.Id) + break L + } else { + r.mutex.RUnlock() + } } - } r.timer.Stop() @@ -89,6 +106,18 @@ L: } +func (r *RoomWorker) GetConnections() []*Connection { + + r.mutex.RLock() + defer r.mutex.RUnlock() + connections := make([]*Connection, 0, len(r.connections)) + for _, connection := range r.connections { + connections = append(connections, connection) + } + return connections + +} + func (r *RoomWorker) Run(f func()) bool { select { @@ -100,3 +129,83 @@ func (r *RoomWorker) Run(f func()) bool { } } + +func (r *RoomWorker) usersHandler(c *Connection) { + + worker := func() { + users := &DataUsers{Type: "Users"} + ul := users.Users + appender := func(ec *Connection) bool { + user := ec.User.Data() + user.Type = "Online" + ul = append(ul, user) + if len(ul) > maxUsersLength { + log.Println("Limiting users response length in channel", r.Id) + return false + } + return true + } + r.mutex.RLock() + // Include connections in this room. + for _, ec := range r.connections { + if !appender(ec) { + break + } + } + r.mutex.RUnlock() + // Include connections to global room. + for _, ec := range c.h.GetGlobalConnections() { + if !appender(ec) { + break + } + } + users.Users = ul + usersJson, err := json.Marshal(&DataOutgoing{From: c.Id, Data: users}) + if err != nil { + log.Println("Users error while encoding JSON", err) + return + } + c.send(usersJson) + + } + + r.Run(worker) + +} + +func (r *RoomWorker) broadcastHandler(m *MessageRequest) { + + worker := func() { + r.mutex.RLock() + defer r.mutex.RUnlock() + for id, ec := range r.connections { + if id == m.From { + // Skip broadcast to self. + continue + } + //fmt.Printf("%s\n", m.Message) + ec.send(m.Message) + } + } + + r.Run(worker) + +} + +func (r *RoomWorker) connectionHandler(rcu *RoomConnectionUpdate) { + + worker := func() { + r.mutex.Lock() + defer r.mutex.Unlock() + if rcu.Status { + r.connections[rcu.Userid] = rcu.Connection + } else { + if _, ok := r.connections[rcu.Userid]; ok { + delete(r.connections, rcu.Userid) + } + } + } + + r.Run(worker) + +} diff --git a/src/app/spreed-speakfreely-server/server.go b/src/app/spreed-speakfreely-server/server.go index 491e586b..8ffc2aa4 100644 --- a/src/app/spreed-speakfreely-server/server.go +++ b/src/app/spreed-speakfreely-server/server.go @@ -46,6 +46,7 @@ func (s *Server) OnRegister(c *Connection) { func (s *Server) OnUnregister(c *Connection) { //log.Println("OnUnregister", c.id) if c.Hello { + s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid}) s.Broadcast(c, &DataUser{Type: "Left", Id: c.Id, Status: "hard"}) } else { //log.Println("Ingoring OnUnregister because of no Hello", c.Idx) @@ -72,11 +73,13 @@ func (s *Server) OnText(c *Connection, b []byte) { s.UpdateUser(c, &UserUpdate{Types: []string{"Roomid", "Ua"}, Roomid: msg.Hello.Id, Ua: msg.Hello.Ua}) if c.Hello && c.Roomid != msg.Hello.Id { // Room changed. + s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid}) s.Broadcast(c, &DataUser{Type: "Left", Id: c.Id, Status: "soft"}) } c.Roomid = msg.Hello.Id if c.h.config.defaultRoomEnabled || !c.h.isDefaultRoomid(c.Roomid) { c.Hello = true + s.UpdateRoomConnection(c, &RoomConnectionUpdate{Id: c.Roomid, Status: true}) s.Broadcast(c, &DataUser{Type: "Joined", Id: c.Id, Ua: msg.Hello.Ua}) } else { c.Hello = false @@ -155,6 +158,19 @@ func (s *Server) Unicast(c *Connection, to string, m interface{}) { } +func (s *Server) Alive(c *Connection, alive *DataAlive) { + + c.h.aliveHandler(c, alive) + +} + +func (s *Server) UpdateUser(c *Connection, userupdate *UserUpdate) uint64 { + + userupdate.Id = c.Id + return c.h.userupdateHandler(userupdate) + +} + func (s *Server) Broadcast(c *Connection, m interface{}) { b, err := json.Marshal(&DataOutgoing{From: c.Id, Data: m}) @@ -163,26 +179,31 @@ func (s *Server) Broadcast(c *Connection, m interface{}) { return } - var msg = &MessageRequest{From: c.Id, Message: b, Id: c.Roomid} - c.h.broadcastHandler(msg) + if c.h.isGlobalRoomid(c.Roomid) { + c.h.RunForAllRooms(func(room *RoomWorker) { + var msg = &MessageRequest{From: c.Id, Message: b, Id: room.Id} + room.broadcastHandler(msg) + }) + } else { + var msg = &MessageRequest{From: c.Id, Message: b, Id: c.Roomid} + room := c.h.GetRoom(c.Roomid) + room.broadcastHandler(msg) + } } func (s *Server) Users(c *Connection) { - c.h.usersHandler(c) - -} - -func (s *Server) Alive(c *Connection, alive *DataAlive) { - - c.h.aliveHandler(c, alive) + room := c.h.GetRoom(c.Roomid) + room.usersHandler(c) } -func (s *Server) UpdateUser(c *Connection, userupdate *UserUpdate) uint64 { +func (s *Server) UpdateRoomConnection(c *Connection, rcu *RoomConnectionUpdate) { - userupdate.Id = c.Id - return c.h.userupdateHandler(userupdate) + rcu.Userid = c.Id + rcu.Connection = c + room := c.h.GetRoom(c.Roomid) + room.connectionHandler(rcu) } diff --git a/src/app/spreed-speakfreely-server/user.go b/src/app/spreed-speakfreely-server/user.go index 4c30c018..593ef7f8 100644 --- a/src/app/spreed-speakfreely-server/user.go +++ b/src/app/spreed-speakfreely-server/user.go @@ -20,7 +20,9 @@ */ package main -import () +import ( + "sync" +) type User struct { Id string @@ -28,11 +30,14 @@ type User struct { Ua string UpdateRev uint64 Status interface{} + mutex sync.RWMutex } func (u *User) Update(update *UserUpdate) uint64 { //user := reflect.ValueOf(&u).Elem() + u.mutex.Lock() + defer u.mutex.Unlock() for _, key := range update.Types { @@ -53,6 +58,20 @@ func (u *User) Update(update *UserUpdate) uint64 { } +func (u *User) Data() *DataUser { + + u.mutex.RLock() + defer u.mutex.RUnlock() + + return &DataUser{ + Id: u.Id, + Ua: u.Ua, + Status: u.Status, + Rev: u.UpdateRev, + } + +} + type UserUpdate struct { Id string Types []string