Browse Source

Handle viewer counts outside of websocket connections

pull/5/head
Gabe Kangas 5 years ago
parent
commit
344da52e3d
  1. 9
      client.go
  2. 19
      main.go
  3. 11
      server.go
  4. 65
      stats.go
  5. 18
      utils.go

9
client.go

@ -10,11 +10,9 @@ import (
const channelBufSize = 100 const channelBufSize = 100
var maxId int = 0
// Chat client. // Chat client.
type Client struct { type Client struct {
id int id string
ws *websocket.Conn ws *websocket.Conn
server *Server server *Server
ch chan *Message ch chan *Message
@ -32,11 +30,10 @@ func NewClient(ws *websocket.Conn, server *Server) *Client {
panic("server cannot be nil") panic("server cannot be nil")
} }
maxId++
ch := make(chan *Message, channelBufSize) ch := make(chan *Message, channelBufSize)
doneCh := make(chan bool) doneCh := make(chan bool)
clientID := getClientIDFromRequest(ws.Request())
return &Client{maxId, ws, server, ch, doneCh} return &Client{clientID, ws, server, ch, doneCh}
} }
func (c *Client) Conn() *websocket.Conn { func (c *Client) Conn() *websocket.Conn {

19
main.go

@ -3,6 +3,7 @@ package main
import ( import (
"encoding/json" "encoding/json"
"net/http" "net/http"
"path"
"strconv" "strconv"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -49,7 +50,15 @@ func startChatServer() {
go server.Listen() go server.Listen()
// static files // static files
http.Handle("/", http.FileServer(http.Dir("webroot"))) http.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
http.ServeFile(w, r, path.Join("webroot", r.URL.Path))
if path.Ext(r.URL.Path) == ".m3u8" {
clientID := getClientIDFromRequest(r)
stats.SetClientActive(clientID)
}
})
http.HandleFunc("/status", getStatus) http.HandleFunc("/status", getStatus)
log.Printf("Starting public web server on port %d", configuration.WebServerPort) log.Printf("Starting public web server on port %d", configuration.WebServerPort)
@ -81,10 +90,10 @@ func streamDisconnected() {
stats.StreamDisconnected() stats.StreamDisconnected()
} }
func viewerAdded() { func viewerAdded(clientID string) {
stats.SetViewerCount(server.ClientCount()) stats.SetClientActive(clientID)
} }
func viewerRemoved() { func viewerRemoved(clientID string) {
stats.SetViewerCount(server.ClientCount()) stats.ViewerDisconnected(clientID)
} }

11
server.go

@ -11,7 +11,7 @@ import (
type Server struct { type Server struct {
pattern string pattern string
messages []*Message messages []*Message
clients map[int]*Client clients map[string]*Client
addCh chan *Client addCh chan *Client
delCh chan *Client delCh chan *Client
sendAllCh chan *Message sendAllCh chan *Message
@ -22,7 +22,7 @@ type Server struct {
// Create new chat server. // Create new chat server.
func NewServer(pattern string) *Server { func NewServer(pattern string) *Server {
messages := []*Message{} messages := []*Message{}
clients := make(map[int]*Client) clients := make(map[string]*Client)
addCh := make(chan *Client) addCh := make(chan *Client)
delCh := make(chan *Client) delCh := make(chan *Client)
sendAllCh := make(chan *Message) sendAllCh := make(chan *Message)
@ -100,17 +100,14 @@ func (s *Server) Listen() {
// Add new a client // Add new a client
case c := <-s.addCh: case c := <-s.addCh:
log.Println("Added new client")
s.clients[c.id] = c s.clients[c.id] = c
log.Println("Now", len(s.clients), "clients connected.") viewerAdded(c.id)
viewerAdded()
s.sendPastMessages(c) s.sendPastMessages(c)
// del a client // del a client
case c := <-s.delCh: case c := <-s.delCh:
log.Println("Delete client")
delete(s.clients, c.id) delete(s.clients, c.id)
viewerRemoved() viewerRemoved(c.id)
// broadcast message for all clients // broadcast message for all clients
case msg := <-s.sendAllCh: case msg := <-s.sendAllCh:

65
stats.go

@ -1,8 +1,18 @@
/*
Viewer counting doesn't just count the number of websocket clients that are currently connected,
because people may be watching the stream outside of the web browser via any HLS video client.
Instead we keep track of requests and consider each unique IP as a "viewer".
As a signal, however, we do use the websocket disconnect from a client as a signal that a viewer
dropped and we call ViewerDisconnected().
*/
package main package main
import ( import (
"encoding/json" "encoding/json"
"fmt"
"io/ioutil" "io/ioutil"
"log"
"math" "math"
"os" "os"
"time" "time"
@ -10,40 +20,53 @@ import (
type Stats struct { type Stats struct {
streamConnected bool `json:"-"` streamConnected bool `json:"-"`
ViewerCount int `json:"viewerCount"`
SessionMaxViewerCount int `json:"sessionMaxViewerCount"` SessionMaxViewerCount int `json:"sessionMaxViewerCount"`
OverallMaxViewerCount int `json:"overallMaxViewerCount"` OverallMaxViewerCount int `json:"overallMaxViewerCount"`
LastDisconnectTime time.Time `json:"lastDisconnectTime"` LastDisconnectTime time.Time `json:"lastDisconnectTime"`
clients map[string]time.Time
} }
func (s *Stats) Setup() { func (s *Stats) Setup() {
ticker := time.NewTicker(2 * time.Minute) s.clients = make(map[string]time.Time)
quit := make(chan struct{})
statsSaveTimer := time.NewTicker(2 * time.Minute)
go func() { go func() {
for { for {
select { select {
case <-ticker.C: case <-statsSaveTimer.C:
s.save() s.save()
case <-quit: }
ticker.Stop() }
return }()
staleViewerPurgeTimer := time.NewTicker(5 * time.Second)
go func() {
for {
select {
case <-staleViewerPurgeTimer.C:
s.purgeStaleViewers()
} }
} }
}() }()
} }
func (s *Stats) IsStreamConnected() bool { func (s *Stats) purgeStaleViewers() {
return s.streamConnected for clientID, lastConnectedtime := range s.clients {
timeSinceLastActive := time.Since(lastConnectedtime).Minutes()
if timeSinceLastActive > 2 {
s.ViewerDisconnected(clientID)
}
}
} }
func (s *Stats) SetViewerCount(count int) { func (s *Stats) IsStreamConnected() bool {
s.ViewerCount = count return s.streamConnected
s.SessionMaxViewerCount = int(math.Max(float64(s.ViewerCount), float64(s.SessionMaxViewerCount)))
s.OverallMaxViewerCount = int(math.Max(float64(s.SessionMaxViewerCount), float64(s.OverallMaxViewerCount)))
} }
func (s *Stats) GetViewerCount() int { func (s *Stats) GetViewerCount() int {
return s.ViewerCount return len(s.clients)
} }
func (s *Stats) GetSessionMaxViewerCount() int { func (s *Stats) GetSessionMaxViewerCount() int {
@ -54,10 +77,20 @@ func (s *Stats) GetOverallMaxViewerCount() int {
return s.OverallMaxViewerCount return s.OverallMaxViewerCount
} }
func (s *Stats) ViewerConnected() { func (s *Stats) SetClientActive(clientID string) {
fmt.Println("Marking client active:", clientID)
s.clients[clientID] = time.Now()
s.SessionMaxViewerCount = int(math.Max(float64(s.GetViewerCount()), float64(s.SessionMaxViewerCount)))
s.OverallMaxViewerCount = int(math.Max(float64(s.SessionMaxViewerCount), float64(s.OverallMaxViewerCount)))
fmt.Println("Now", s.GetViewerCount(), "clients connected.")
} }
func (s *Stats) ViewerDisconnected() { func (s *Stats) ViewerDisconnected(clientID string) {
log.Println("Removed client", clientID)
delete(s.clients, clientID)
} }
func (s *Stats) StreamConnected() { func (s *Stats) StreamConnected() {

18
utils.go

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"net/http"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
@ -65,3 +66,20 @@ func resetDirectories(configuration Config) {
os.MkdirAll(path.Join(configuration.PublicHLSPath, strconv.Itoa(index)), 0777) os.MkdirAll(path.Join(configuration.PublicHLSPath, strconv.Itoa(index)), 0777)
} }
} }
func getClientIDFromRequest(req *http.Request) string {
var ipAddress string
xForwardedFor := req.Header.Get("X-FORWARDED-FOR")
if xForwardedFor != "" {
ipAddress = xForwardedFor
} else {
ipAddressString := req.RemoteAddr
ipAddressComponents := strings.Split(ipAddressString, ":")
ipAddressComponents[len(ipAddressComponents)-1] = ""
ipAddress = strings.Join(ipAddressComponents, ":")
}
// fmt.Println("IP address determined to be", ipAddress)
return ipAddress
}

Loading…
Cancel
Save