Compare commits

...

10 Commits

Author SHA1 Message Date
Owncast eeda334416 Commit updated Javascript packages 4 years ago
Gabe Kangas a393e213aa Create a pool for the type checking 4 years ago
Gabe Kangas 4c4c9400b0 Fix using nil disconnect time reference 4 years ago
Gabe Kangas 17b4ad7a08 Misc performance improvements. 4 years ago
Gabe Kangas 18eb0769bb Pass user agent to requests 4 years ago
Gabe Kangas 81a423ec52 Move client map to sync.Map 4 years ago
Gabe Kangas 3f65849801 Continued work of new load test 4 years ago
Gabe Kangas 3c594918a9 Pass along just the client id instead a reference to the entire client when unregistering the client 4 years ago
Gabe Kangas b0f9465138 WIP performance and load related updates 4 years ago
Gabe Kangas 4242d1731d WIP new load test suite 4 years ago
  1. 2994
      build/javascript/package-lock.json
  2. 4
      controllers/hls.go
  3. 17
      core/chat/chat.go
  4. 12
      core/chat/chatclient.go
  5. 2
      core/chat/events.go
  6. 48
      core/chat/persistence.go
  7. 106
      core/chat/server.go
  8. 15
      core/data/cache.go
  9. 2
      core/data/data.go
  10. 5
      core/data/messages.go
  11. 13
      core/data/persistence.go
  12. 6
      core/user/externalAPIUser.go
  13. 44
      core/user/user.go
  14. 2
      go.mod
  15. 4
      go.sum
  16. 38
      test/load/new/README.md
  17. 7
      test/load/new/run.sh
  18. 108
      test/load/new/test.js
  19. 2
      webroot/js/utils/websocket.js
  20. 2
      webroot/js/web_modules/@videojs/http-streaming/dist/videojs-http-streaming.min.js
  21. 2
      webroot/js/web_modules/htm.js
  22. 4
      webroot/js/web_modules/markjs/dist/mark.es6.min.js
  23. 2
      webroot/js/web_modules/micromodal/dist/micromodal.min.js
  24. 2
      webroot/js/web_modules/videojs/core.js

2994
build/javascript/package-lock.json generated

File diff suppressed because it is too large Load Diff

4
controllers/hls.go

