@ -18,12 +18,13 @@ import (
@@ -18,12 +18,13 @@ import (
"github.com/owncast/owncast/utils"
)
var _server * Chat Server
var _server * Server
type ChatServer struct {
// Server represents an instance of the chat server.
type Server struct {
mu sync . RWMutex
seq uint
clients map [ uint ] * ChatC lient
clients map [ uint ] * Client
maxSocketConnectionLimit int64
// send outbound message payload to all clients
@ -36,12 +37,13 @@ type ChatServer struct {
@@ -36,12 +37,13 @@ type ChatServer struct {
unregister chan uint // the ChatClient id
}
func NewChat ( ) * ChatServer {
// NewChat will return a new instance of the chat server.
func NewChat ( ) * Server {
maximumConcurrentConnectionLimit := getMaximumConcurrentConnectionLimit ( )
setSystemConcurrentConnectionLimit ( maximumConcurrentConnectionLimit )
server := & Chat Server{
clients : map [ uint ] * ChatC lient { } ,
server := & Server {
clients : map [ uint ] * Client { } ,
outbound : make ( chan [ ] byte ) ,
inbound : make ( chan chatClientEvent ) ,
unregister : make ( chan uint ) ,
@ -51,13 +53,14 @@ func NewChat() *ChatServer {
@@ -51,13 +53,14 @@ func NewChat() *ChatServer {
return server
}
func ( s * ChatServer ) Run ( ) {
// Run will start the chat server.
func ( s * Server ) Run ( ) {
for {
select {
case clientId := <- s . unregister :
if _ , ok := s . clients [ clientId ] ; ok {
case clientID := <- s . unregister :
if _ , ok := s . clients [ clientID ] ; ok {
s . mu . Lock ( )
delete ( s . clients , clientId )
delete ( s . clients , clientID )
s . mu . Unlock ( )
}
@ -68,8 +71,8 @@ func (s *ChatServer) Run() {
@@ -68,8 +71,8 @@ func (s *ChatServer) Run() {
}
// Addclient registers new connection as a User.
func ( s * Chat Server) Addclient ( conn * websocket . Conn , user * user . User , accessToken string , userAgent string , ipAddress string ) * Chat Client {
client := & ChatC lient {
func ( s * Server ) Addclient ( conn * websocket . Conn , user * user . User , accessToken string , userAgent string , ipAddress string ) * Client {
client := & Client {
server : s ,
conn : conn ,
User : user ,
@ -101,14 +104,14 @@ func (s *ChatServer) Addclient(conn *websocket.Conn, user *user.User, accessToke
@@ -101,14 +104,14 @@ func (s *ChatServer) Addclient(conn *websocket.Conn, user *user.User, accessToke
}
// Asynchronously, optionally, fetch GeoIP data.
go func ( client * ChatC lient ) {
go func ( client * Client ) {
client . Geo = geoip . GetGeoFromIP ( ipAddress )
} ( client )
return client
}
func ( s * Chat Server) sendUserJoinedMessage ( c * Chat Client) {
func ( s * Server ) sendUserJoinedMessage ( c * Client ) {
userJoinedEvent := events . UserJoinedEvent { }
userJoinedEvent . SetDefaults ( )
userJoinedEvent . User = c . User
@ -121,7 +124,8 @@ func (s *ChatServer) sendUserJoinedMessage(c *ChatClient) {
@@ -121,7 +124,8 @@ func (s *ChatServer) sendUserJoinedMessage(c *ChatClient) {
webhooks . SendChatEventUserJoined ( userJoinedEvent )
}
func ( s * ChatServer ) ClientClosed ( c * ChatClient ) {
// ClientClosed is fired when a client disconnects or connection is dropped.
func ( s * Server ) ClientClosed ( c * Client ) {
s . mu . Lock ( )
defer s . mu . Unlock ( )
c . close ( )
@ -132,7 +136,8 @@ func (s *ChatServer) ClientClosed(c *ChatClient) {
@@ -132,7 +136,8 @@ func (s *ChatServer) ClientClosed(c *ChatClient) {
}
}
func ( s * ChatServer ) HandleClientConnection ( w http . ResponseWriter , r * http . Request ) {
// HandleClientConnection is fired when a single client connects to the websocket.
func ( s * Server ) HandleClientConnection ( w http . ResponseWriter , r * http . Request ) {
if data . GetChatDisabled ( ) {
_ , _ = w . Write ( [ ] byte ( events . ChatDisabled ) )
return
@ -155,7 +160,7 @@ func (s *ChatServer) HandleClientConnection(w http.ResponseWriter, r *http.Reque
@@ -155,7 +160,7 @@ func (s *ChatServer) HandleClientConnection(w http.ResponseWriter, r *http.Reque
if accessToken == "" {
log . Errorln ( "Access token is required" )
// Return HTTP status code
conn . Close ( )
_ = conn . Close ( )
return
}
@ -166,17 +171,17 @@ func (s *ChatServer) HandleClientConnection(w http.ResponseWriter, r *http.Reque
@@ -166,17 +171,17 @@ func (s *ChatServer) HandleClientConnection(w http.ResponseWriter, r *http.Reque
"type" : events . ErrorNeedsRegistration ,
} )
// Send error that registration is required
conn . Close ( )
_ = conn . Close ( )
return
}
// User is disabled therefore we should disconnect.
if user . DisabledAt != nil {
log . Traceln ( "Disabled user" , user . Id , user . DisplayName , "rejected" )
log . Traceln ( "Disabled user" , user . ID , user . DisplayName , "rejected" )
_ = conn . WriteJSON ( events . EventPayload {
"type" : events . ErrorUserDisabled ,
} )
conn . Close ( )
_ = conn . Close ( )
return
}
@ -187,7 +192,7 @@ func (s *ChatServer) HandleClientConnection(w http.ResponseWriter, r *http.Reque
@@ -187,7 +192,7 @@ func (s *ChatServer) HandleClientConnection(w http.ResponseWriter, r *http.Reque
}
// Broadcast sends message to all connected clients.
func ( s * Chat Server) Broadcast ( payload events . EventPayload ) error {
func ( s * Server ) Broadcast ( payload events . EventPayload ) error {
data , err := json . Marshal ( payload )
if err != nil {
return err
@ -212,7 +217,8 @@ func (s *ChatServer) Broadcast(payload events.EventPayload) error {
@@ -212,7 +217,8 @@ func (s *ChatServer) Broadcast(payload events.EventPayload) error {
return nil
}
func ( s * ChatServer ) Send ( payload events . EventPayload , client * ChatClient ) {
// Send will send a single payload to a single connected client.
func ( s * Server ) Send ( payload events . EventPayload , client * Client ) {
data , err := json . Marshal ( payload )
if err != nil {
log . Errorln ( err )
@ -223,7 +229,7 @@ func (s *ChatServer) Send(payload events.EventPayload, client *ChatClient) {
@@ -223,7 +229,7 @@ func (s *ChatServer) Send(payload events.EventPayload, client *ChatClient) {
}
// DisconnectUser will forcefully disconnect all clients belonging to a user by ID.
func ( s * Chat Server) DisconnectUser ( userID string ) {
func ( s * Server ) DisconnectUser ( userID string ) {
s . mu . Lock ( )
clients , err := GetClientsForUser ( userID )
s . mu . Unlock ( )
@ -234,9 +240,9 @@ func (s *ChatServer) DisconnectUser(userID string) {
@@ -234,9 +240,9 @@ func (s *ChatServer) DisconnectUser(userID string) {
}
for _ , client := range clients {
log . Traceln ( "Disconnecting client" , client . User . Id , "owned by" , client . User . DisplayName )
log . Traceln ( "Disconnecting client" , client . User . ID , "owned by" , client . User . DisplayName )
go func ( client * ChatC lient ) {
go func ( client * Client ) {
event := events . UserDisabledEvent { }
event . SetDefaults ( )
@ -257,7 +263,7 @@ func (s *ChatServer) DisconnectUser(userID string) {
@@ -257,7 +263,7 @@ func (s *ChatServer) DisconnectUser(userID string) {
}
}
func ( s * Chat Server) eventReceived ( event chatClientEvent ) {
func ( s * Server ) eventReceived ( event chatClientEvent ) {
var typecheck map [ string ] interface { }
if err := json . Unmarshal ( event . data , & typecheck ) ; err != nil {
log . Debugln ( err )
@ -277,7 +283,7 @@ func (s *ChatServer) eventReceived(event chatClientEvent) {
@@ -277,7 +283,7 @@ func (s *ChatServer) eventReceived(event chatClientEvent) {
}
}
func ( s * Chat Server) sendWelcomeMessageToClient ( c * Chat Client) {
func ( s * Server ) sendWelcomeMessageToClient ( c * Client ) {
// Add an artificial delay so people notice this message come in.
time . Sleep ( 7 * time . Second )
@ -288,7 +294,7 @@ func (s *ChatServer) sendWelcomeMessageToClient(c *ChatClient) {
@@ -288,7 +294,7 @@ func (s *ChatServer) sendWelcomeMessageToClient(c *ChatClient) {
}
}
func ( s * Chat Server) sendAllWelcomeMessage ( ) {
func ( s * Server ) sendAllWelcomeMessage ( ) {
welcomeMessage := utils . RenderSimpleMarkdown ( data . GetServerWelcomeMessage ( ) )
if welcomeMessage != "" {
@ -303,7 +309,7 @@ func (s *ChatServer) sendAllWelcomeMessage() {
@@ -303,7 +309,7 @@ func (s *ChatServer) sendAllWelcomeMessage() {
}
}
func ( s * Chat Server) sendSystemMessageToClient ( c * Chat Client, message string ) {
func ( s * Server ) sendSystemMessageToClient ( c * Client , message string ) {
clientMessage := events . SystemMessageEvent {
Event : events . Event { } ,
MessageEvent : events . MessageEvent {
@ -314,7 +320,7 @@ func (s *ChatServer) sendSystemMessageToClient(c *ChatClient, message string) {
@@ -314,7 +320,7 @@ func (s *ChatServer) sendSystemMessageToClient(c *ChatClient, message string) {
s . Send ( clientMessage . GetBroadcastPayload ( ) , c )
}
func ( s * Chat Server) sendActionToClient ( c * Chat Client, message string ) {
func ( s * Server ) sendActionToClient ( c * Client , message string ) {
clientMessage := events . ActionEvent {
MessageEvent : events . MessageEvent {
Body : message ,