Compare commits

...

9 Commits

  1. 1
      controllers/controllers.go
  2. 108
      core/chat/persistence.go
  3. 3
      core/data/defaults.go
  4. 1
      core/data/persistence.go
  5. 41
      core/user/user.go
  6. 38
      test/load/new/README.md
  7. 7
      test/load/new/run.sh
  8. 111
      test/load/new/test.js

1
controllers/controllers.go

@ -16,7 +16,6 @@ func InternalErrorHandler(w http.ResponseWriter, err error) { @@ -16,7 +16,6 @@ func InternalErrorHandler(w http.ResponseWriter, err error) {
return
}
w.WriteHeader(http.StatusInternalServerError)
if err := json.NewEncoder(w).Encode(j{"error": err.Error()}); err != nil {
InternalErrorHandler(w, err)
}

108
core/chat/persistence.go

@ -1,6 +1,7 @@ @@ -1,6 +1,7 @@
package chat
import (
"database/sql"
"fmt"
"strings"
"time"
@ -9,6 +10,7 @@ import ( @@ -9,6 +10,7 @@ import (
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/user"
"github.com/owncast/owncast/models"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
@ -19,6 +21,13 @@ const ( @@ -19,6 +21,13 @@ const (
maxBacklogNumber = 50 // Return max number of messages in history request
)
var (
_chatHistoryCache *[]interface{}
_adminChatHistoryCache *[]interface{}
_cachedAdminChatHistoryStatement *sql.Stmt
_cachedSaveEventStatement *sql.Stmt
)
func setupPersistence() {
_datastore = data.GetDatastore()
data.CreateMessagesTable(_datastore.DB)
@ -45,30 +54,23 @@ func saveFederatedAction(event events.FediverseEngagementEvent) { @@ -45,30 +54,23 @@ func saveFederatedAction(event events.FediverseEngagementEvent) {
// nolint: unparam
func saveEvent(id string, userID *string, body string, eventType string, hidden *time.Time, timestamp time.Time, image *string, link *string, title *string, subtitle *string) {
defer func() {
_historyCache = nil
_adminChatHistoryCache = nil
_chatHistoryCache = nil
}()
tx, err := _datastore.DB.Begin()
if err != nil {
log.Errorln("error saving", eventType, err)
return
}
defer tx.Rollback() // nolint
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
stmt, err := tx.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
log.Errorln("error saving", eventType, err)
return
if _cachedSaveEventStatement == nil {
stmt, err := _datastore.DB.Prepare("INSERT INTO messages(id, user_id, body, eventType, hidden_at, timestamp, image, link, title, subtitle) values(?, ?, ?, ?, ?, ?, ?, ?, ?, ?)")
if err != nil {
log.Errorln("error preparing save event statement", err)
return
}
_cachedSaveEventStatement = stmt
}
defer stmt.Close()
if _, err = stmt.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil {
log.Errorln("error saving", eventType, err)
return
}
if err = tx.Commit(); err != nil {
if _, err := _cachedSaveEventStatement.Exec(id, userID, body, eventType, hidden, timestamp, image, link, title, subtitle); err != nil {
log.Errorln("error saving", eventType, err)
return
}
@ -207,15 +209,19 @@ type rowData struct { @@ -207,15 +209,19 @@ type rowData struct {
userType *string
}
func getChat(query string) []interface{} {
func getChat(query *sql.Stmt) []interface{} {
history := make([]interface{}, 0)
rows, err := _datastore.DB.Query(query)
rows, err := query.Query()
if err != nil || rows.Err() != nil {
log.Errorln("error fetching chat history", err)
return history
}
defer rows.Close()
rowDatas := []rowData{}
for rows.Next() {
row := rowData{}
@ -246,6 +252,10 @@ func getChat(query string) []interface{} { @@ -246,6 +252,10 @@ func getChat(query string) []interface{} {
break
}
rowDatas = append(rowDatas, row)
}
for _, row := range rowDatas {
var message interface{}
switch row.eventType {
@ -269,34 +279,50 @@ func getChat(query string) []interface{} { @@ -269,34 +279,50 @@ func getChat(query string) []interface{} {
return history
}
var _historyCache *[]interface{}
// GetChatModerationHistory will return all the chat messages suitable for moderation purposes.
func GetChatModerationHistory() []interface{} {
if _historyCache != nil {
return *_historyCache
if _adminChatHistoryCache != nil {
return *_adminChatHistoryCache
}
if _cachedAdminChatHistoryStatement == nil {
stmt, err := _datastore.DB.Prepare(`SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, users.type FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC`)
if err != nil {
log.Errorln("error preparing chat moderation history statement", err)
return nil
}
_cachedAdminChatHistoryStatement = stmt
}
// Get all messages regardless of visibility
query := "SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC"
result := getChat(query)
result := getChat(_cachedAdminChatHistoryStatement)
_historyCache = &result
_adminChatHistoryCache = &result
return result
}
var cachedChatHistoryStatement *sql.Stmt
// GetChatHistory will return all the chat messages suitable for returning as user-facing chat history.
func GetChatHistory() []interface{} {
// Get all visible messages
query := fmt.Sprintf("SELECT messages.id,messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.authenticated_at, users.scopes, users.type FROM messages LEFT JOIN users ON messages.user_id = users.id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber)
m := getChat(query)
if _chatHistoryCache != nil {
return *_chatHistoryCache
}
// Invert order of messages
for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 {
m[i], m[j] = m[j], m[i]
if cachedChatHistoryStatement == nil {
stmt, err := _datastore.DB.Prepare(fmt.Sprintf("SELECT messages.id, messages.user_id, messages.body, messages.title, messages.subtitle, messages.image, messages.link, messages.eventType, messages.hidden_at, messages.timestamp, users.display_name, users.display_color, users.created_at, users.disabled_at, users.previous_names, users.namechanged_at, users.scopes, users.type FROM users JOIN messages ON users.id = messages.user_id WHERE hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber))
if err != nil {
log.Errorln("error preparing chat history statement", err)
return nil
}
cachedChatHistoryStatement = stmt
}
// Get all visible messages
m := getChat(cachedChatHistoryStatement)
_chatHistoryCache = &m
return m
}
@ -304,13 +330,18 @@ func GetChatHistory() []interface{} { @@ -304,13 +330,18 @@ func GetChatHistory() []interface{} {
// and then send out visibility changed events to chat clients.
func SetMessageVisibilityForUserID(userID string, visible bool) error {
defer func() {
_historyCache = nil
_adminChatHistoryCache = nil
_chatHistoryCache = nil
}()
// Get a list of IDs to send to the connected clients to hide
ids := make([]string, 0)
query := fmt.Sprintf("SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, authenticated, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID)
messages := getChat(query)
query := fmt.Sprintf("SELECT messages.id, user_id, body, title, subtitle, image, link, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes, type FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID)
stmt, err := _datastore.DB.Prepare(query)
if err != nil {
return errors.Wrap(err, "error preparing chat history statement when setting message visibility")
}
messages := getChat(stmt)
if len(messages) == 0 {
return nil
@ -326,7 +357,8 @@ func SetMessageVisibilityForUserID(userID string, visible bool) error { @@ -326,7 +357,8 @@ func SetMessageVisibilityForUserID(userID string, visible bool) error {
func saveMessageVisibility(messageIDs []string, visible bool) error {
defer func() {
_historyCache = nil
_adminChatHistoryCache = nil
_chatHistoryCache = nil
}()
_datastore.DbLock.Lock()

3
core/data/defaults.go

@ -24,6 +24,8 @@ func hasPopulatedFederationDefaults() bool { @@ -24,6 +24,8 @@ func hasPopulatedFederationDefaults() bool {
// PopulateDefaults will set default values in the database.
func PopulateDefaults() {
_datastore.warmCache()
defaults := config.GetDefaults()
if HasPopulatedDefaults() {
@ -48,6 +50,5 @@ func PopulateDefaults() { @@ -48,6 +50,5 @@ func PopulateDefaults() {
},
})
_datastore.warmCache()
_ = _datastore.SetBool("HAS_POPULATED_DEFAULTS", true)
}

1
core/data/persistence.go

@ -67,6 +67,7 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) { @@ -67,6 +67,7 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) {
Key: resultKey,
Value: resultValue,
}
ds.SetCachedValue(resultKey, resultValue)
return result, nil
}

41
core/user/user.go

@ -17,7 +17,10 @@ import ( @@ -17,7 +17,10 @@ import (
log "github.com/sirupsen/logrus"
)
var _datastore *data.Datastore
var (
_datastore *data.Datastore
_cachedGetUserByTokenStatement *sql.Stmt
)
const (
moderatorScopeKey = "MODERATOR"
@ -132,31 +135,26 @@ func addAccessTokenForUser(accessToken, userID string) error { @@ -132,31 +135,26 @@ func addAccessTokenForUser(accessToken, userID string) error {
})
}
var createUserStatement *sql.Stmt
func create(user *User) error {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin()
if err != nil {
log.Debugln(err)
}
defer func() {
_ = tx.Rollback()
}()
stmt, err := tx.Prepare("INSERT INTO users(id, display_name, display_color, previous_names, created_at) values(?, ?, ?, ?, ?)")
if err != nil {
log.Debugln(err)
if createUserStatement == nil {
stmt, err := _datastore.DB.Prepare("INSERT INTO users(id, display_name, display_color, previous_names, created_at) values(?, ?, ?, ?, ?, ?)")
if err != nil {
return errors.Wrap(err, "error preparing create user statement")
}
createUserStatement = stmt
}
defer stmt.Close()
_, err = stmt.Exec(user.ID, user.DisplayName, user.DisplayColor, user.DisplayName, user.CreatedAt)
_, err := createUserStatement.Exec(user.ID, user.DisplayName, user.DisplayColor, user.DisplayName, user.CreatedAt)
if err != nil {
log.Errorln("error creating new user", err)
return err
return errors.Wrap(err, "error creating user")
}
return tx.Commit()
return nil
}
// SetEnabled will set the enabled status of a single user by ID.
@ -198,6 +196,15 @@ func GetUserByToken(token string) *User { @@ -198,6 +196,15 @@ func GetUserByToken(token string) *User {
return nil
}
if _cachedGetUserByTokenStatement == nil {
stmt, err := _datastore.DB.Prepare("SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at, scopes FROM users WHERE access_token = ?")
if err != nil {
log.Errorln(err)
return nil
}
_cachedGetUserByTokenStatement = stmt
}
var scopes []string
if u.Scopes.Valid {
scopes = strings.Split(u.Scopes.String, ",")

38
test/load/new/README.md

@ -0,0 +1,38 @@ @@ -0,0 +1,38 @@
# Owncast Load Testing
Load testing is an important tool to surface bugs, race conditions, and to determine the overall server performance of Owncast. The primary goal is to test the server components, not so much the front-end, as the performance of the browser and the client components may be variable. While the test aims to push a bunch of traffic through the backend it's possible the frontend may not be able to handle it all. Working on the performance of the frontend is a goal that should be treated separately from what the backend load tests are designed to do.
## What it will test
The test aims to reproduce the same requests and actions performed by a normal user joining an Owncast session, but at a rate that's faster than most normal environments.
## This includes
1. Downloads the configuration.
1. Registering as a brand new chat user.
1. Fetches the chat history.
1. Connects to the chat websocket and sends messages.
1. Access the viewer count `ping` endpoint to count this virtual user as a viewer.
1. Fetches the current status.
## Setup your environment
1. Install [k6](https://k6.io/open-source) by following [the instructions for your local machine](https://k6.io/docs/getting-started/installation/).
1. Start Owncast on your local machine, listening on `localhost:8080`.
## Run the tests
1. To monitor the concurrent chat users open the admin chat user's page at http://localhost:8080/admin/chat/users/.
1. To monitor the concurrent "viewers" open the admin viewers page at http://localhost:8080/admin/viewer-info/.
1. Begin the test suite by running `k6 run test.js`.
## troubleshooting
If you receive the error `ERRO[0080] dial tcp 127.0.0.1:8080: socket: too many open files` it means your OS is not configured to have enough concurrent sockets open to perform the level of testing the test is trying to accomplish.
Using [ulimit](https://www.learnitguide.net/2015/07/how-to-increase-ulimit-values-in-linux.html) you can adjust this value.
Run `ulimit -n` and see what you're currently set at. The default is likely `1024`, meaning your system can only open 1024 resources (files or sockets or anything) at the same time. If you run `ulimit -Hn` you can get the _hard limit_ of your system. So you can adjust your limit to be something between where you're at now and your hard limit. `ulimit -n 10000` for example.
As a side note, Owncast automatically increases its own limit when you run the Owncast service to make it less likely that your Owncast server will hit this limit.

7
test/load/new/run.sh

@ -0,0 +1,7 @@ @@ -0,0 +1,7 @@
#!/bin/sh
# Enable more open files
ulimit -n 20000
# Run the test under k6
k6 run test.js

111
test/load/new/test.js

@ -0,0 +1,111 @@ @@ -0,0 +1,111 @@
import http from 'k6/http';
import ws from 'k6/ws';
import { check, sleep } from 'k6';
import {
randomIntBetween,
randomString,
} from 'https://jslib.k6.io/k6-utils/1.0.0/index.js';
const baseUserAgent = 'Owncast LoadTest/1.0';
function randomNumber() {
return randomIntBetween(1, 10000);
}
function pingViewerAPI(params) {
const response = http.get('http://localhost:8080/api/ping', params);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function fetchStatusAPI(params) {
const response = http.get('http://localhost:8080/api/status', params);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function fetchHLSPlaylist(params) {
const response = http.get('http://localhost:8080/hls/0/stream.m3u8', params);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function fetchChatHistory(accessToken) {
const response = http.get(
`http://localhost:8080/api/chat?accessToken=${accessToken}`
);
check(response, { 'status was 200': (r) => r.status == 200 });
}
function registerUser() {
const response = http.post('http://localhost:8080/api/chat/register');
check(response, { 'status was 200': (r) => r.status == 200 });
const accessToken = response.json('accessToken');
return accessToken;
}
function connectToChat(accessToken, params, callback) {
// Connect to the chat server via web socket.
var chatSocket = ws.connect(
`ws://127.0.0.1:8080/ws?accessToken=${accessToken}`,
params,
callback
);
check(chatSocket, { 'status was 200': (r) => r.status == 200 });
return chatSocket;
}
export default function () {
// Fake the user-agent so the server side mapping to agent+ip
// sees each user as unique.
var params = {
headers: {
'User-Agent': baseUserAgent + ' ' + randomNumber(),
},
};
// Register a new chat user.
const accessToken = registerUser(params);
// Fetch chat history once you have an access token.
fetchChatHistory(accessToken);
// Connect to websocket and send messages
const callback = (chatSocket) => {
chatSocket.on('open', function (data) {
const testMessage = {
body: `${randomString(randomIntBetween(10, 1000))} ${randomNumber()}`,
type: 'CHAT',
};
chatSocket.send(JSON.stringify(testMessage));
sleep(4);
chatSocket.send(JSON.stringify(testMessage));
sleep(4);
chatSocket.send(JSON.stringify(testMessage));
sleep(4);
chatSocket.close();
});
};
connectToChat(accessToken, params, callback);
// Emulate a user playing back video and hitting the ping api.
pingViewerAPI(params);
fetchHLSPlaylist(params);
fetchStatusAPI(params);
}
export let options = {
userAgent: baseUserAgent,
scenarios: {
loadstages: {
executor: 'ramping-vus',
gracefulStop: '10s',
stages: [
{ duration: '10s', target: 200 },
{ duration: '25s', target: 3000 },
{ duration: '10s', target: 0 },
],
},
},
};
Loading…
Cancel
Save