Compare commits

...

9 Commits

  1. 1
      controllers/controllers.go
  2. 108
      core/chat/persistence.go
  3. 3
      core/data/defaults.go
  4. 1
      core/data/persistence.go
  5. 41
      core/user/user.go
  6. 38
      test/load/new/README.md
  7. 7
      test/load/new/run.sh
  8. 111
      test/load/new/test.js

1
controllers/controllers.go

@ -16,7 +16,6 @@ func InternalErrorHandler(w http.ResponseWriter, err error) {
return return
} }
w.WriteHeader(http.StatusInternalServerError)
if err := json.NewEncoder(w).Encode(j{"error": err.Error()}); err != nil { if err := json.NewEncoder(w).Encode(j{"error": err.Error()}); err != nil {
InternalErrorHandler(w, err) InternalErrorHandler(w, err)
} }

108
core/chat/persistence.go

@ -1,6 +1,7 @@
package chat package chat
import ( import (
"database/sql"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -9,6 +10,7 @@ import (
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/user" "github.com/owncast/owncast/core/user"
"github.com/owncast/owncast/models" "github.com/owncast/owncast/models"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
@ -19,6 +21,13 @@ const (
maxBacklogNumber = 50 // Return max number of messages in history request maxBacklogNumber = 50 // Return max number of messages in history request
) )
var (
_chatHistoryCache *[]interface{}
_adminChatHistoryCache *[]interface{}
_cachedAdminChatHistoryStatement *sql.Stmt
_cachedSaveEventStatement *sql.Stmt
)
func setupPersistence() { func setupPersistence() {
_datastore = data.GetDatastore() _datastore = data.GetDatastore()
data.CreateMessagesTable(_datastore.DB) data.CreateMessagesTable(_datastore.DB)
@ -45,30 +54,23 @@ func saveFederatedAction(event events.FediverseEngagementEvent) {
// nolint: unparam // nolint: unparam
func saveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) { func saveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) {
defer func() { defer func() {
_historyCache = nil _adminChatHistoryCache = nil
_chatHistoryCache = nil
}() }()
tx, err := _datastore.DB.Begin() _datastore.DbLock.Lock()
if err != nil { defer _datastore.DbLock.Unlock()
log.Errorln("error saving", eventType, err)
return
}
defer tx.Rollback() // nolint
stmt, err := tx.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)") if _cachedSaveEventStatement == nil {
if err != nil { stmt, err := _datastore.DB.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
log.Errorln("error saving", eventType, err) if err != nil {
return log.Errorln("error preparing save event statement", err)
return
}
_cachedSaveEventStatement = stmt
} }
defer stmt.Close() if _, err := _cachedSaveEventStatement.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil {
if _, err = stmt.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil {
log.Errorln("error saving", eventType, err)
return
}
if err = tx.Commit(); err != nil {
log.Errorln("error saving", eventType, err) log.Errorln("error saving", eventType, err)
return return
} }
@ -207,15 +209,19 @@ type rowData struct {
userType *string userType *string
} }
func getChat(query string) []interface{} { func getChat(query *sql.Stmt) []interface{} {
history := make([]interface{}, 0) history := make([]interface{}, 0)
rows, err := _datastore.DB.Query(query) rows, err := query.Query()
if err != nil || rows.Err() != nil { if err != nil || rows.Err() != nil {
log.Errorln("error fetching chat history", err) log.Errorln("error fetching chat history", err)
return history return history
} }
defer rows.Close() defer rows.Close()
rowDatas := []rowData{}
for rows.Next() { for rows.Next() {
row := rowData{} row := rowData{}
@ -246,6 +252,10 @@ func getChat(query string) []interface{} {
break break
} }
rowDatas = append(rowDatas, row)
}
for _, row := range rowDatas {
var message interface{} var message interface{}
switch row.eventType { switch row.eventType {
@ -269,34 +279,50 @@ func getChat(query string) []interface{} {
return history return history
} }
var _historyCache *[]interface{}
// GetChatModerationHistory will return all the chat messages suitable for moderation purposes. // GetChatModerationHistory will return all the chat messages suitable for moderation purposes.
func GetChatModerationHistory() []interface{} { func GetChatModerationHistory() []interface{} {
if _historyCache != nil { if _adminChatHistoryCache != nil {
return *_historyCache return *_adminChatHistoryCache
}
if _cachedAdminChatHistoryStatement == nil {
stmt, err := _datastore.DB.Prepare(`SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, users.type FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC`)
if err != nil {
log.Errorln("error preparing chat moderation history statement", err)
return nil
}
_cachedAdminChatHistoryStatement = stmt
} }
// Get all messages regardless of visibility // Get all messages regardless of visibility
query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC" result := getChat(_cachedAdminChatHistoryStatement)
result := getChat(query)
_historyCache = &result _adminChatHistoryCache = &result
return result return result
} }
var cachedChatHistoryStatement *sql.Stmt
// GetChatHistory will return all the chat messages suitable for returning as user-facing chat history. // GetChatHistory will return all the chat messages suitable for returning as user-facing chat history.
func GetChatHistory() []interface{} { func GetChatHistory() []interface{} {
// Get all visible messages if _chatHistoryCache != nil {
query := fmt.Sprintf("SELECT messages.id,messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.authenticated_at, users.scopes, users.type FROM messages LEFT JOIN users ON messages.user_id = users.id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber) return *_chatHistoryCache
m := getChat(query) }
// Invert order of messages if cachedChatHistoryStatement == nil {
for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 { stmt, err := _datastore.DB.Prepare(fmt.Sprintf("SELECT messages.id, messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.scopes, users.type FROM users JOIN messages ON users.id = messages.user_id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber))
m[i], m[j] = m[j], m[i] if err != nil {
log.Errorln("error preparing chat history statement", err)
return nil
}
cachedChatHistoryStatement = stmt
} }
// Get all visible messages
m := getChat(cachedChatHistoryStatement)
_chatHistoryCache = &m
return m return m
} }
@ -304,13 +330,18 @@ func GetChatHistory() []interface{} {
// and then send out visibility changed events to chat clients. // and then send out visibility changed events to chat clients.
func SetMessageVisibilityForUserID(userID string, visible bool) error { func SetMessageVisibilityForUserID(userID string, visible bool) error {
defer func() { defer func() {
_historyCache = nil _adminChatHistoryCache = nil
_chatHistoryCache = nil
}() }()
// Get a list of IDs to send to the connected clients to hide // Get a list of IDs to send to the connected clients to hide
ids := make([]string, 0) ids := make([]string, 0)
query := fmt.Sprintf("SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID) query := fmt.Sprintf("SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID)
messages := getChat(query) stmt, err := _datastore.DB.Prepare(query)
if err != nil {
return errors.Wrap(err, "error preparing chat history statement when setting message visibility")
}
messages := getChat(stmt)
if len(messages) == 0 { if len(messages) == 0 {
return nil return nil
@ -326,7 +357,8 @@ func SetMessageVisibilityForUserID(userID string, visible bool) error {
func saveMessageVisibility(messageIDs []string, visible bool) error { func saveMessageVisibility(messageIDs []string, visible bool) error {
defer func() { defer func() {
_historyCache = nil _adminChatHistoryCache = nil
_chatHistoryCache = nil
}() }()
_datastore.DbLock.Lock() _datastore.DbLock.Lock()

3
core/data/defaults.go

@ -24,6 +24,8 @@ func hasPopulatedFederationDefaults() bool {
// PopulateDefaults will set default values in the database. // PopulateDefaults will set default values in the database.
func PopulateDefaults() { func PopulateDefaults() {
_datastore.warmCache()
defaults := config.GetDefaults() defaults := config.GetDefaults()
if HasPopulatedDefaults() { if HasPopulatedDefaults() {
@ -48,6 +50,5 @@ func PopulateDefaults() {
}, },
}) })
_datastore.warmCache()
_ = _datastore.SetBool("HAS_POPULATED_DEFAULTS", true) _ = _datastore.SetBool("HAS_POPULATED_DEFAULTS", true)
} }

1
core/data/persistence.go

@ -67,6 +67,7 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) {
Key: resultKey, Key: resultKey,
Value: resultValue, Value: resultValue,
} }
ds.SetCachedValue(resultKey, resultValue)
return result, nil return result, nil
} }

41
core/user/user.go

@ -17,7 +17,10 @@ import (
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
) )
var _datastore *data.Datastore var (
_datastore *data.Datastore
_cachedGetUserByTokenStatement *sql.Stmt
)
const ( const (
moderatorScopeKey = "MODERATOR" moderatorScopeKey = "MODERATOR"
@ -132,31 +135,26 @@ func addAccessTokenForUser(accessToken, userID string) error {
}) })
} }
var createUserStatement *sql.Stmt
func create(user *User) error { func create(user *User) error {
_datastore.DbLock.Lock() _datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock() defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin() if createUserStatement == nil {
if err != nil { stmt, err := _datastore.DB.Prepare("INSERT INTO users(id, display_name, display_color, previous_names, created_at) values(?, ?, ?, ?, ?, ?)")
log.Debugln(err) if err != nil {
} return errors.Wrap(err, "error preparing create user statement")
defer func() { }
_ = tx.Rollback() createUserStatement = stmt
}()
stmt, err := tx.Prepare("INSERT INTO users(id, display_name, display_color, previous_names, created_at) values(?, ?, ?, ?, ?)")
if err != nil {
log.Debugln(err)
} }
defer stmt.Close()
_, err = stmt.Exec(user.ID, user.DisplayName, user.DisplayColor, user.DisplayName, user.CreatedAt) _, err := createUserStatement.Exec(user.ID, user.DisplayName, user.DisplayColor, user.DisplayName, user.CreatedAt)
if err != nil { if err != nil {
log.Errorln("error creating new user", err) return errors.Wrap(err, "error creating user")
return err
} }
return tx.Commit() return nil
} }
// SetEnabled will set the enabled status of a single user by ID. // SetEnabled will set the enabled status of a single user by ID.
@ -198,6 +196,15 @@ func GetUserByToken(token string) *User {
return nil return nil
} }
if _cachedGetUserByTokenStatement == nil {
stmt, err := _datastore.DB.Prepare("SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes FROM users WHERE access_token = ?")
if err != nil {
log.Errorln(err)
return nil
}
_cachedGetUserByTokenStatement = stmt
}
var scopes []string var scopes []string
if u.Scopes.Valid { if u.Scopes.Valid {
scopes = strings.Split(u.Scopes.String, ",") scopes = strings.Split(u.Scopes.String, ",")

38
test/load/new/README.md

@ -0,0 +1,38 @@
# Owncast Load Testing
Load testing is an important tool to surface bugs, race conditions, and to determine the overall server performance of Owncast. The primary goal is to test the server components, not so much the front-end, as the performance of the browser and the client components may be variable. While the test aims to push a bunch of traffic through the backend it's possible the frontend may not be able to handle it all. Working on the performance of the frontend is a goal that should be treated separately from what the backend load tests are designed to do.
## What it will test
The test aims to reproduce the same requests and actions performed by a normal user joining an Owncast session, but at a rate that's faster than most normal environments.
## This includes
1. Downloads the configuration.
1. Registering as a brand new chat user.
1. Fetches the chat history.
1. Connects to the chat websocket and sends messages.
1. Access the viewer count `ping` endpoint to count this virtual user as a viewer.
1. Fetches the current status.
## Setup your environment
1. Install [k6](https://k6.io/open-source) by following [the instructions for your local machine](https://k6.io/docs/getting-started/installation/).
1. Start Owncast on your local machine, listening on `localhost:8080`.
## Run the tests
1. To monitor the concurrent chat users open the admin chat user's page at http://localhost:8080/admin/chat/users/.
1. To monitor the concurrent "viewers" open the admin viewers page at http://localhost:8080/admin/viewer-info/.
1. Begin the test suite by running `k6 run test.js`.
## troubleshooting
If you receive the error `ERRO[0080] dial tcp 127.0.0.1:8080: socket: too many open files` it means your OS is not configured to have enough concurrent sockets open to perform the level of testing the test is trying to accomplish.
Using [ulimit](https://www.learnitguide.net/2015/07/how-to-increase-ulimit-values-in-linux.html) you can adjust this value.
Run `ulimit -n` and see what you're currently set at. The default is likely `1024`, meaning your system can only open 1024 resources (files or sockets or anything) at the same time. If you run `ulimit -Hn` you can get the _hard limit_ of your system. So you can adjust your limit to be something between where you're at now and your hard limit. `ulimit -n 10000` for example.
As a side note, Owncast automatically increases its own limit when you run the Owncast service to make it less likely that your Owncast server will hit this limit.

7
test/load/new/run.sh

@ -0,0 +1,7 @@
#!/bin/sh
# Enable more open files
ulimit -n 20000
# Run the test under k6
k6 run test.js

111
test/load/new/test.js

@ -0,0 +1,111 @@
import http from 'k6/http';
import ws from 'k6/ws';
import { check, sleep } from 'k6';
import {
randomIntBetween,
randomString,
} from 'https://jslib.k6.io/k6-utils/1.0.0/index.js';
const baseUserAgent = 'Owncast LoadTest/1.0';
function randomNumber() {
return randomIntBetween(1, 10000);
}
function pingViewerAPI(params) {
const response = http.get('http://localhost:8080/api/ping', params);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function fetchStatusAPI(params) {
const response = http.get('http://localhost:8080/api/status', params);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function fetchHLSPlaylist(params) {
const response = http.get('http://localhost:8080/hls/0/stream.m3u8', params);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function fetchChatHistory(accessToken) {
const response = http.get(
`http://localhost:8080/api/chat?accessToken=${accessToken}`
);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function registerUser() {
const response = http.post('http://localhost:8080/api/chat/register');
check(response, { 'status was 200': (r) => r.status == 200 });
const accessToken = response.json('accessToken');
return accessToken;
}
function connectToChat(accessToken, params, callback) {
// Connect to the chat server via web socket.
var chatSocket = ws.connect(
`ws://127.0.0.1:8080/ws?accessToken=${accessToken}`,
params,
callback
);
check(chatSocket, { 'status was 200': (r) => r.status == 200 });
return chatSocket;
}
export default function () {
// Fake the user-agent so the server side mapping to agent+ip
// sees each user as unique.
var params = {
headers: {
'User-Agent': baseUserAgent + ' ' + randomNumber(),
},
};
// Register a new chat user.
const accessToken = registerUser(params);
// Fetch chat history once you have an access token.
fetchChatHistory(accessToken);
// Connect to websocket and send messages
const callback = (chatSocket) => {
chatSocket.on('open', function (data) {
const testMessage = {
body: `${randomString(randomIntBetween(10, 1000))} ${randomNumber()}`,
type: 'CHAT',
};
chatSocket.send(JSON.stringify(testMessage));
sleep(4);
chatSocket.send(JSON.stringify(testMessage));
sleep(4);
chatSocket.send(JSON.stringify(testMessage));
sleep(4);
chatSocket.close();
});
};
connectToChat(accessToken, params, callback);
// Emulate a user playing back video and hitting the ping api.
pingViewerAPI(params);
fetchHLSPlaylist(params);
fetchStatusAPI(params);
}
export let options = {
userAgent: baseUserAgent,
scenarios: {
loadstages: {
executor: 'ramping-vus',
gracefulStop: '10s',
stages: [
{ duration: '10s', target: 200 },
{ duration: '25s', target: 3000 },
{ duration: '10s', target: 0 },
],
},
},
};
Loading…
Cancel
Save