@ -8,7 +8,6 @@ import ( @@ -8,7 +8,6 @@ import (
"strings"
"github.com/owncast/owncast/config"
"github.com/owncast/owncast/core"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/router/middleware"
"github.com/owncast/owncast/utils"
@ -39,8 +38,7 @@ func HandleHLSRequest(w http.ResponseWriter, r *http.Request) { @@ -39,8 +38,7 @@ func HandleHLSRequest(w http.ResponseWriter, r *http.Request) {
middleware.DisableCache(w)
// Use this as an opportunity to mark this viewer as active.
id := utils.GenerateClientIDFromRequest(r)
core.SetViewerIDActive(id)
Ping(w, r)
} else {
cacheTime := utils.GetCacheDurationSecondsForPath(relativePath)
w.Header().Set("Cache-Control", "public, max-age="+strconv.Itoa(cacheTime))

17
core/chat/chat.go

@ -29,10 +29,11 @@ func Start(getStatusFunc func() models.Status) error { @@ -29,10 +29,11 @@ func Start(getStatusFunc func() models.Status) error {
// GetClientsForUser will return chat connections that are owned by a specific user.
func GetClientsForUser(userID string) ([]*Client, error) {
clients := map[string][]*Client{}
for _, client := range _server.clients {
_server.clients.Range(func(k, v interface{}) bool {
client := v.(*Client)
clients[client.User.ID] = append(clients[client.User.ID], client)
}
return true
})
if _, exists := clients[userID]; !exists {
return nil, errors.New("no connections for user found")
@ -43,8 +44,8 @@ func GetClientsForUser(userID string) ([]*Client, error) { @@ -43,8 +44,8 @@ func GetClientsForUser(userID string) ([]*Client, error) {
// FindClientByID will return a single connected client by ID.
func FindClientByID(clientID uint) (*Client, bool) {
client, found := _server.clients[clientID]
return client, found
client, found := _server.clients.Load(clientID)
return client.(*Client), found
}
// GetClients will return all the current chat clients connected.
@ -52,9 +53,11 @@ func GetClients() []*Client { @@ -52,9 +53,11 @@ func GetClients() []*Client {
clients := []*Client{}
// Convert the keyed map to a slice.
for _, client := range _server.clients {
_server.clients.Range(func(k, v interface{}) bool {
client := v.(*Client)
clients = append(clients, client)
}
return true
})
sort.Slice(clients, func(i, j int) bool {
return clients[i].ConnectedAt.Before(clients[j].ConnectedAt)

12
core/chat/chatclient.go

@ -3,6 +3,7 @@ package chat @@ -3,6 +3,7 @@ package chat
import (
"bytes"
"encoding/json"
"sync"
"time"
log "github.com/sirupsen/logrus"
@ -32,6 +33,7 @@ type Client struct { @@ -32,6 +33,7 @@ type Client struct {
MessageCount int `json:"messageCount"`
UserAgent string `json:"userAgent"`
ConnectedAt time.Time `json:"connectedAt"`
mu sync.Mutex
}
type chatClientEvent struct {
@ -173,10 +175,18 @@ func (c *Client) handleEvent(data []byte) { @@ -173,10 +175,18 @@ func (c *Client) handleEvent(data []byte) {
}
func (c *Client) close() {
defer func() {
if a := recover(); a != nil {
log.Println("RECOVER", a)
}
}()
log.Traceln("client closed:", c.User.DisplayName, c.id, c.ipAddress)
_ = c.conn.Close()
c.server.unregister <- c.id
c.mu.Lock()
defer c.mu.Unlock()
if c.send != nil {
close(c.send)
c.send = nil
@ -212,6 +222,8 @@ func (c *Client) sendPayload(payload events.EventPayload) { @@ -212,6 +222,8 @@ func (c *Client) sendPayload(payload events.EventPayload) {
return
}
c.mu.Lock()
defer c.mu.Unlock()
if c.send != nil {
c.send <- data
}

2
core/chat/events.go

@ -112,5 +112,5 @@ func (s *Server) userMessageSent(eventData chatClientEvent) { @@ -112,5 +112,5 @@ func (s *Server) userMessageSent(eventData chatClientEvent) {
SaveUserMessage(event)
eventData.client.MessageCount = eventData.client.MessageCount + 1
_lastSeenCache[event.User.ID] = time.Now()
_lastSeenCache.Store(event.User.ID, time.Now())
}

48
core/chat/persistence.go

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package chat
import (
"database/sql"
"fmt"
"strings"
"time"
@ -42,9 +43,6 @@ func saveEvent(id string, userID string, body string, eventType string, hidden * @@ -42,9 +43,6 @@ func saveEvent(id string, userID string, body string, eventType string, hidden *
_historyCache = nil
}()
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
log.Errorln("error saving", eventType, err)
@ -71,9 +69,9 @@ func saveEvent(id string, userID string, body string, eventType string, hidden * @@ -71,9 +69,9 @@ func saveEvent(id string, userID string, body string, eventType string, hidden *
}
}
func getChat(query string) []events.UserMessageEvent {
func getChat(query *sql.Stmt) []events.UserMessageEvent {
history := make([]events.UserMessageEvent, 0)
rows, err := _datastore.DB.Query(query)
rows, err := query.Query()
if err != nil {
log.Errorln("error fetching chat history", err)
return history
@ -154,6 +152,8 @@ func getChat(query string) []events.UserMessageEvent { @@ -154,6 +152,8 @@ func getChat(query string) []events.UserMessageEvent {
var _historyCache *[]events.UserMessageEvent
var _chatModerationHistoryQuery *sql.Stmt
// GetChatModerationHistory will return all the chat messages suitable for moderation purposes.
func GetChatModerationHistory() []events.UserMessageEvent {
if _historyCache != nil {
@ -161,19 +161,38 @@ func GetChatModerationHistory() []events.UserMessageEvent { @@ -161,19 +161,38 @@ func GetChatModerationHistory() []events.UserMessageEvent {
}
// Get all messages regardless of visibility
var query = "SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC"
result := getChat(query)
if _chatModerationHistoryQuery == nil {
var query = "SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC"
q, err := _datastore.DB.Prepare(query)
if err != nil {
log.Errorln(err)
return []events.UserMessageEvent{}
}
_chatModerationHistoryQuery = q
}
result := getChat(_chatModerationHistoryQuery)
_historyCache = &result
return result
}
var _getChatHistoryQuery *sql.Stmt
// GetChatHistory will return all the chat messages suitable for returning as user-facing chat history.
func GetChatHistory() []events.UserMessageEvent {
if _getChatHistoryQuery == nil {
var query = fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages, users WHERE messages.user_id = users.id AND hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber)
q, err := _datastore.DB.Prepare(query)
if err != nil {
log.Errorln(err)
return []events.UserMessageEvent{}
}
_getChatHistoryQuery = q
}
// Get all visible messages
var query = fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages, users WHERE messages.user_id = users.id AND hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber)
m := getChat(query)
m := getChat(_getChatHistoryQuery)
// Invert order of messages
for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 {
@ -192,7 +211,10 @@ func SetMessageVisibilityForUserID(userID string, visible bool) error { @@ -192,7 +211,10 @@ func SetMessageVisibilityForUserID(userID string, visible bool) error {
// Get a list of IDs from this user within the 5hr window to send to the connected clients to hide
ids := make([]string, 0)
query := fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID)
query, err := _datastore.DB.Prepare(fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID))
if err != nil {
return err
}
messages := getChat(query)
if len(messages) == 0 {
@ -212,9 +234,6 @@ func saveMessageVisibility(messageIDs []string, visible bool) error { @@ -212,9 +234,6 @@ func saveMessageVisibility(messageIDs []string, visible bool) error {
_historyCache = nil
}()
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
return err
@ -290,9 +309,6 @@ func getMessageByID(messageID string) (*events.UserMessageEvent, error) { @@ -290,9 +309,6 @@ func getMessageByID(messageID string) (*events.UserMessageEvent, error) {
// Only keep recent messages so we don't keep more chat data than needed
// for privacy and efficiency reasons.
func runPruner() {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
log.Traceln("Removing chat messages older than", maxBacklogHours, "hours")
deleteStatement := `DELETE FROM messages WHERE timestamp <= datetime('now', 'localtime', ?)`

106
core/chat/server.go

@ -2,6 +2,7 @@ package chat @@ -2,6 +2,7 @@ package chat
import (
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
@ -21,13 +22,20 @@ import ( @@ -21,13 +22,20 @@ import (
var _server *Server
// a map of user IDs and when they last were active.
var _lastSeenCache = map[string]time.Time{}
var _lastSeenCache = sync.Map{}
type typeCheck struct {
Type string `json:"type"`
}
var _typeCheckPool = sync.Pool{
New: func() interface{} { return new(typeCheck) },
}
// Server represents an instance of the chat server.
type Server struct {
mu sync.RWMutex
seq uint
clients map[uint]*Client
clients sync.Map
maxSocketConnectionLimit int64
// send outbound message payload to all clients
@ -40,6 +48,7 @@ type Server struct { @@ -40,6 +48,7 @@ type Server struct {
unregister chan uint // the ChatClient id
geoipClient *geoip.Client
clientCount int64
}
// NewChat will return a new instance of the chat server.
@ -48,7 +57,7 @@ func NewChat() *Server { @@ -48,7 +57,7 @@ func NewChat() *Server {
setSystemConcurrentConnectionLimit(maximumConcurrentConnectionLimit)
server := &Server{
clients: map[uint]*Client{},
clients: sync.Map{},
outbound: make(chan []byte),
inbound: make(chan chatClientEvent),
unregister: make(chan uint),
@ -64,14 +73,12 @@ func (s *Server) Run() { @@ -64,14 +73,12 @@ func (s *Server) Run() {
for {
select {
case clientID := <-s.unregister:
if _, ok := s.clients[clientID]; ok {
s.mu.Lock()
delete(s.clients, clientID)
s.mu.Unlock()
if _, ok := s.clients.Load(clientID); ok {
s.clients.Delete(clientID)
}
case message := <-s.inbound:
s.eventReceived(message)
go s.eventReceived(message)
}
}
}
@ -84,27 +91,24 @@ func (s *Server) Addclient(conn *websocket.Conn, user *user.User, accessToken st @@ -84,27 +91,24 @@ func (s *Server) Addclient(conn *websocket.Conn, user *user.User, accessToken st
User: user,
ipAddress: ipAddress,
accessToken: accessToken,
send: make(chan []byte, 256),
send: make(chan []byte),
UserAgent: userAgent,
ConnectedAt: time.Now(),
}
// Do not send user re-joined broadcast message if they've been active within 5 minutes.
shouldSendJoinedMessages := true
if previouslyLastSeen, ok := _lastSeenCache[user.ID]; ok && time.Since(previouslyLastSeen) < time.Minute*5 {
if previouslyLastSeen, ok := _lastSeenCache.Load(user.ID); ok && time.Since(previouslyLastSeen.(time.Time)) < time.Minute*5 {
shouldSendJoinedMessages = false
}
s.mu.Lock()
{
client.id = s.seq
s.clients[client.id] = client
s.seq++
_lastSeenCache[user.ID] = time.Now()
}
s.mu.Unlock()
client.id = s.seq
s.clients.Store(client.id, client)
s.seq++
_lastSeenCache.Store(client.id, client)
log.Traceln("Adding client", client.id, "total count:", len(s.clients))
s.clientCount++
log.Traceln("Adding client", client.id, "total count:", s.clientCount)
go client.writePump()
go client.readPump()
@ -142,14 +146,13 @@ func (s *Server) sendUserJoinedMessage(c *Client) { @@ -142,14 +146,13 @@ func (s *Server) sendUserJoinedMessage(c *Client) {
// ClientClosed is fired when a client disconnects or connection is dropped.
func (s *Server) ClientClosed(c *Client) {
s.mu.Lock()
defer s.mu.Unlock()
c.close()
if _, ok := s.clients[c.id]; ok {
if _, ok := s.clients.Load(c.id); ok {
log.Debugln("Deleting", c.id)
delete(s.clients, c.id)
s.clients.Delete(c.id)
}
s.clientCount--
}
// HandleClientConnection is fired when a single client connects to the websocket.
@ -160,7 +163,7 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request) @@ -160,7 +163,7 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request)
}
// Limit concurrent chat connections
if int64(len(s.clients)) >= s.maxSocketConnectionLimit {
if int64(s.clientCount) >= s.maxSocketConnectionLimit {
log.Warnln("rejecting incoming client connection as it exceeds the max client count of", s.maxSocketConnectionLimit)
_, _ = w.Write([]byte(events.ErrorMaxConnectionsExceeded))
return
@ -214,21 +217,30 @@ func (s *Server) Broadcast(payload events.EventPayload) error { @@ -214,21 +217,30 @@ func (s *Server) Broadcast(payload events.EventPayload) error {
return err
}
s.mu.Lock()
defer s.mu.Unlock()
go func() {
defer func() {
if a := recover(); a != nil {
fmt.Println("RECOVER", a)
}
}()
for _, client := range s.clients {
if client == nil {
continue
}
s.clients.Range(func(k, v interface{}) bool {
client := v.(*Client)
if client == nil {
return false
}
select {
case client.send <- data:
default:
client.close()
delete(s.clients, client.id)
}
}
if client.send != nil {
select {
case client.send <- data:
default:
client.close()
s.clients.Delete(client.id)
}
}
return true
})
}()
return nil
}
@ -241,14 +253,14 @@ func (s *Server) Send(payload events.EventPayload, client *Client) { @@ -241,14 +253,14 @@ func (s *Server) Send(payload events.EventPayload, client *Client) {
return
}
client.send <- data
if client.send != nil {
client.send <- data
}
}
// DisconnectUser will forcefully disconnect all clients belonging to a user by ID.
func (s *Server) DisconnectUser(userID string) {
s.mu.Lock()
clients, err := GetClientsForUser(userID)
s.mu.Unlock()
if err != nil || clients == nil || len(clients) == 0 {
log.Debugln("Requested to disconnect user", userID, err)
@ -280,14 +292,14 @@ func (s *Server) DisconnectUser(userID string) { @@ -280,14 +292,14 @@ func (s *Server) DisconnectUser(userID string) {
}
func (s *Server) eventReceived(event chatClientEvent) {
var typecheck map[string]interface{}
if err := json.Unmarshal(event.data, &typecheck); err != nil {
tc := _typeCheckPool.Get().(*typeCheck)
defer _typeCheckPool.Put(tc)
if err := json.Unmarshal(event.data, tc); err != nil {
log.Debugln(err)
}
eventType := typecheck["type"]
switch eventType {
switch tc.Type {
case events.MessageSent:
s.userMessageSent(event)
@ -295,7 +307,7 @@ func (s *Server) eventReceived(event chatClientEvent) { @@ -295,7 +307,7 @@ func (s *Server) eventReceived(event chatClientEvent) {
s.userNameChanged(event)
default:
log.Debugln(eventType, "event not found:", typecheck)
log.Debugln(tc.Type, "event not found:", tc)
}
}

15
core/data/cache.go

@ -2,19 +2,13 @@ package data @@ -2,19 +2,13 @@ package data
import (
"errors"
"sync"
)
var _cacheLock = sync.Mutex{}
// GetCachedValue will return a value for key from the cache.
func (ds *Datastore) GetCachedValue(key string) ([]byte, error) {
_cacheLock.Lock()
defer _cacheLock.Unlock()
// Check for a cached value
if val, ok := ds.cache[key]; ok {
return val, nil
if val, ok := ds.cache.Load(key); ok {
return val.([]byte), nil
}
return nil, errors.New(key + " not found in cache")
@ -22,8 +16,5 @@ func (ds *Datastore) GetCachedValue(key string) ([]byte, error) { @@ -22,8 +16,5 @@ func (ds *Datastore) GetCachedValue(key string) ([]byte, error) {
// SetCachedValue will set a value for key in the cache.
func (ds *Datastore) SetCachedValue(key string, b []byte) {
_cacheLock.Lock()
defer _cacheLock.Unlock()
ds.cache[key] = b
ds.cache.Store(key, b)
}

2
core/data/data.go

@ -51,7 +51,7 @@ func SetupPersistence(file string) error { @@ -51,7 +51,7 @@ func SetupPersistence(file string) error {
// Some SQLite optimizations
_, _ = db.Exec("pragma journal_mode = WAL")
_, _ = db.Exec("pragma synchronous = normal")
_, _ = db.Exec("pragma synchronous = NORMAL")
_, _ = db.Exec("pragma temp_store = memory")
_, _ = db.Exec("pragma wal_checkpoint(full)")

5
core/data/messages.go

@ -16,11 +16,12 @@ func CreateMessagesTable(db *sql.DB) { @@ -16,11 +16,12 @@ func CreateMessagesTable(db *sql.DB) {
"hidden_at" DATETIME,
"timestamp" DATETIME,
PRIMARY KEY (id)
);CREATE INDEX index ON messages (id, user_id, hidden_at, timestamp);
);CREATE UNIQUE INDEX index ON messages (id, user_id, hidden_at, timestamp);
CREATE INDEX id ON messages (id);
CREATE INDEX user_id ON messages (user_id);
CREATE INDEX hidden_at ON messages (hidden_at);
CREATE INDEX timestamp ON messages (timestamp);`
CREATE INDEX timestamp ON messages (timestamp);
CREATE UNIQUE INDEX userid_hidden ON messages (user_id, hidden_at);`
stmt, err := db.Prepare(createTableSQL)
if err != nil {

13
core/data/persistence.go

@ -13,9 +13,9 @@ import ( @@ -13,9 +13,9 @@ import (
// Datastore is the global key/value store for configuration values.
type Datastore struct {
DB *sql.DB
cache map[string][]byte
DbLock *sync.Mutex
DB *sql.DB
//cache map[string][]byte
cache sync.Map
}
func (ds *Datastore) warmCache() {
@ -33,7 +33,7 @@ func (ds *Datastore) warmCache() { @@ -33,7 +33,7 @@ func (ds *Datastore) warmCache() {
if err := res.Scan(&rowKey, &rowValue); err != nil {
log.Errorln("error pre-caching config row", err)
}
ds.cache[rowKey] = rowValue
ds.cache.Store(rowKey, rowValue)
}
}
@ -65,9 +65,6 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) { @@ -65,9 +65,6 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) {
// Save will save the ConfigEntry to the database.
func (ds *Datastore) Save(e ConfigEntry) error {
ds.DbLock.Lock()
defer ds.DbLock.Unlock()
var dataGob bytes.Buffer
enc := gob.NewEncoder(&dataGob)
if err := enc.Encode(e.Value); err != nil {
@ -101,9 +98,7 @@ func (ds *Datastore) Save(e ConfigEntry) error { @@ -101,9 +98,7 @@ func (ds *Datastore) Save(e ConfigEntry) error {
// Setup will create the datastore table and perform initial initialization.
func (ds *Datastore) Setup() {
ds.cache = make(map[string][]byte)
ds.DB = GetDatabase()
ds.DbLock = &sync.Mutex{}
createTableSQL := `CREATE TABLE IF NOT EXISTS datastore (
"key" string NOT NULL PRIMARY KEY,

6
core/user/externalAPIUser.go

@ -44,9 +44,6 @@ var validAccessTokenScopes = []string{ @@ -44,9 +44,6 @@ var validAccessTokenScopes = []string{
func InsertExternalAPIUser(token string, name string, color int, scopes []string) error {
log.Traceln("Adding new API user:", name)
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
scopesString := strings.Join(scopes, ",")
id := shortid.MustGenerate()
@ -76,9 +73,6 @@ func InsertExternalAPIUser(token string, name string, color int, scopes []string @@ -76,9 +73,6 @@ func InsertExternalAPIUser(token string, name string, color int, scopes []string
func DeleteExternalAPIUser(token string) error {
log.Traceln("Deleting access token:", token)
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
return err

44
core/user/user.go

@ -71,9 +71,6 @@ func CreateAnonymousUser(username string) (*User, error) { @@ -71,9 +71,6 @@ func CreateAnonymousUser(username string) (*User, error) {
// ChangeUsername will change the user associated to userID from one display name to another.
func ChangeUsername(userID string, username string) {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
@ -103,9 +100,6 @@ func ChangeUsername(userID string, username string) { @@ -103,9 +100,6 @@ func ChangeUsername(userID string, username string) {
}
func create(user *User) error {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
log.Debugln(err)
@ -131,9 +125,6 @@ func create(user *User) error { @@ -131,9 +125,6 @@ func create(user *User) error {
// SetEnabled will will set the enabled flag on a single user assigned to userID.
func SetEnabled(userID string, enabled bool) error {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
return err
@ -161,24 +152,39 @@ func SetEnabled(userID string, enabled bool) error { @@ -161,24 +152,39 @@ func SetEnabled(userID string, enabled bool) error {
return tx.Commit()
}
var _getUserByTokenQuery *sql.Stmt
// GetUserByToken will return a user by an access token.
func GetUserByToken(token string) *User {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
if _getUserByTokenQuery == nil {
query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE access_token = ?"
q, err := _datastore.DB.Prepare(query)
if err != nil {
return nil
}
_getUserByTokenQuery = q
}
query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE access_token = ?"
row := _datastore.DB.QueryRow(query, token)
row := _getUserByTokenQuery.QueryRow(token)
return getUserFromRow(row)
u := getUserFromRow(row)
return u
}
var _getUserByIdQuery *sql.Stmt
// GetUserByID will return a user by a user ID.
func GetUserByID(id string) *User {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE id = ?"
row := _datastore.DB.QueryRow(query, id)
if _getUserByIdQuery == nil {
query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE id = ?"
q, err := _datastore.DB.Prepare(query)
if err != nil {
return nil
}
_getUserByIdQuery = q
}
row := _getUserByIdQuery.QueryRow(id)
if row == nil {
log.Errorln(row)
return nil

2
go.mod

@ -38,6 +38,6 @@ require ( @@ -38,6 +38,6 @@ require (
github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect
golang.org/x/sys v0.0.0-20210902050250-f475640dd07b // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)

4
go.sum

@ -90,8 +90,8 @@ golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7w @@ -90,8 +90,8 @@ golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 h1:hZR0X1kPW+nwyJ9xRxqZk1vx5RUObAPBdKVvXPDUH/E=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210902050250-f475640dd07b h1:S7hKs0Flbq0bbc9xgYt4stIEG1zNDFqyrPwAX2Wj/sE=
golang.org/x/sys v0.0.0-20210902050250-f475640dd07b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=

38
test/load/new/README.md

@ -0,0 +1,38 @@ @@ -0,0 +1,38 @@
# Owncast Load Testing
Load testing is an important tool to surface bugs, race conditions, and to determine the overall server performance of Owncast. The primary goal is to test the server components, not so much the front-end, as the performance of the browser and the client components may be variable. While the test aims to push a bunch of traffic through the backend it's possible the frontend may not be able to handle it all. Working on the performance of the frontend is a goal that should be treated separately from what the backend load tests are designed to do.
## What it will test
The test aims to reproduce the same requests and actions performed by a normal user joining an Owncast session, but at a rate that's faster than most normal environments.
## This includes
1. Downloads the configuration.
1. Registering as a brand new chat user.
1. Fetches the chat history.
1. Connects to the chat websocket and sends messages.
1. Access the viewer count `ping` endpoint to count this virtual user as a viewer.
1. Fetches the current status.
## Setup your environment
1. Install [k6](https://k6.io/open-source) by following [the instructions for your local machine](https://k6.io/docs/getting-started/installation/).
1. Start Owncast on your local machine, listening on `localhost:8080`.
## Run the tests
1. To monitor the concurrent chat users open the admin chat user's page at http://localhost:8080/admin/chat/users/.
1. To monitor the concurrent "viewers" open the admin viewers page at http://localhost:8080/admin/viewer-info/.
1. Begin the test suite by running `k6 run test.js`.
## troubleshooting
If you receive the error `ERRO[0080] dial tcp 127.0.0.1:8080: socket: too many open files` it means your OS is not configured to have enough concurrent sockets open to perform the level of testing the test is trying to accomplish.
Using [ulimit](https://www.learnitguide.net/2015/07/how-to-increase-ulimit-values-in-linux.html) you can adjust this value.
Run `ulimit -n` and see what you're currently set at. The default is likely `1024`, meaning your system can only open 1024 resources (files or sockets or anything) at the same time. If you run `ulimit -Hn` you can get the _hard limit_ of your system. So you can adjust your limit to be something between where you're at now and your hard limit. `ulimit -n 10000` for example.
As a side note, Owncast automatically increases its own limit when you run the Owncast service to make it less likely that your Owncast server will hit this limit.

7
test/load/new/run.sh

@ -0,0 +1,7 @@ @@ -0,0 +1,7 @@
#!/bin/sh
# Enable more open files
ulimit -n 20000
# Run the test under k6
k6 run test.js

108
test/load/new/test.js

@ -0,0 +1,108 @@ @@ -0,0 +1,108 @@
import http from 'k6/http';
import ws from 'k6/ws';
import { check, sleep } from 'k6';
import { randomIntBetween, randomString } from "https://jslib.k6.io/k6-utils/1.0.0/index.js";
const baseUserAgent = 'Owncast LoadTest/1.0';
function randomNumber() {
return randomIntBetween(1, 10000);
}
function randomSleep() {
sleep(randomIntBetween(10, 60));
}
function pingViewerAPI(params) {
const response = http.get('http://localhost:8080/api/ping', params);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function fetchHLSPlaylist(params) {
const response = http.get('http://localhost:8080/hls/0/stream.m3u8', params);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function fetchChatHistory(accessToken) {
const response = http.get(
`http://localhost:8080/api/chat?accessToken=${accessToken}`
);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function registerUser() {
const response = http.post('http://localhost:8080/api/chat/register');
check(response, { 'status was 200': (r) => r.status == 200 });
const accessToken = response.json('accessToken');
return accessToken;
}
function connectToChat(accessToken, params) {
// Connect to the chat server via web socket.
var wsResponse = ws.connect(
`ws://127.0.0.1:8080/ws?accessToken=${accessToken}`,
params,
function (socket) {
socket.on('open', function (data) {
const testMessage = {
body: `${randomString(randomIntBetween(10, 1000))} ${randomNumber()}`,
type: 'CHAT',
};
randomSleep(); // After a user joins they wait to send a message
socket.send(JSON.stringify(testMessage));
randomSleep(); // The user waits after sending a message to leave.
socket.close();
});
}
);
check(wsResponse, { 'status was 200': (r) => r.status == 200 });
}
export default function () {
// Fake the user-agent so the server side mapping to agent+ip
// sees each user as unique.
var params = {
headers: {
'User-Agent': baseUserAgent + ' ' + randomNumber(),
},
};
const accessToken = registerUser(params);
// Fetch chat history once you have an access token.
fetchChatHistory(accessToken);
// A client hits the ping API every once in a while to
// keep track of the number of viewers. So we can emulate
// that.
pingViewerAPI(params);
// Emulate loading the master HLS playlist
fetchHLSPlaylist(params);
// Register as a new chat user and connect.
connectToChat(accessToken, params);
sleep(8); // Emulate the ping timer on the client.
pingViewerAPI(params);
}
export let options = {
userAgent: baseUserAgent,
scenarios: {
loadstages: {
executor: 'ramping-vus',
startVUs: 0,
gracefulStop: '120s',
stages: [
{ duration: '10s', target: 20 },
{ duration: '120s', target: 1000 },
{ duration: '300s', target: 4000 },
],
gracefulRampDown: '10s',
},
},
};

2
webroot/js/utils/websocket.js

@ -163,7 +163,7 @@ export default class Websocket { @@ -163,7 +163,7 @@ export default class Websocket {
console.error(e, e.data);
return;
}
if (!model.type) {
console.error('No type provided', model);
return;

2
webroot/js/web_modules/@videojs/http-streaming/dist/videojs-http-streaming.min.js vendored

@ -2835,4 +2835,4 @@ var e; @@ -2835,4 +2835,4 @@ var e;
var videojsHttpStreaming_min$1 = /*@__PURE__*/getDefaultExportFromCjs(videojsHttpStreaming_min);
export default videojsHttpStreaming_min$1;
export { videojsHttpStreaming_min$1 as default };

2
webroot/js/web_modules/htm.js vendored

@ -1,3 +1,3 @@ @@ -1,3 +1,3 @@
var n=function(t,s,r,e){var u;s[0]=0;for(var h=1;h<s.length;h++){var p=s[h++],a=s[h]?(s[0]|=p?1:2,r[s[h++]]):s[++h];3===p?e[0]=a:4===p?e[1]=Object.assign(e[1]||{},a):5===p?(e[1]=e[1]||{})[s[++h]]=a:6===p?e[1][s[++h]]+=a+"":p?(u=t.apply(a,n(t,a,r,["",null])),e.push(u),a[0]?s[0]|=2:(s[h-2]=0,s[h]=u)):e.push(a);}return e},t=new Map;function htm_module(s){var r=t.get(this);return r||(r=new Map,t.set(this,r)),(r=n(this,r.get(s)||(r.set(s,r=function(n){for(var t,s,r=1,e="",u="",h=[0],p=function(n){1===r&&(n||(e=e.replace(/^\s*\n\s*|\s*\n\s*$/g,"")))?h.push(0,n,e):3===r&&(n||e)?(h.push(3,n,e),r=2):2===r&&"..."===e&&n?h.push(4,n,0):2===r&&e&&!n?h.push(5,0,!0,e):r>=5&&((e||!n&&5===r)&&(h.push(r,0,e,s),r=6),n&&(h.push(r,n,0,s),r=6)),e="";},a=0;a<n.length;a++){a&&(1===r&&p(),p(a));for(var l=0;l<n[a].length;l++)t=n[a][l],1===r?"<"===t?(p(),h=[h],r=3):e+=t:4===r?"--"===e&&">"===t?(r=1,e=""):e=t+e[0]:u?t===u?u="":e+=t:'"'===t||"'"===t?u=t:">"===t?(p(),r=1):r&&("="===t?(r=5,s=e,e=""):"/"===t&&(r<5||">"===n[a][l+1])?(p(),3===r&&(h=h[0]),r=h,(h=h[0]).push(2,0,r),r=0):" "===t||"\t"===t||"\n"===t||"\r"===t?(p(),r=2):e+=t),3===r&&"!--"===e&&(r=4,h=h[0]);}return p(),h}(s)),r),arguments,[])).length>1?r:r[0]}
export default htm_module;
export { htm_module as default };

4
webroot/js/web_modules/markjs/dist/mark.es6.min.js vendored

File diff suppressed because one or more lines are too long

2
webroot/js/web_modules/micromodal/dist/micromodal.min.js vendored

File diff suppressed because one or more lines are too long

2
webroot/js/web_modules/videojs/core.js vendored

@ -30771,4 +30771,4 @@ videojs.addLanguage('en', { @@ -30771,4 +30771,4 @@ videojs.addLanguage('en', {
var core = videojs;
export default core;
export { core as default };

Loading…
Cancel
Save