Browse Source

Misc performance improvements.

- Pre-built prepared query statements
- Remove DB mutex as it is not needed.
- Add mutex around chat client send chan.
- Replace maps in chat with sync.Maps and remove locks around them.
gek/new-load-tests
Gabe Kangas 4 years ago
parent
commit
17b4ad7a08
  1. 4
      controllers/hls.go
  2. 12
      core/chat/chatclient.go
  3. 46
      core/chat/persistence.go
  4. 15
      core/data/cache.go
  5. 2
      core/data/data.go
  6. 5
      core/data/messages.go
  7. 11
      core/data/persistence.go
  8. 6
      core/user/externalAPIUser.go
  9. 42
      core/user/user.go
  10. 2
      go.mod
  11. 4
      go.sum

4
controllers/hls.go

@ -8,7 +8,6 @@ import (
"strings" "strings"
"github.com/owncast/owncast/config" "github.com/owncast/owncast/config"
"github.com/owncast/owncast/core"
"github.com/owncast/owncast/core/data" "github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/router/middleware" "github.com/owncast/owncast/router/middleware"
"github.com/owncast/owncast/utils" "github.com/owncast/owncast/utils"
@ -39,8 +38,7 @@ func HandleHLSRequest(w http.ResponseWriter, r *http.Request) {
middleware.DisableCache(w) middleware.DisableCache(w)
// Use this as an opportunity to mark this viewer as active. // Use this as an opportunity to mark this viewer as active.
id := utils.GenerateClientIDFromRequest(r) Ping(w, r)
core.SetViewerIDActive(id)
} else { } else {
cacheTime := utils.GetCacheDurationSecondsForPath(relativePath) cacheTime := utils.GetCacheDurationSecondsForPath(relativePath)
w.Header().Set("Cache-Control", "public, max-age="+strconv.Itoa(cacheTime)) w.Header().Set("Cache-Control", "public, max-age="+strconv.Itoa(cacheTime))

12
core/chat/chatclient.go

@ -3,6 +3,7 @@ package chat
import ( import (
"bytes" "bytes"
"encoding/json" "encoding/json"
"sync"
"time" "time"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
@ -32,6 +33,7 @@ type Client struct {
MessageCount int `json:"messageCount"` MessageCount int `json:"messageCount"`
UserAgent string `json:"userAgent"` UserAgent string `json:"userAgent"`
ConnectedAt time.Time `json:"connectedAt"` ConnectedAt time.Time `json:"connectedAt"`
mu sync.Mutex
} }
type chatClientEvent struct { type chatClientEvent struct {
@ -173,10 +175,18 @@ func (c *Client) handleEvent(data []byte) {
} }
func (c *Client) close() { func (c *Client) close() {
defer func() {
if a := recover(); a != nil {
log.Println("RECOVER", a)
}
}()
log.Traceln("client closed:", c.User.DisplayName, c.id, c.ipAddress) log.Traceln("client closed:", c.User.DisplayName, c.id, c.ipAddress)
_ = c.conn.Close() _ = c.conn.Close()
c.server.unregister <- c.id c.server.unregister <- c.id
c.mu.Lock()
defer c.mu.Unlock()
if c.send != nil { if c.send != nil {
close(c.send) close(c.send)
c.send = nil c.send = nil
@ -212,6 +222,8 @@ func (c *Client) sendPayload(payload events.EventPayload) {
return return
} }
c.mu.Lock()
defer c.mu.Unlock()
if c.send != nil { if c.send != nil {
c.send <- data c.send <- data
} }

46
core/chat/persistence.go

@ -1,6 +1,7 @@
package chat package chat
import ( import (
"database/sql"
"fmt" "fmt"
"strings" "strings"
"time" "time"
@ -42,9 +43,6 @@ func saveEvent(id string, userID string, body string, eventType string, hidden *
_historyCache = nil _historyCache = nil
}() }()
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin() tx, err := _datastore.DB.Begin()
if err != nil { if err != nil {
log.Errorln("error saving", eventType, err) log.Errorln("error saving", eventType, err)
@ -71,9 +69,9 @@ func saveEvent(id string, userID string, body string, eventType string, hidden *
} }
} }
func getChat(query string) []events.UserMessageEvent { func getChat(query *sql.Stmt) []events.UserMessageEvent {
history := make([]events.UserMessageEvent, 0) history := make([]events.UserMessageEvent, 0)
rows, err := _datastore.DB.Query(query) rows, err := query.Query()
if err != nil { if err != nil {
log.Errorln("error fetching chat history", err) log.Errorln("error fetching chat history", err)
return history return history
@ -154,6 +152,8 @@ func getChat(query string) []events.UserMessageEvent {
var _historyCache *[]events.UserMessageEvent var _historyCache *[]events.UserMessageEvent
var _chatModerationHistoryQuery *sql.Stmt
// GetChatModerationHistory will return all the chat messages suitable for moderation purposes. // GetChatModerationHistory will return all the chat messages suitable for moderation purposes.
func GetChatModerationHistory() []events.UserMessageEvent { func GetChatModerationHistory() []events.UserMessageEvent {
if _historyCache != nil { if _historyCache != nil {
@ -161,19 +161,38 @@ func GetChatModerationHistory() []events.UserMessageEvent {
} }
// Get all messages regardless of visibility // Get all messages regardless of visibility
if _chatModerationHistoryQuery == nil {
var query = "SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC" var query = "SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC"
result := getChat(query) q, err := _datastore.DB.Prepare(query)
if err != nil {
log.Errorln(err)
return []events.UserMessageEvent{}
}
_chatModerationHistoryQuery = q
}
result := getChat(_chatModerationHistoryQuery)
_historyCache = &result _historyCache = &result
return result return result
} }
var _getChatHistoryQuery *sql.Stmt
// GetChatHistory will return all the chat messages suitable for returning as user-facing chat history. // GetChatHistory will return all the chat messages suitable for returning as user-facing chat history.
func GetChatHistory() []events.UserMessageEvent { func GetChatHistory() []events.UserMessageEvent {
// Get all visible messages if _getChatHistoryQuery == nil {
var query = fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages, users WHERE messages.user_id = users.id AND hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber) var query = fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages, users WHERE messages.user_id = users.id AND hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber)
m := getChat(query) q, err := _datastore.DB.Prepare(query)
if err != nil {
log.Errorln(err)
return []events.UserMessageEvent{}
}
_getChatHistoryQuery = q
}
// Get all visible messages
m := getChat(_getChatHistoryQuery)
// Invert order of messages // Invert order of messages
for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 { for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 {
@ -192,7 +211,10 @@ func SetMessageVisibilityForUserID(userID string, visible bool) error {
// Get a list of IDs from this user within the 5hr window to send to the connected clients to hide // Get a list of IDs from this user within the 5hr window to send to the connected clients to hide
ids := make([]string, 0) ids := make([]string, 0)
query := fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID) query, err := _datastore.DB.Prepare(fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID))
if err != nil {
return err
}
messages := getChat(query) messages := getChat(query)
if len(messages) == 0 { if len(messages) == 0 {
@ -212,9 +234,6 @@ func saveMessageVisibility(messageIDs []string, visible bool) error {
_historyCache = nil _historyCache = nil
}() }()
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin() tx, err := _datastore.DB.Begin()
if err != nil { if err != nil {
return err return err
@ -290,9 +309,6 @@ func getMessageByID(messageID string) (*events.UserMessageEvent, error) {
// Only keep recent messages so we don't keep more chat data than needed // Only keep recent messages so we don't keep more chat data than needed
// for privacy and efficiency reasons. // for privacy and efficiency reasons.
func runPruner() { func runPruner() {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
log.Traceln("Removing chat messages older than", maxBacklogHours, "hours") log.Traceln("Removing chat messages older than", maxBacklogHours, "hours")
deleteStatement := `DELETE FROM messages WHERE timestamp <= datetime('now', 'localtime', ?)` deleteStatement := `DELETE FROM messages WHERE timestamp <= datetime('now', 'localtime', ?)`

15
core/data/cache.go

@ -2,19 +2,13 @@ package data
import ( import (
"errors" "errors"
"sync"
) )
var _cacheLock = sync.Mutex{}
// GetCachedValue will return a value for key from the cache. // GetCachedValue will return a value for key from the cache.
func (ds *Datastore) GetCachedValue(key string) ([]byte, error) { func (ds *Datastore) GetCachedValue(key string) ([]byte, error) {
_cacheLock.Lock()
defer _cacheLock.Unlock()
// Check for a cached value // Check for a cached value
if val, ok := ds.cache[key]; ok { if val, ok := ds.cache.Load(key); ok {
return val, nil return val.([]byte), nil
} }
return nil, errors.New(key + " not found in cache") return nil, errors.New(key + " not found in cache")
@ -22,8 +16,5 @@ func (ds *Datastore) GetCachedValue(key string) ([]byte, error) {
// SetCachedValue will set a value for key in the cache. // SetCachedValue will set a value for key in the cache.
func (ds *Datastore) SetCachedValue(key string, b []byte) { func (ds *Datastore) SetCachedValue(key string, b []byte) {
_cacheLock.Lock() ds.cache.Store(key, b)
defer _cacheLock.Unlock()
ds.cache[key] = b
} }

2
core/data/data.go

@ -51,7 +51,7 @@ func SetupPersistence(file string) error {
// Some SQLite optimizations // Some SQLite optimizations
_, _ = db.Exec("pragma journal_mode = WAL") _, _ = db.Exec("pragma journal_mode = WAL")
_, _ = db.Exec("pragma synchronous = normal") _, _ = db.Exec("pragma synchronous = NORMAL")
_, _ = db.Exec("pragma temp_store = memory") _, _ = db.Exec("pragma temp_store = memory")
_, _ = db.Exec("pragma wal_checkpoint(full)") _, _ = db.Exec("pragma wal_checkpoint(full)")

5
core/data/messages.go

@ -16,11 +16,12 @@ func CreateMessagesTable(db *sql.DB) {
"hidden_at" DATETIME, "hidden_at" DATETIME,
"timestamp" DATETIME, "timestamp" DATETIME,
PRIMARY KEY (id) PRIMARY KEY (id)
);CREATE INDEX index ON messages (id, user_id, hidden_at, timestamp); );CREATE UNIQUE INDEX index ON messages (id, user_id, hidden_at, timestamp);
CREATE INDEX id ON messages (id); CREATE INDEX id ON messages (id);
CREATE INDEX user_id ON messages (user_id); CREATE INDEX user_id ON messages (user_id);
CREATE INDEX hidden_at ON messages (hidden_at); CREATE INDEX hidden_at ON messages (hidden_at);
CREATE INDEX timestamp ON messages (timestamp);` CREATE INDEX timestamp ON messages (timestamp);
CREATE UNIQUE INDEX userid_hidden ON messages (user_id, hidden_at);`
stmt, err := db.Prepare(createTableSQL) stmt, err := db.Prepare(createTableSQL)
if err != nil { if err != nil {

11
core/data/persistence.go

@ -14,8 +14,8 @@ import (
// Datastore is the global key/value store for configuration values. // Datastore is the global key/value store for configuration values.
type Datastore struct { type Datastore struct {
DB *sql.DB DB *sql.DB
cache map[string][]byte //cache map[string][]byte
DbLock *sync.Mutex cache sync.Map
} }
func (ds *Datastore) warmCache() { func (ds *Datastore) warmCache() {
@ -33,7 +33,7 @@ func (ds *Datastore) warmCache() {
if err := res.Scan(&rowKey, &rowValue); err != nil { if err := res.Scan(&rowKey, &rowValue); err != nil {
log.Errorln("error pre-caching config row", err) log.Errorln("error pre-caching config row", err)
} }
ds.cache[rowKey] = rowValue ds.cache.Store(rowKey, rowValue)
} }
} }
@ -65,9 +65,6 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) {
// Save will save the ConfigEntry to the database. // Save will save the ConfigEntry to the database.
func (ds *Datastore) Save(e ConfigEntry) error { func (ds *Datastore) Save(e ConfigEntry) error {
ds.DbLock.Lock()
defer ds.DbLock.Unlock()
var dataGob bytes.Buffer var dataGob bytes.Buffer
enc := gob.NewEncoder(&dataGob) enc := gob.NewEncoder(&dataGob)
if err := enc.Encode(e.Value); err != nil { if err := enc.Encode(e.Value); err != nil {
@ -101,9 +98,7 @@ func (ds *Datastore) Save(e ConfigEntry) error {
// Setup will create the datastore table and perform initial initialization. // Setup will create the datastore table and perform initial initialization.
func (ds *Datastore) Setup() { func (ds *Datastore) Setup() {
ds.cache = make(map[string][]byte)
ds.DB = GetDatabase() ds.DB = GetDatabase()
ds.DbLock = &sync.Mutex{}
createTableSQL := `CREATE TABLE IF NOT EXISTS datastore ( createTableSQL := `CREATE TABLE IF NOT EXISTS datastore (
"key" string NOT NULL PRIMARY KEY, "key" string NOT NULL PRIMARY KEY,

6
core/user/externalAPIUser.go

@ -44,9 +44,6 @@ var validAccessTokenScopes = []string{
func InsertExternalAPIUser(token string, name string, color int, scopes []string) error { func InsertExternalAPIUser(token string, name string, color int, scopes []string) error {
log.Traceln("Adding new API user:", name) log.Traceln("Adding new API user:", name)
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
scopesString := strings.Join(scopes, ",") scopesString := strings.Join(scopes, ",")
id := shortid.MustGenerate() id := shortid.MustGenerate()
@ -76,9 +73,6 @@ func InsertExternalAPIUser(token string, name string, color int, scopes []string
func DeleteExternalAPIUser(token string) error { func DeleteExternalAPIUser(token string) error {
log.Traceln("Deleting access token:", token) log.Traceln("Deleting access token:", token)
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin() tx, err := _datastore.DB.Begin()
if err != nil { if err != nil {
return err return err

42
core/user/user.go

@ -71,9 +71,6 @@ func CreateAnonymousUser(username string) (*User, error) {
// ChangeUsername will change the user associated to userID from one display name to another. // ChangeUsername will change the user associated to userID from one display name to another.
func ChangeUsername(userID string, username string) { func ChangeUsername(userID string, username string) {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin() tx, err := _datastore.DB.Begin()
if err != nil { if err != nil {
@ -103,9 +100,6 @@ func ChangeUsername(userID string, username string) {
} }
func create(user *User) error { func create(user *User) error {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin() tx, err := _datastore.DB.Begin()
if err != nil { if err != nil {
log.Debugln(err) log.Debugln(err)
@ -131,9 +125,6 @@ func create(user *User) error {
// SetEnabled will will set the enabled flag on a single user assigned to userID. // SetEnabled will will set the enabled flag on a single user assigned to userID.
func SetEnabled(userID string, enabled bool) error { func SetEnabled(userID string, enabled bool) error {
_datastore.DbLock.Lock()
defer _datastore.DbLock.Unlock()
tx, err := _datastore.DB.Begin() tx, err := _datastore.DB.Begin()
if err != nil { if err != nil {
return err return err
@ -161,24 +152,39 @@ func SetEnabled(userID string, enabled bool) error {
return tx.Commit() return tx.Commit()
} }
var _getUserByTokenQuery *sql.Stmt
// GetUserByToken will return a user by an access token. // GetUserByToken will return a user by an access token.
func GetUserByToken(token string) *User { func GetUserByToken(token string) *User {
_datastore.DbLock.Lock() if _getUserByTokenQuery == nil {
defer _datastore.DbLock.Unlock()
query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE access_token = ?" query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE access_token = ?"
row := _datastore.DB.QueryRow(query, token) q, err := _datastore.DB.Prepare(query)
if err != nil {
return nil
}
_getUserByTokenQuery = q
}
return getUserFromRow(row) row := _getUserByTokenQuery.QueryRow(token)
u := getUserFromRow(row)
return u
} }
var _getUserByIdQuery *sql.Stmt
// GetUserByID will return a user by a user ID. // GetUserByID will return a user by a user ID.
func GetUserByID(id string) *User { func GetUserByID(id string) *User {
_datastore.DbLock.Lock() if _getUserByIdQuery == nil {
defer _datastore.DbLock.Unlock()
query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE id = ?" query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE id = ?"
row := _datastore.DB.QueryRow(query, id) q, err := _datastore.DB.Prepare(query)
if err != nil {
return nil
}
_getUserByIdQuery = q
}
row := _getUserByIdQuery.QueryRow(id)
if row == nil { if row == nil {
log.Errorln(row) log.Errorln(row)
return nil return nil

2
go.mod

@ -38,6 +38,6 @@ require (
github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/go-sysconf v0.3.5 // indirect
github.com/tklauser/numcpus v0.2.2 // indirect github.com/tklauser/numcpus v0.2.2 // indirect
golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect golang.org/x/sys v0.0.0-20210902050250-f475640dd07b // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect
) )

4
go.sum

@ -90,8 +90,8 @@ golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 h1:hZR0X1kPW+nwyJ9xRxqZk1vx5RUObAPBdKVvXPDUH/E= golang.org/x/sys v0.0.0-20210902050250-f475640dd07b h1:S7hKs0Flbq0bbc9xgYt4stIEG1zNDFqyrPwAX2Wj/sE=
golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210902050250-f475640dd07b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=

Loading…
Cancel
Save