diff --git a/controllers/hls.go b/controllers/hls.go index 1cdb4e837..62251a9d9 100644 --- a/controllers/hls.go +++ b/controllers/hls.go @@ -8,7 +8,6 @@ import ( "strings" "github.com/owncast/owncast/config" - "github.com/owncast/owncast/core" "github.com/owncast/owncast/core/data" "github.com/owncast/owncast/router/middleware" "github.com/owncast/owncast/utils" @@ -39,8 +38,7 @@ func HandleHLSRequest(w http.ResponseWriter, r *http.Request) { middleware.DisableCache(w) // Use this as an opportunity to mark this viewer as active. - id := utils.GenerateClientIDFromRequest(r) - core.SetViewerIDActive(id) + Ping(w, r) } else { cacheTime := utils.GetCacheDurationSecondsForPath(relativePath) w.Header().Set("Cache-Control", "public, max-age="+strconv.Itoa(cacheTime)) diff --git a/core/chat/chatclient.go b/core/chat/chatclient.go index c0635989d..7cfb7349d 100644 --- a/core/chat/chatclient.go +++ b/core/chat/chatclient.go @@ -3,6 +3,7 @@ package chat import ( "bytes" "encoding/json" + "sync" "time" log "github.com/sirupsen/logrus" @@ -32,6 +33,7 @@ type Client struct { MessageCount int `json:"messageCount"` UserAgent string `json:"userAgent"` ConnectedAt time.Time `json:"connectedAt"` + mu sync.Mutex } type chatClientEvent struct { @@ -173,10 +175,18 @@ func (c *Client) handleEvent(data []byte) { } func (c *Client) close() { + defer func() { + if a := recover(); a != nil { + log.Println("RECOVER", a) + } + }() + log.Traceln("client closed:", c.User.DisplayName, c.id, c.ipAddress) _ = c.conn.Close() c.server.unregister <- c.id + c.mu.Lock() + defer c.mu.Unlock() if c.send != nil { close(c.send) c.send = nil @@ -212,6 +222,8 @@ func (c *Client) sendPayload(payload events.EventPayload) { return } + c.mu.Lock() + defer c.mu.Unlock() if c.send != nil { c.send <- data } diff --git a/core/chat/persistence.go b/core/chat/persistence.go index 082ffc4f8..563b8db0a 100644 --- a/core/chat/persistence.go +++ b/core/chat/persistence.go @@ -1,6 +1,7 @@ package chat import ( + "database/sql" "fmt" "strings" "time" @@ -42,9 +43,6 @@ func saveEvent(id string, userID string, body string, eventType string, hidden * _historyCache = nil }() - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - tx, err := _datastore.DB.Begin() if err != nil { log.Errorln("error saving", eventType, err) @@ -71,9 +69,9 @@ func saveEvent(id string, userID string, body string, eventType string, hidden * } } -func getChat(query string) []events.UserMessageEvent { +func getChat(query *sql.Stmt) []events.UserMessageEvent { history := make([]events.UserMessageEvent, 0) - rows, err := _datastore.DB.Query(query) + rows, err := query.Query() if err != nil { log.Errorln("error fetching chat history", err) return history @@ -154,6 +152,8 @@ func getChat(query string) []events.UserMessageEvent { var _historyCache *[]events.UserMessageEvent +var _chatModerationHistoryQuery *sql.Stmt + // GetChatModerationHistory will return all the chat messages suitable for moderation purposes. func GetChatModerationHistory() []events.UserMessageEvent { if _historyCache != nil { @@ -161,19 +161,38 @@ func GetChatModerationHistory() []events.UserMessageEvent { } // Get all messages regardless of visibility - var query = "SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC" - result := getChat(query) + if _chatModerationHistoryQuery == nil { + var query = "SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id ORDER BY timestamp DESC" + q, err := _datastore.DB.Prepare(query) + if err != nil { + log.Errorln(err) + return []events.UserMessageEvent{} + } + _chatModerationHistoryQuery = q + } + result := getChat(_chatModerationHistoryQuery) _historyCache = &result return result } +var _getChatHistoryQuery *sql.Stmt + // GetChatHistory will return all the chat messages suitable for returning as user-facing chat history. func GetChatHistory() []events.UserMessageEvent { + if _getChatHistoryQuery == nil { + var query = fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages, users WHERE messages.user_id = users.id AND hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber) + q, err := _datastore.DB.Prepare(query) + if err != nil { + log.Errorln(err) + return []events.UserMessageEvent{} + } + _getChatHistoryQuery = q + } + // Get all visible messages - var query = fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages, users WHERE messages.user_id = users.id AND hidden_at IS NULL AND disabled_at IS NULL ORDER BY timestamp DESC LIMIT %d", maxBacklogNumber) - m := getChat(query) + m := getChat(_getChatHistoryQuery) // Invert order of messages for i, j := 0, len(m)-1; i < j; i, j = i+1, j-1 { @@ -192,7 +211,10 @@ func SetMessageVisibilityForUserID(userID string, visible bool) error { // Get a list of IDs from this user within the 5hr window to send to the connected clients to hide ids := make([]string, 0) - query := fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID) + query, err := _datastore.DB.Prepare(fmt.Sprintf("SELECT messages.id, user_id, body, eventType, hidden_at, timestamp, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM messages INNER JOIN users ON messages.user_id = users.id WHERE user_id IS '%s'", userID)) + if err != nil { + return err + } messages := getChat(query) if len(messages) == 0 { @@ -212,9 +234,6 @@ func saveMessageVisibility(messageIDs []string, visible bool) error { _historyCache = nil }() - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - tx, err := _datastore.DB.Begin() if err != nil { return err @@ -290,9 +309,6 @@ func getMessageByID(messageID string) (*events.UserMessageEvent, error) { // Only keep recent messages so we don't keep more chat data than needed // for privacy and efficiency reasons. func runPruner() { - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - log.Traceln("Removing chat messages older than", maxBacklogHours, "hours") deleteStatement := `DELETE FROM messages WHERE timestamp <= datetime('now', 'localtime', ?)` diff --git a/core/data/cache.go b/core/data/cache.go index 2af34ab42..7065bc25a 100644 --- a/core/data/cache.go +++ b/core/data/cache.go @@ -2,19 +2,13 @@ package data import ( "errors" - "sync" ) -var _cacheLock = sync.Mutex{} - // GetCachedValue will return a value for key from the cache. func (ds *Datastore) GetCachedValue(key string) ([]byte, error) { - _cacheLock.Lock() - defer _cacheLock.Unlock() - // Check for a cached value - if val, ok := ds.cache[key]; ok { - return val, nil + if val, ok := ds.cache.Load(key); ok { + return val.([]byte), nil } return nil, errors.New(key + " not found in cache") @@ -22,8 +16,5 @@ func (ds *Datastore) GetCachedValue(key string) ([]byte, error) { // SetCachedValue will set a value for key in the cache. func (ds *Datastore) SetCachedValue(key string, b []byte) { - _cacheLock.Lock() - defer _cacheLock.Unlock() - - ds.cache[key] = b + ds.cache.Store(key, b) } diff --git a/core/data/data.go b/core/data/data.go index 7ca010800..68f834c59 100644 --- a/core/data/data.go +++ b/core/data/data.go @@ -51,7 +51,7 @@ func SetupPersistence(file string) error { // Some SQLite optimizations _, _ = db.Exec("pragma journal_mode = WAL") - _, _ = db.Exec("pragma synchronous = normal") + _, _ = db.Exec("pragma synchronous = NORMAL") _, _ = db.Exec("pragma temp_store = memory") _, _ = db.Exec("pragma wal_checkpoint(full)") diff --git a/core/data/messages.go b/core/data/messages.go index efb9d49e9..5fb8941fb 100644 --- a/core/data/messages.go +++ b/core/data/messages.go @@ -16,11 +16,12 @@ func CreateMessagesTable(db *sql.DB) { "hidden_at" DATETIME, "timestamp" DATETIME, PRIMARY KEY (id) - );CREATE INDEX index ON messages (id, user_id, hidden_at, timestamp); + );CREATE UNIQUE INDEX index ON messages (id, user_id, hidden_at, timestamp); CREATE INDEX id ON messages (id); CREATE INDEX user_id ON messages (user_id); CREATE INDEX hidden_at ON messages (hidden_at); - CREATE INDEX timestamp ON messages (timestamp);` + CREATE INDEX timestamp ON messages (timestamp); + CREATE UNIQUE INDEX userid_hidden ON messages (user_id, hidden_at);` stmt, err := db.Prepare(createTableSQL) if err != nil { diff --git a/core/data/persistence.go b/core/data/persistence.go index d1a6c9432..aaac63451 100644 --- a/core/data/persistence.go +++ b/core/data/persistence.go @@ -13,9 +13,9 @@ import ( // Datastore is the global key/value store for configuration values. type Datastore struct { - DB *sql.DB - cache map[string][]byte - DbLock *sync.Mutex + DB *sql.DB + //cache map[string][]byte + cache sync.Map } func (ds *Datastore) warmCache() { @@ -33,7 +33,7 @@ func (ds *Datastore) warmCache() { if err := res.Scan(&rowKey, &rowValue); err != nil { log.Errorln("error pre-caching config row", err) } - ds.cache[rowKey] = rowValue + ds.cache.Store(rowKey, rowValue) } } @@ -65,9 +65,6 @@ func (ds *Datastore) Get(key string) (ConfigEntry, error) { // Save will save the ConfigEntry to the database. func (ds *Datastore) Save(e ConfigEntry) error { - ds.DbLock.Lock() - defer ds.DbLock.Unlock() - var dataGob bytes.Buffer enc := gob.NewEncoder(&dataGob) if err := enc.Encode(e.Value); err != nil { @@ -101,9 +98,7 @@ func (ds *Datastore) Save(e ConfigEntry) error { // Setup will create the datastore table and perform initial initialization. func (ds *Datastore) Setup() { - ds.cache = make(map[string][]byte) ds.DB = GetDatabase() - ds.DbLock = &sync.Mutex{} createTableSQL := `CREATE TABLE IF NOT EXISTS datastore ( "key" string NOT NULL PRIMARY KEY, diff --git a/core/user/externalAPIUser.go b/core/user/externalAPIUser.go index 20f3732b8..62b8a0760 100644 --- a/core/user/externalAPIUser.go +++ b/core/user/externalAPIUser.go @@ -44,9 +44,6 @@ var validAccessTokenScopes = []string{ func InsertExternalAPIUser(token string, name string, color int, scopes []string) error { log.Traceln("Adding new API user:", name) - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - scopesString := strings.Join(scopes, ",") id := shortid.MustGenerate() @@ -76,9 +73,6 @@ func InsertExternalAPIUser(token string, name string, color int, scopes []string func DeleteExternalAPIUser(token string) error { log.Traceln("Deleting access token:", token) - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - tx, err := _datastore.DB.Begin() if err != nil { return err diff --git a/core/user/user.go b/core/user/user.go index 62f92740a..50150bb86 100644 --- a/core/user/user.go +++ b/core/user/user.go @@ -71,9 +71,6 @@ func CreateAnonymousUser(username string) (*User, error) { // ChangeUsername will change the user associated to userID from one display name to another. func ChangeUsername(userID string, username string) { - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - tx, err := _datastore.DB.Begin() if err != nil { @@ -103,9 +100,6 @@ func ChangeUsername(userID string, username string) { } func create(user *User) error { - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - tx, err := _datastore.DB.Begin() if err != nil { log.Debugln(err) @@ -131,9 +125,6 @@ func create(user *User) error { // SetEnabled will will set the enabled flag on a single user assigned to userID. func SetEnabled(userID string, enabled bool) error { - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - tx, err := _datastore.DB.Begin() if err != nil { return err @@ -161,24 +152,39 @@ func SetEnabled(userID string, enabled bool) error { return tx.Commit() } +var _getUserByTokenQuery *sql.Stmt + // GetUserByToken will return a user by an access token. func GetUserByToken(token string) *User { - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() + if _getUserByTokenQuery == nil { + query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE access_token = ?" + q, err := _datastore.DB.Prepare(query) + if err != nil { + return nil + } + _getUserByTokenQuery = q + } - query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE access_token = ?" - row := _datastore.DB.QueryRow(query, token) + row := _getUserByTokenQuery.QueryRow(token) - return getUserFromRow(row) + u := getUserFromRow(row) + + return u } +var _getUserByIdQuery *sql.Stmt + // GetUserByID will return a user by a user ID. func GetUserByID(id string) *User { - _datastore.DbLock.Lock() - defer _datastore.DbLock.Unlock() - - query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE id = ?" - row := _datastore.DB.QueryRow(query, id) + if _getUserByIdQuery == nil { + query := "SELECT id, display_name, display_color, created_at, disabled_at, previous_names, namechanged_at FROM users WHERE id = ?" + q, err := _datastore.DB.Prepare(query) + if err != nil { + return nil + } + _getUserByIdQuery = q + } + row := _getUserByIdQuery.QueryRow(id) if row == nil { log.Errorln(row) return nil diff --git a/go.mod b/go.mod index bca85956e..b33e219cc 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,6 @@ require ( github.com/tklauser/go-sysconf v0.3.5 // indirect github.com/tklauser/numcpus v0.2.2 // indirect golang.org/x/net v0.0.0-20210614182718-04defd469f4e // indirect - golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 // indirect + golang.org/x/sys v0.0.0-20210902050250-f475640dd07b // indirect gopkg.in/yaml.v2 v2.4.0 // indirect ) diff --git a/go.sum b/go.sum index 1363c832e..682ee0e50 100644 --- a/go.sum +++ b/go.sum @@ -90,8 +90,8 @@ golang.org/x/sys v0.0.0-20191224085550-c709ea063b76/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210316164454-77fc1eacc6aa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210514084401-e8d321eab015 h1:hZR0X1kPW+nwyJ9xRxqZk1vx5RUObAPBdKVvXPDUH/E= -golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210902050250-f475640dd07b h1:S7hKs0Flbq0bbc9xgYt4stIEG1zNDFqyrPwAX2Wj/sE= +golang.org/x/sys v0.0.0-20210902050250-f475640dd07b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.6 h1:aRYxNxv6iGQlyVaZmk6ZgYEDa+Jg18DxebPSrd6bg1M=