Browse Source

Move client map to sync.Map

gek/new-load-tests
Gabe Kangas 4 years ago
parent
commit
81a423ec52
  1. 17
      core/chat/chat.go
  2. 4
      core/chat/events.go
  3. 59
      core/chat/server.go

17
core/chat/chat.go

@ -29,10 +29,11 @@ func Start(getStatusFunc func() models.Status) error { @@ -29,10 +29,11 @@ func Start(getStatusFunc func() models.Status) error {
// GetClientsForUser will return chat connections that are owned by a specific user.
func GetClientsForUser(userID string) ([]*Client, error) {
clients := map[string][]*Client{}
for _, client := range _server.clients {
_server.clients.Range(func(k, v interface{}) bool {
client := v.(*Client)
clients[client.User.ID] = append(clients[client.User.ID], client)
}
return true
})
if _, exists := clients[userID]; !exists {
return nil, errors.New("no connections for user found")
@ -43,8 +44,8 @@ func GetClientsForUser(userID string) ([]*Client, error) { @@ -43,8 +44,8 @@ func GetClientsForUser(userID string) ([]*Client, error) {
// FindClientByID will return a single connected client by ID.
func FindClientByID(clientID uint) (*Client, bool) {
client, found := _server.clients[clientID]
return client, found
client, found := _server.clients.Load(clientID)
return client.(*Client), found
}
// GetClients will return all the current chat clients connected.
@ -52,9 +53,11 @@ func GetClients() []*Client { @@ -52,9 +53,11 @@ func GetClients() []*Client {
clients := []*Client{}
// Convert the keyed map to a slice.
for _, client := range _server.clients {
_server.clients.Range(func(k, v interface{}) bool {
client := v.(*Client)
clients = append(clients, client)
}
return true
})
sort.Slice(clients, func(i, j int) bool {
return clients[i].ConnectedAt.Before(clients[j].ConnectedAt)

4
core/chat/events.go

@ -86,7 +86,7 @@ func (s *Server) userMessageSent(eventData chatClientEvent) { @@ -86,7 +86,7 @@ func (s *Server) userMessageSent(eventData chatClientEvent) {
}
// Ignore if the stream has been offline
if !getStatus().Online && getStatus().LastDisconnectTime != nil {
if !getStatus().Online || getStatus().LastDisconnectTime != nil {
disconnectedTime := getStatus().LastDisconnectTime.Time
if time.Since(disconnectedTime) > 5*time.Minute {
return
@ -112,5 +112,5 @@ func (s *Server) userMessageSent(eventData chatClientEvent) { @@ -112,5 +112,5 @@ func (s *Server) userMessageSent(eventData chatClientEvent) {
SaveUserMessage(event)
eventData.client.MessageCount = eventData.client.MessageCount + 1
_lastSeenCache[event.User.ID] = time.Now()
_lastSeenCache.Store(event.User.ID, time.Now())
}

59
core/chat/server.go

@ -22,13 +22,12 @@ import ( @@ -22,13 +22,12 @@ import (
var _server *Server
// a map of user IDs and when they last were active.
var _lastSeenCache = map[string]time.Time{}
var _lastSeenCache = sync.Map{}
// Server represents an instance of the chat server.
type Server struct {
mu sync.RWMutex
seq uint
clients map[uint]*Client
clients sync.Map
maxSocketConnectionLimit int64
// send outbound message payload to all clients
@ -41,6 +40,7 @@ type Server struct { @@ -41,6 +40,7 @@ type Server struct {
unregister chan uint // the ChatClient id
geoipClient *geoip.Client
clientCount int64
}
// NewChat will return a new instance of the chat server.
@ -49,7 +49,7 @@ func NewChat() *Server { @@ -49,7 +49,7 @@ func NewChat() *Server {
setSystemConcurrentConnectionLimit(maximumConcurrentConnectionLimit)
server := &Server{
clients: map[uint]*Client{},
clients: sync.Map{},
outbound: make(chan []byte),
inbound: make(chan chatClientEvent),
unregister: make(chan uint),
@ -65,14 +65,12 @@ func (s *Server) Run() { @@ -65,14 +65,12 @@ func (s *Server) Run() {
for {
select {
case clientID := <-s.unregister:
if _, ok := s.clients[clientID]; ok {
s.mu.Lock()
delete(s.clients, clientID)
s.mu.Unlock()
if _, ok := s.clients.Load(clientID); ok {
s.clients.Delete(clientID)
}
case message := <-s.inbound:
s.eventReceived(message)
go s.eventReceived(message)
}
}
}
@ -85,27 +83,24 @@ func (s *Server) Addclient(conn *websocket.Conn, user *user.User, accessToken st @@ -85,27 +83,24 @@ func (s *Server) Addclient(conn *websocket.Conn, user *user.User, accessToken st
User: user,
ipAddress: ipAddress,
accessToken: accessToken,
send: make(chan []byte, 256),
send: make(chan []byte),
UserAgent: userAgent,
ConnectedAt: time.Now(),
}
// Do not send user re-joined broadcast message if they've been active within 5 minutes.
shouldSendJoinedMessages := true
if previouslyLastSeen, ok := _lastSeenCache[user.ID]; ok && time.Since(previouslyLastSeen) < time.Minute*5 {
if previouslyLastSeen, ok := _lastSeenCache.Load(user.ID); ok && time.Since(previouslyLastSeen.(time.Time)) < time.Minute*5 {
shouldSendJoinedMessages = false
}
s.mu.Lock()
{
client.id = s.seq
s.clients[client.id] = client
s.seq++
_lastSeenCache[user.ID] = time.Now()
}
s.mu.Unlock()
client.id = s.seq
s.clients.Store(client.id, client)
s.seq++
_lastSeenCache.Store(client.id, client)
log.Traceln("Adding client", client.id, "total count:", len(s.clients))
s.clientCount++
log.Traceln("Adding client", client.id, "total count:", s.clientCount)
go client.writePump()
go client.readPump()
@ -143,14 +138,13 @@ func (s *Server) sendUserJoinedMessage(c *Client) { @@ -143,14 +138,13 @@ func (s *Server) sendUserJoinedMessage(c *Client) {
// ClientClosed is fired when a client disconnects or connection is dropped.
func (s *Server) ClientClosed(c *Client) {
s.mu.Lock()
defer s.mu.Unlock()
c.close()
if _, ok := s.clients[c.id]; ok {
if _, ok := s.clients.Load(c.id); ok {
log.Debugln("Deleting", c.id)
delete(s.clients, c.id)
s.clients.Delete(c.id)
}
s.clientCount--
}
// HandleClientConnection is fired when a single client connects to the websocket.
@ -161,7 +155,7 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request) @@ -161,7 +155,7 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request)
}
// Limit concurrent chat connections
if int64(len(s.clients)) >= s.maxSocketConnectionLimit {
if int64(s.clientCount) >= s.maxSocketConnectionLimit {
log.Warnln("rejecting incoming client connection as it exceeds the max client count of", s.maxSocketConnectionLimit)
_, _ = w.Write([]byte(events.ErrorMaxConnectionsExceeded))
return
@ -216,18 +210,16 @@ func (s *Server) Broadcast(payload events.EventPayload) error { @@ -216,18 +210,16 @@ func (s *Server) Broadcast(payload events.EventPayload) error {
}
go func() {
s.mu.Lock()
defer s.mu.Unlock()
defer func() {
if a := recover(); a != nil {
fmt.Println("RECOVER", a)
}
}()
for _, client := range s.clients {
s.clients.Range(func(k, v interface{}) bool {
client := v.(*Client)
if client == nil {
continue
return false
}
if client.send != nil {
@ -235,10 +227,11 @@ func (s *Server) Broadcast(payload events.EventPayload) error { @@ -235,10 +227,11 @@ func (s *Server) Broadcast(payload events.EventPayload) error {
case client.send <- data:
default:
client.close()
delete(s.clients, client.id)
s.clients.Delete(client.id)
}
}
}
return true
})
}()
return nil
@ -259,9 +252,7 @@ func (s *Server) Send(payload events.EventPayload, client *Client) { @@ -259,9 +252,7 @@ func (s *Server) Send(payload events.EventPayload, client *Client) {
// DisconnectUser will forcefully disconnect all clients belonging to a user by ID.
func (s *Server) DisconnectUser(userID string) {
s.mu.Lock()
clients, err := GetClientsForUser(userID)
s.mu.Unlock()
if err != nil || clients == nil || len(clients) == 0 {
log.Debugln("Requested to disconnect user", userID, err)

Loading…
Cancel
Save