161 changed files with 2300 additions and 1803 deletions
@ -1,61 +0,0 @@
@@ -1,61 +0,0 @@
|
||||
package activitypub |
||||
|
||||
import ( |
||||
"net/http" |
||||
|
||||
"github.com/owncast/owncast/activitypub/crypto" |
||||
"github.com/owncast/owncast/activitypub/inbox" |
||||
"github.com/owncast/owncast/activitypub/outbox" |
||||
"github.com/owncast/owncast/activitypub/persistence" |
||||
"github.com/owncast/owncast/activitypub/workerpool" |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
"github.com/owncast/owncast/storage/data" |
||||
|
||||
"github.com/owncast/owncast/models" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
var configRepository = configrepository.Get() |
||||
|
||||
// Start will initialize and start the federation support.
|
||||
func Start(datastore *data.Store, router *http.ServeMux) { |
||||
persistence.Setup(datastore) |
||||
workerpool.InitOutboundWorkerPool() |
||||
inbox.InitInboxWorkerPool() |
||||
StartRouter(router) |
||||
|
||||
// Generate the keys for signing federated activity if needed.
|
||||
if configRepository.GetPrivateKey() == "" { |
||||
privateKey, publicKey, err := crypto.GenerateKeys() |
||||
_ = configRepository.SetPrivateKey(string(privateKey)) |
||||
_ = configRepository.SetPublicKey(string(publicKey)) |
||||
if err != nil { |
||||
log.Errorln("Unable to get private key", err) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// SendLive will send a "Go Live" message to followers.
|
||||
func SendLive() error { |
||||
return outbox.SendLive() |
||||
} |
||||
|
||||
// SendPublicFederatedMessage will send an arbitrary provided message to followers.
|
||||
func SendPublicFederatedMessage(message string) error { |
||||
return outbox.SendPublicMessage(message) |
||||
} |
||||
|
||||
// SendDirectFederatedMessage will send a direct message to a single account.
|
||||
func SendDirectFederatedMessage(message, account string) error { |
||||
return outbox.SendDirectMessageToAccount(message, account) |
||||
} |
||||
|
||||
// GetFollowerCount will return the local tracked follower count.
|
||||
func GetFollowerCount() (int64, error) { |
||||
return persistence.GetFollowerCount() |
||||
} |
||||
|
||||
// GetPendingFollowRequests will return the pending follow requests.
|
||||
func GetPendingFollowRequests() ([]models.Follower, error) { |
||||
return persistence.GetPendingFollowRequests() |
||||
} |
@ -1,64 +0,0 @@
@@ -1,64 +0,0 @@
|
||||
package inbox |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/go-fed/activity/streams/vocab" |
||||
"github.com/owncast/owncast/activitypub/resolvers" |
||||
"github.com/owncast/owncast/core/chat" |
||||
"github.com/owncast/owncast/core/chat/events" |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
) |
||||
|
||||
var configRepository = configrepository.Get() |
||||
|
||||
func handleEngagementActivity(eventType events.EventType, isLiveNotification bool, actorReference vocab.ActivityStreamsActorProperty, action string) error { |
||||
// Do nothing if displaying engagement actions has been turned off.
|
||||
if !configRepository.GetFederationShowEngagement() { |
||||
return nil |
||||
} |
||||
|
||||
// Do nothing if chat is disabled
|
||||
if configRepository.GetChatDisabled() { |
||||
return nil |
||||
} |
||||
|
||||
// Get actor of the action
|
||||
actor, _ := resolvers.GetResolvedActorFromActorProperty(actorReference) |
||||
|
||||
// Send chat message
|
||||
actorName := actor.Name |
||||
if actorName == "" { |
||||
actorName = actor.Username |
||||
} |
||||
actorIRI := actorReference.Begin().GetIRI().String() |
||||
|
||||
userPrefix := fmt.Sprintf("%s ", actorName) |
||||
var suffix string |
||||
if isLiveNotification && action == events.FediverseEngagementLike { |
||||
suffix = "liked that this stream went live." |
||||
} else if action == events.FediverseEngagementLike { |
||||
suffix = fmt.Sprintf("liked a post from %s.", configRepository.GetServerName()) |
||||
} else if isLiveNotification && action == events.FediverseEngagementRepost { |
||||
suffix = "shared this stream with their followers." |
||||
} else if action == events.FediverseEngagementRepost { |
||||
suffix = fmt.Sprintf("shared a post from %s.", configRepository.GetServerName()) |
||||
} else if action == events.FediverseEngagementFollow { |
||||
suffix = "followed this stream." |
||||
} else { |
||||
return fmt.Errorf("could not handle event for sending to chat: %s", action) |
||||
} |
||||
body := fmt.Sprintf("%s %s", userPrefix, suffix) |
||||
|
||||
var image *string |
||||
if actor.Image != nil { |
||||
s := actor.Image.String() |
||||
image = &s |
||||
} |
||||
|
||||
if err := chat.SendFediverseAction(eventType, actor.FullUsername, image, body, actorIRI); err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
@ -1,25 +0,0 @@
@@ -1,25 +0,0 @@
|
||||
package inbox |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/go-fed/activity/streams/vocab" |
||||
"github.com/owncast/owncast/activitypub/persistence" |
||||
"github.com/owncast/owncast/activitypub/resolvers" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
func handleUpdateRequest(c context.Context, activity vocab.ActivityStreamsUpdate) error { |
||||
// We only care about update events to followers.
|
||||
if !activity.GetActivityStreamsObject().At(0).IsActivityStreamsPerson() { |
||||
return nil |
||||
} |
||||
|
||||
actor, err := resolvers.GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor()) |
||||
if err != nil { |
||||
log.Errorln(err) |
||||
return err |
||||
} |
||||
|
||||
return persistence.UpdateFollower(actor.ActorIri.String(), actor.Inbox.String(), actor.Name, actor.FullUsername, actor.Image.String()) |
||||
} |
@ -1,35 +0,0 @@
@@ -1,35 +0,0 @@
|
||||
package activitypub |
||||
|
||||
import ( |
||||
"net/http" |
||||
|
||||
"github.com/owncast/owncast/activitypub/controllers" |
||||
"github.com/owncast/owncast/webserver/middleware" |
||||
) |
||||
|
||||
// StartRouter will start the federation specific http router.
|
||||
func StartRouter(router *http.ServeMux) { |
||||
// WebFinger
|
||||
router.HandleFunc("/.well-known/webfinger", controllers.WebfingerHandler) |
||||
|
||||
// Host Metadata
|
||||
router.HandleFunc("/.well-known/host-meta", controllers.HostMetaController) |
||||
|
||||
// Nodeinfo v1
|
||||
router.HandleFunc("/.well-known/nodeinfo", controllers.NodeInfoController) |
||||
|
||||
// x-nodeinfo v2
|
||||
router.HandleFunc("/.well-known/x-nodeinfo2", controllers.XNodeInfo2Controller) |
||||
|
||||
// Nodeinfo v2
|
||||
router.HandleFunc("/nodeinfo/2.0", controllers.NodeInfoV2Controller) |
||||
|
||||
// Instance details
|
||||
router.HandleFunc("/api/v1/instance", controllers.InstanceV1Controller) |
||||
|
||||
// Single ActivityPub Actor
|
||||
router.HandleFunc("/federation/user/", middleware.RequireActivityPubOrRedirect(controllers.ActorHandler)) |
||||
|
||||
// Single AP object
|
||||
router.HandleFunc("/federation/", middleware.RequireActivityPubOrRedirect(controllers.ObjectHandler)) |
||||
} |
@ -0,0 +1,48 @@
@@ -0,0 +1,48 @@
|
||||
package cmd |
||||
|
||||
import ( |
||||
"github.com/owncast/owncast/services/config" |
||||
"github.com/owncast/owncast/services/metrics" |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
type Application struct { |
||||
configservice *config.Config |
||||
metricsservice *metrics.Metrics |
||||
configRepository *configrepository.SqlConfigRepository |
||||
|
||||
maximumConcurrentConnectionLimit int64 |
||||
} |
||||
|
||||
/* |
||||
The order of this setup matters. |
||||
- Parse flags |
||||
- Set the session runtime values |
||||
- Use the session values to configure data persistence |
||||
*/ |
||||
func (app *Application) Start() { |
||||
app.configservice = config.Get() |
||||
|
||||
app.parseFlags() |
||||
app.configureLogging(*enableDebugOptions, *enableVerboseLogging, app.configservice.LogDirectory) |
||||
app.showStartupMessage() |
||||
|
||||
app.setSessionConfig() |
||||
app.createDirectories() |
||||
|
||||
app.maximumConcurrentConnectionLimit = getMaximumConcurrentConnectionLimit() |
||||
setSystemConcurrentConnectionLimit(app.maximumConcurrentConnectionLimit) |
||||
|
||||
// If we're restoring a backup, do that and exit.
|
||||
if *restoreDatabaseFile != "" { |
||||
app.handleRestoreBackup(restoreDatabaseFile) |
||||
log.Exit(0) |
||||
} |
||||
|
||||
if *backupDirectory != "" { |
||||
app.configservice.BackupDirectory = *backupDirectory |
||||
} |
||||
|
||||
app.startServices() |
||||
} |
@ -0,0 +1,20 @@
@@ -0,0 +1,20 @@
|
||||
package cmd |
||||
|
||||
import ( |
||||
"github.com/owncast/owncast/utils" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
func (app *Application) handleRestoreBackup(restoreDatabaseFile *string) { |
||||
// Allows a user to restore a specific database backup
|
||||
databaseFile := app.configservice.DatabaseFilePath |
||||
if *dbFile != "" { |
||||
databaseFile = *dbFile |
||||
} |
||||
|
||||
if err := utils.Restore(*restoreDatabaseFile, databaseFile); err != nil { |
||||
log.Fatalln(err) |
||||
} |
||||
|
||||
log.Println("Database has been restored. Restart Owncast.") |
||||
} |
@ -1,7 +1,7 @@
@@ -1,7 +1,7 @@
|
||||
//go:build freebsd
|
||||
// +build freebsd
|
||||
|
||||
package chat |
||||
package cmd |
||||
|
||||
import ( |
||||
"syscall" |
@ -1,6 +1,6 @@
@@ -1,6 +1,6 @@
|
||||
//go:build windows
|
||||
// +build windows
|
||||
|
||||
package chat |
||||
package cmd |
||||
|
||||
func setSystemConcurrentConnectionLimit(limit int64) {} |
@ -0,0 +1,71 @@
@@ -0,0 +1,71 @@
|
||||
package cmd |
||||
|
||||
import ( |
||||
"strconv" |
||||
|
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
func (app *Application) setSessionConfig() { |
||||
// Stream key
|
||||
if *newStreamKey != "" { |
||||
log.Println("Temporary stream key is set for this session.") |
||||
app.configservice.TemporaryStreamKey = *newStreamKey |
||||
} |
||||
|
||||
app.configservice.EnableDebugFeatures = *enableDebugOptions |
||||
|
||||
if *dbFile != "" { |
||||
app.configservice.DatabaseFilePath = *dbFile |
||||
} |
||||
|
||||
if *logDirectory != "" { |
||||
app.configservice.LogDirectory = *logDirectory |
||||
} |
||||
} |
||||
|
||||
func (app *Application) saveUpdatedConfig() { |
||||
configRepository := configrepository.Get() |
||||
|
||||
if *newAdminPassword != "" { |
||||
if err := configRepository.SetAdminPassword(*newAdminPassword); err != nil { |
||||
log.Errorln("Error setting your admin password.", err) |
||||
log.Exit(1) |
||||
} else { |
||||
log.Infoln("Admin password changed") |
||||
} |
||||
} |
||||
|
||||
// Set the web server port
|
||||
if *webServerPortOverride != "" { |
||||
portNumber, err := strconv.Atoi(*webServerPortOverride) |
||||
if err != nil { |
||||
log.Warnln(err) |
||||
return |
||||
} |
||||
|
||||
log.Println("Saving new web server port number to", portNumber) |
||||
if err := configRepository.SetHTTPPortNumber(float64(portNumber)); err != nil { |
||||
log.Errorln(err) |
||||
} |
||||
} |
||||
app.configservice.WebServerPort = configRepository.GetHTTPPortNumber() |
||||
|
||||
// Set the web server ip
|
||||
if *webServerIPOverride != "" { |
||||
log.Println("Saving new web server listen IP address to", *webServerIPOverride) |
||||
if err := configRepository.SetHTTPListenAddress(*webServerIPOverride); err != nil { |
||||
log.Errorln(err) |
||||
} |
||||
} |
||||
app.configservice.WebServerIP = configRepository.GetHTTPListenAddress() |
||||
|
||||
// Set the rtmp server port
|
||||
if *rtmpPortOverride > 0 { |
||||
log.Println("Saving new RTMP server port number to", *rtmpPortOverride) |
||||
if err := configRepository.SetRTMPPortNumber(float64(*rtmpPortOverride)); err != nil { |
||||
log.Errorln(err) |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,7 @@
@@ -0,0 +1,7 @@
|
||||
package cmd |
||||
|
||||
import log "github.com/sirupsen/logrus" |
||||
|
||||
func (app *Application) showStartupMessage() { |
||||
log.Infoln(app.configservice.GetReleaseString()) |
||||
} |
@ -0,0 +1,4 @@
@@ -0,0 +1,4 @@
|
||||
package cmd |
||||
|
||||
func initializeData() { |
||||
} |
@ -0,0 +1,23 @@
@@ -0,0 +1,23 @@
|
||||
package cmd |
||||
|
||||
import ( |
||||
"flag" |
||||
) |
||||
|
||||
var ( |
||||
dbFile = flag.String("database", "", "Path to the database file.") |
||||
logDirectory = flag.String("logdir", "", "Directory where logs will be written to") |
||||
backupDirectory = flag.String("backupdir", "", "Directory where backups will be written to") |
||||
enableDebugOptions = flag.Bool("enableDebugFeatures", false, "Enable additional debugging options.") |
||||
enableVerboseLogging = flag.Bool("enableVerboseLogging", false, "Enable additional logging.") |
||||
restoreDatabaseFile = flag.String("restoreDatabase", "", "Restore an Owncast database backup") |
||||
newAdminPassword = flag.String("adminpassword", "", "Set your admin password") |
||||
newStreamKey = flag.String("streamkey", "", "Set a temporary stream key for this session") |
||||
webServerPortOverride = flag.String("webserverport", "", "Force the web server to listen on a specific port") |
||||
webServerIPOverride = flag.String("webserverip", "", "Force web server to listen on this IP address") |
||||
rtmpPortOverride = flag.Int("rtmpport", 0, "Set listen port for the RTMP server") |
||||
) |
||||
|
||||
func (app *Application) parseFlags() { |
||||
flag.Parse() |
||||
} |
@ -0,0 +1,4 @@
@@ -0,0 +1,4 @@
|
||||
package cmd |
||||
|
||||
func (app *Application) startServices() { |
||||
} |
@ -0,0 +1,162 @@
@@ -0,0 +1,162 @@
|
||||
package cmd |
||||
|
||||
import ( |
||||
"fmt" |
||||
"io" |
||||
"io/fs" |
||||
"os" |
||||
"path" |
||||
"path/filepath" |
||||
|
||||
"github.com/owncast/owncast/logging" |
||||
"github.com/owncast/owncast/services/config" |
||||
"github.com/owncast/owncast/static" |
||||
"github.com/owncast/owncast/utils" |
||||
"github.com/pkg/errors" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
func (app *Application) createDirectories() { |
||||
// Create the data directory if needed
|
||||
if !utils.DoesFileExists("data") { |
||||
if err := os.Mkdir("./data", 0o700); err != nil { |
||||
log.Fatalln("Cannot create data directory", err) |
||||
} |
||||
} |
||||
|
||||
// Recreate the temp dir
|
||||
if utils.DoesFileExists(app.configservice.TempDir) { |
||||
err := os.RemoveAll(app.configservice.TempDir) |
||||
if err != nil { |
||||
log.Fatalln("Unable to remove temp dir! Check permissions.", app.configservice.TempDir, err) |
||||
} |
||||
} |
||||
if err := os.Mkdir(app.configservice.TempDir, 0o700); err != nil { |
||||
log.Fatalln("Unable to create temp dir!", err) |
||||
} |
||||
} |
||||
|
||||
func (app *Application) configureLogging(enableDebugFeatures bool, enableVerboseLogging bool, logDirectory string) { |
||||
logging.Setup(enableDebugFeatures, enableVerboseLogging, logDirectory) |
||||
log.SetFormatter(&log.TextFormatter{ |
||||
FullTimestamp: true, |
||||
}) |
||||
} |
||||
|
||||
// setupEmojiDirectory sets up the custom emoji directory by copying all built-in
|
||||
// emojis if the directory does not yet exist.
|
||||
func (app *Application) setupEmojiDirectory() (err error) { |
||||
type emojiDirectory struct { |
||||
path string |
||||
isDir bool |
||||
} |
||||
|
||||
// Migrate old (pre 0.1.0) emoji to new location if they exist.
|
||||
app.migrateCustomEmojiLocations() |
||||
|
||||
if utils.DoesFileExists(app.configservice.CustomEmojiPath) { |
||||
return nil |
||||
} |
||||
|
||||
if err = os.MkdirAll(app.configservice.CustomEmojiPath, 0o750); err != nil { |
||||
return fmt.Errorf("unable to create custom emoji directory: %w", err) |
||||
} |
||||
|
||||
staticFS := static.GetEmoji() |
||||
files := []emojiDirectory{} |
||||
|
||||
walkFunction := func(path string, d os.DirEntry, err error) error { |
||||
if path == "." { |
||||
return nil |
||||
} |
||||
|
||||
if d.Name() == "LICENSE.md" { |
||||
return nil |
||||
} |
||||
|
||||
files = append(files, emojiDirectory{path: path, isDir: d.IsDir()}) |
||||
return nil |
||||
} |
||||
|
||||
if err := fs.WalkDir(staticFS, ".", walkFunction); err != nil { |
||||
log.Errorln("unable to fetch emojis: " + err.Error()) |
||||
return errors.Wrap(err, "unable to fetch embedded emoji files") |
||||
} |
||||
|
||||
if err != nil { |
||||
return fmt.Errorf("unable to read built-in emoji files: %w", err) |
||||
} |
||||
|
||||
// Now copy all built-in emojis to the custom emoji directory
|
||||
for _, path := range files { |
||||
emojiPath := filepath.Join(app.configservice.CustomEmojiPath, path.path) |
||||
|
||||
if path.isDir { |
||||
if err := os.Mkdir(emojiPath, 0o700); err != nil { |
||||
return errors.Wrap(err, "unable to create emoji directory, check permissions?: "+path.path) |
||||
} |
||||
continue |
||||
} |
||||
|
||||
memFile, staticOpenErr := staticFS.Open(path.path) |
||||
if staticOpenErr != nil { |
||||
return errors.Wrap(staticOpenErr, "unable to open emoji file from embedded filesystem") |
||||
} |
||||
|
||||
// nolint:gosec
|
||||
diskFile, err := os.Create(emojiPath) |
||||
if err != nil { |
||||
return fmt.Errorf("unable to create custom emoji file on disk: %w", err) |
||||
} |
||||
|
||||
if err != nil { |
||||
_ = diskFile.Close() |
||||
return fmt.Errorf("unable to open built-in emoji file: %w", err) |
||||
} |
||||
|
||||
if _, err = io.Copy(diskFile, memFile); err != nil { |
||||
_ = diskFile.Close() |
||||
_ = os.Remove(emojiPath) |
||||
return fmt.Errorf("unable to copy built-in emoji file to disk: %w", err) |
||||
} |
||||
|
||||
if err = diskFile.Close(); err != nil { |
||||
_ = os.Remove(emojiPath) |
||||
return fmt.Errorf("unable to close custom emoji file on disk: %w", err) |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// MigrateCustomEmojiLocations migrates custom emoji from the old location to the new location.
|
||||
func (app *Application) migrateCustomEmojiLocations() { |
||||
oldLocation := path.Join("webroot", "img", "emoji") |
||||
newLocation := path.Join("data", "emoji") |
||||
|
||||
if !utils.DoesFileExists(oldLocation) { |
||||
return |
||||
} |
||||
|
||||
log.Println("Moving custom emoji directory from", oldLocation, "to", newLocation) |
||||
|
||||
if err := utils.Move(oldLocation, newLocation); err != nil { |
||||
log.Errorln("error moving custom emoji directory", err) |
||||
} |
||||
} |
||||
|
||||
func (app *Application) resetDirectories() { |
||||
log.Trace("Resetting file directories to a clean slate.") |
||||
|
||||
// Wipe hls data directory
|
||||
utils.CleanupDirectory(app.configservice.HLSStoragePath) |
||||
|
||||
// Remove the previous thumbnail
|
||||
logo := app.configRepository.GetLogoPath() |
||||
if utils.DoesFileExists(logo) { |
||||
err := utils.Copy(path.Join("data", logo), filepath.Join(config.DataDirectory, "thumbnail.jpg")) |
||||
if err != nil { |
||||
log.Warnln(err) |
||||
} |
||||
} |
||||
} |
@ -1,187 +0,0 @@
@@ -1,187 +0,0 @@
|
||||
package chat |
||||
|
||||
import ( |
||||
"errors" |
||||
"net/http" |
||||
"sort" |
||||
|
||||
"github.com/owncast/owncast/core/chat/events" |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/services/config" |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/client_golang/prometheus/promauto" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
var ( |
||||
getStatus func() models.Status |
||||
chatMessagesSentCounter prometheus.Gauge |
||||
) |
||||
|
||||
var configRepository = configrepository.Get() |
||||
|
||||
// Start begins the chat server.
|
||||
func Start(getStatusFunc func() models.Status) error { |
||||
setupPersistence() |
||||
|
||||
getStatus = getStatusFunc |
||||
_server = NewChat() |
||||
|
||||
go _server.Run() |
||||
|
||||
log.Traceln("Chat server started with max connection count of", _server.maxSocketConnectionLimit) |
||||
c := config.GetConfig() |
||||
|
||||
chatMessagesSentCounter = promauto.NewGauge(prometheus.GaugeOpts{ |
||||
Name: "total_chat_message_count", |
||||
Help: "The number of chat messages incremented over time.", |
||||
ConstLabels: map[string]string{ |
||||
"version": c.VersionNumber, |
||||
"host": configRepository.GetServerURL(), |
||||
}, |
||||
}) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GetClientsForUser will return chat connections that are owned by a specific user.
|
||||
func GetClientsForUser(userID string) ([]*Client, error) { |
||||
_server.mu.Lock() |
||||
defer _server.mu.Unlock() |
||||
|
||||
clients := map[string][]*Client{} |
||||
|
||||
for _, client := range _server.clients { |
||||
clients[client.User.ID] = append(clients[client.User.ID], client) |
||||
} |
||||
|
||||
if _, exists := clients[userID]; !exists { |
||||
return nil, errors.New("no connections for user found") |
||||
} |
||||
|
||||
return clients[userID], nil |
||||
} |
||||
|
||||
// FindClientByID will return a single connected client by ID.
|
||||
func FindClientByID(clientID uint) (*Client, bool) { |
||||
client, found := _server.clients[clientID] |
||||
return client, found |
||||
} |
||||
|
||||
// GetClients will return all the current chat clients connected.
|
||||
func GetClients() []*Client { |
||||
clients := []*Client{} |
||||
|
||||
if _server == nil { |
||||
return clients |
||||
} |
||||
|
||||
// Convert the keyed map to a slice.
|
||||
for _, client := range _server.clients { |
||||
clients = append(clients, client) |
||||
} |
||||
|
||||
sort.Slice(clients, func(i, j int) bool { |
||||
return clients[i].ConnectedAt.Before(clients[j].ConnectedAt) |
||||
}) |
||||
|
||||
return clients |
||||
} |
||||
|
||||
// SendSystemMessage will send a message string as a system message to all clients.
|
||||
func SendSystemMessage(text string, ephemeral bool) error { |
||||
message := events.SystemMessageEvent{ |
||||
MessageEvent: events.MessageEvent{ |
||||
Body: text, |
||||
}, |
||||
} |
||||
message.SetDefaults() |
||||
message.RenderBody() |
||||
|
||||
if err := Broadcast(&message); err != nil { |
||||
log.Errorln("error sending system message", err) |
||||
} |
||||
|
||||
if !ephemeral { |
||||
saveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// SendFediverseAction will send a message indicating some Fediverse engagement took place.
|
||||
func SendFediverseAction(eventType string, userAccountName string, image *string, body string, link string) error { |
||||
message := events.FediverseEngagementEvent{ |
||||
Event: events.Event{ |
||||
Type: eventType, |
||||
}, |
||||
MessageEvent: events.MessageEvent{ |
||||
Body: body, |
||||
}, |
||||
UserAccountName: userAccountName, |
||||
Image: image, |
||||
Link: link, |
||||
} |
||||
|
||||
message.SetDefaults() |
||||
message.RenderBody() |
||||
|
||||
if err := Broadcast(&message); err != nil { |
||||
log.Errorln("error sending system message", err) |
||||
return err |
||||
} |
||||
|
||||
saveFederatedAction(message) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// SendSystemAction will send a system action string as an action event to all clients.
|
||||
func SendSystemAction(text string, ephemeral bool) error { |
||||
message := events.ActionEvent{ |
||||
MessageEvent: events.MessageEvent{ |
||||
Body: text, |
||||
}, |
||||
} |
||||
|
||||
message.SetDefaults() |
||||
message.RenderBody() |
||||
|
||||
if err := Broadcast(&message); err != nil { |
||||
log.Errorln("error sending system chat action") |
||||
} |
||||
|
||||
if !ephemeral { |
||||
saveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// SendAllWelcomeMessage will send the chat message to all connected clients.
|
||||
func SendAllWelcomeMessage() { |
||||
_server.sendAllWelcomeMessage() |
||||
} |
||||
|
||||
// SendSystemMessageToClient will send a single message to a single connected chat client.
|
||||
func SendSystemMessageToClient(clientID uint, text string) { |
||||
if client, foundClient := FindClientByID(clientID); foundClient { |
||||
_server.sendSystemMessageToClient(client, text) |
||||
} |
||||
} |
||||
|
||||
// Broadcast will send all connected clients the outbound object provided.
|
||||
func Broadcast(event events.OutboundEvent) error { |
||||
return _server.Broadcast(event.GetBroadcastPayload()) |
||||
} |
||||
|
||||
// HandleClientConnection handles a single inbound websocket connection.
|
||||
func HandleClientConnection(w http.ResponseWriter, r *http.Request) { |
||||
_server.HandleClientConnection(w, r) |
||||
} |
||||
|
||||
// DisconnectClients will forcefully disconnect all clients belonging to a user by ID.
|
||||
func DisconnectClients(clients []*Client) { |
||||
_server.DisconnectClients(clients) |
||||
} |
@ -1,17 +0,0 @@
@@ -1,17 +0,0 @@
|
||||
package events |
||||
|
||||
// UserJoinedEvent is the event fired when a user joins chat.
|
||||
type UserJoinedEvent struct { |
||||
Event |
||||
UserEvent |
||||
} |
||||
|
||||
// GetBroadcastPayload will return the object to send to all chat users.
|
||||
func (e *UserJoinedEvent) GetBroadcastPayload() EventPayload { |
||||
return EventPayload{ |
||||
"type": UserJoined, |
||||
"id": e.ID, |
||||
"timestamp": e.Timestamp, |
||||
"user": e.User, |
||||
} |
||||
} |
@ -1,22 +0,0 @@
@@ -1,22 +0,0 @@
|
||||
//go:build !windows
|
||||
// +build !windows
|
||||
|
||||
package chat |
||||
|
||||
import ( |
||||
"syscall" |
||||
|
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
func getMaximumConcurrentConnectionLimit() int64 { |
||||
var rLimit syscall.Rlimit |
||||
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil { |
||||
log.Fatalln(err) |
||||
} |
||||
|
||||
// Return the limit to 70% of max so the machine doesn't die even if it's maxed out for some reason.
|
||||
proposedLimit := int64(float32(rLimit.Max) * 0.7) |
||||
|
||||
return proposedLimit |
||||
} |
@ -1,46 +0,0 @@
@@ -1,46 +0,0 @@
|
||||
package core |
||||
|
||||
import ( |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/services/config" |
||||
) |
||||
|
||||
// GetStatus gets the status of the system.
|
||||
func GetStatus() models.Status { |
||||
if _stats == nil { |
||||
return models.Status{} |
||||
} |
||||
|
||||
viewerCount := 0 |
||||
if IsStreamConnected() { |
||||
viewerCount = len(_stats.Viewers) |
||||
} |
||||
|
||||
c := config.GetConfig() |
||||
|
||||
return models.Status{ |
||||
Online: IsStreamConnected(), |
||||
ViewerCount: viewerCount, |
||||
OverallMaxViewerCount: _stats.OverallMaxViewerCount, |
||||
SessionMaxViewerCount: _stats.SessionMaxViewerCount, |
||||
LastDisconnectTime: _stats.LastDisconnectTime, |
||||
LastConnectTime: _stats.LastConnectTime, |
||||
VersionNumber: c.VersionNumber, |
||||
StreamTitle: configRepository.GetStreamTitle(), |
||||
} |
||||
} |
||||
|
||||
// GetCurrentBroadcast will return the currently active broadcast.
|
||||
func GetCurrentBroadcast() *models.CurrentBroadcast { |
||||
return _currentBroadcast |
||||
} |
||||
|
||||
// setBroadcaster will store the current inbound broadcasting details.
|
||||
func setBroadcaster(broadcaster models.Broadcaster) { |
||||
_broadcaster = &broadcaster |
||||
} |
||||
|
||||
// GetBroadcaster will return the details of the currently active broadcaster.
|
||||
func GetBroadcaster() *models.Broadcaster { |
||||
return _broadcaster |
||||
} |
@ -1,203 +0,0 @@
@@ -1,203 +0,0 @@
|
||||
package core |
||||
|
||||
import ( |
||||
"context" |
||||
"io" |
||||
"time" |
||||
|
||||
log "github.com/sirupsen/logrus" |
||||
|
||||
"github.com/owncast/owncast/activitypub" |
||||
"github.com/owncast/owncast/core/chat" |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/services/config" |
||||
"github.com/owncast/owncast/services/notifications" |
||||
"github.com/owncast/owncast/services/webhooks" |
||||
"github.com/owncast/owncast/storage/data" |
||||
"github.com/owncast/owncast/utils" |
||||
"github.com/owncast/owncast/video/rtmp" |
||||
"github.com/owncast/owncast/video/transcoder" |
||||
) |
||||
|
||||
// After the stream goes offline this timer fires a full cleanup after N min.
|
||||
var _offlineCleanupTimer *time.Timer |
||||
|
||||
// While a stream takes place cleanup old HLS content every N min.
|
||||
var _onlineCleanupTicker *time.Ticker |
||||
|
||||
var _currentBroadcast *models.CurrentBroadcast |
||||
|
||||
var _onlineTimerCancelFunc context.CancelFunc |
||||
|
||||
var _lastNotified *time.Time |
||||
|
||||
// setStreamAsConnected sets the stream as connected.
|
||||
func setStreamAsConnected(rtmpOut *io.PipeReader) { |
||||
now := utils.NullTime{Time: time.Now(), Valid: true} |
||||
_stats.StreamConnected = true |
||||
_stats.LastDisconnectTime = nil |
||||
_stats.LastConnectTime = &now |
||||
_stats.SessionMaxViewerCount = 0 |
||||
|
||||
_currentBroadcast = &models.CurrentBroadcast{ |
||||
LatencyLevel: configRepository.GetStreamLatencyLevel(), |
||||
OutputSettings: configRepository.GetStreamOutputVariants(), |
||||
} |
||||
|
||||
StopOfflineCleanupTimer() |
||||
startOnlineCleanupTimer() |
||||
|
||||
if _yp != nil { |
||||
go _yp.Start() |
||||
} |
||||
|
||||
c := config.GetConfig() |
||||
segmentPath := c.HLSStoragePath |
||||
|
||||
if err := setupStorage(); err != nil { |
||||
log.Fatalln("failed to setup the storage", err) |
||||
} |
||||
|
||||
go func() { |
||||
_transcoder = transcoder.NewTranscoder() |
||||
_transcoder.TranscoderCompleted = func(error) { |
||||
SetStreamAsDisconnected() |
||||
_transcoder = nil |
||||
_currentBroadcast = nil |
||||
} |
||||
_transcoder.SetStdin(rtmpOut) |
||||
_transcoder.Start(true) |
||||
}() |
||||
|
||||
webhookManager := webhooks.Get() |
||||
go webhookManager.SendStreamStatusEvent(models.StreamStarted) |
||||
transcoder.StartThumbnailGenerator(segmentPath, configRepository.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings)) |
||||
|
||||
_ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true) |
||||
chat.SendAllWelcomeMessage() |
||||
|
||||
// Send delayed notification messages.
|
||||
_onlineTimerCancelFunc = startLiveStreamNotificationsTimer() |
||||
} |
||||
|
||||
// SetStreamAsDisconnected sets the stream as disconnected.
|
||||
func SetStreamAsDisconnected() { |
||||
_ = chat.SendSystemAction("The stream is ending.", true) |
||||
|
||||
now := utils.NullTime{Time: time.Now(), Valid: true} |
||||
if _onlineTimerCancelFunc != nil { |
||||
_onlineTimerCancelFunc() |
||||
} |
||||
|
||||
_stats.StreamConnected = false |
||||
_stats.LastDisconnectTime = &now |
||||
_stats.LastConnectTime = nil |
||||
_broadcaster = nil |
||||
|
||||
offlineFilename := "offline.ts" |
||||
|
||||
offlineFilePath, err := saveOfflineClipToDisk(offlineFilename) |
||||
if err != nil { |
||||
log.Errorln(err) |
||||
return |
||||
} |
||||
|
||||
transcoder.StopThumbnailGenerator() |
||||
rtmp.Disconnect() |
||||
|
||||
if _yp != nil { |
||||
_yp.Stop() |
||||
} |
||||
|
||||
// If there is no current broadcast available the previous stream
|
||||
// likely failed for some reason. Don't try to append to it.
|
||||
// Just transition to offline.
|
||||
if _currentBroadcast == nil { |
||||
stopOnlineCleanupTimer() |
||||
transitionToOfflineVideoStreamContent() |
||||
log.Errorln("unexpected nil _currentBroadcast") |
||||
return |
||||
} |
||||
|
||||
for index := range _currentBroadcast.OutputSettings { |
||||
makeVariantIndexOffline(index, offlineFilePath, offlineFilename) |
||||
} |
||||
|
||||
StartOfflineCleanupTimer() |
||||
stopOnlineCleanupTimer() |
||||
saveStats() |
||||
|
||||
webhookManager := webhooks.Get() |
||||
go webhookManager.SendStreamStatusEvent(models.StreamStopped) |
||||
} |
||||
|
||||
// StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected.
|
||||
func StartOfflineCleanupTimer() { |
||||
_offlineCleanupTimer = time.NewTimer(5 * time.Minute) |
||||
go func() { |
||||
for range _offlineCleanupTimer.C { |
||||
// Set video to offline state
|
||||
resetDirectories() |
||||
transitionToOfflineVideoStreamContent() |
||||
} |
||||
}() |
||||
} |
||||
|
||||
// StopOfflineCleanupTimer will stop the previous cleanup timer.
|
||||
func StopOfflineCleanupTimer() { |
||||
if _offlineCleanupTimer != nil { |
||||
_offlineCleanupTimer.Stop() |
||||
} |
||||
} |
||||
|
||||
func startOnlineCleanupTimer() { |
||||
_onlineCleanupTicker = time.NewTicker(1 * time.Minute) |
||||
go func() { |
||||
for range _onlineCleanupTicker.C { |
||||
if err := _storage.Cleanup(); err != nil { |
||||
log.Errorln(err) |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
|
||||
func stopOnlineCleanupTimer() { |
||||
if _onlineCleanupTicker != nil { |
||||
_onlineCleanupTicker.Stop() |
||||
} |
||||
} |
||||
|
||||
func startLiveStreamNotificationsTimer() context.CancelFunc { |
||||
// Send delayed notification messages.
|
||||
c, cancelFunc := context.WithCancel(context.Background()) |
||||
_onlineTimerCancelFunc = cancelFunc |
||||
go func(c context.Context) { |
||||
select { |
||||
case <-time.After(time.Minute * 2.0): |
||||
if _lastNotified != nil && time.Since(*_lastNotified) < 10*time.Minute { |
||||
return |
||||
} |
||||
|
||||
// Send Fediverse message.
|
||||
if configRepository.GetFederationEnabled() { |
||||
log.Traceln("Sending Federated Go Live message.") |
||||
if err := activitypub.SendLive(); err != nil { |
||||
log.Errorln(err) |
||||
} |
||||
} |
||||
|
||||
// Send notification to those who have registered for them.
|
||||
if notifier, err := notifications.New(data.GetDatastore()); err != nil { |
||||
log.Errorln(err) |
||||
} else { |
||||
notifier.Notify() |
||||
} |
||||
|
||||
now := time.Now() |
||||
_lastNotified = &now |
||||
case <-c.Done(): |
||||
} |
||||
}(c) |
||||
|
||||
return cancelFunc |
||||
} |
@ -1,18 +0,0 @@
@@ -1,18 +0,0 @@
|
||||
package logging |
||||
|
||||
import ( |
||||
"path/filepath" |
||||
|
||||
"github.com/owncast/owncast/services/config" |
||||
) |
||||
|
||||
// GetTranscoderLogFilePath returns the logging path for the transcoder log output.
|
||||
func GetTranscoderLogFilePath() string { |
||||
c := config.GetConfig() |
||||
return filepath.Join(c.LogDirectory, "transcoder.log") |
||||
} |
||||
|
||||
func getLogFilePath() string { |
||||
c := config.GetConfig() |
||||
return filepath.Join(c.LogDirectory, "owncast.log") |
||||
} |
@ -1,173 +1,43 @@
@@ -1,173 +1,43 @@
|
||||
package main |
||||
|
||||
import ( |
||||
"flag" |
||||
"os" |
||||
"strconv" |
||||
import "github.com/owncast/owncast/cmd" |
||||
|
||||
"github.com/owncast/owncast/logging" |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
"github.com/owncast/owncast/webserver" |
||||
log "github.com/sirupsen/logrus" |
||||
|
||||
"github.com/owncast/owncast/core" |
||||
configservice "github.com/owncast/owncast/services/config" |
||||
"github.com/owncast/owncast/services/metrics" |
||||
|
||||
"github.com/owncast/owncast/utils" |
||||
) |
||||
|
||||
var ( |
||||
dbFile = flag.String("database", "", "Path to the database file.") |
||||
logDirectory = flag.String("logdir", "", "Directory where logs will be written to") |
||||
backupDirectory = flag.String("backupdir", "", "Directory where backups will be written to") |
||||
enableDebugOptions = flag.Bool("enableDebugFeatures", false, "Enable additional debugging options.") |
||||
enableVerboseLogging = flag.Bool("enableVerboseLogging", false, "Enable additional logging.") |
||||
restoreDatabaseFile = flag.String("restoreDatabase", "", "Restore an Owncast database backup") |
||||
newAdminPassword = flag.String("adminpassword", "", "Set your admin password") |
||||
newStreamKey = flag.String("streamkey", "", "Set a temporary stream key for this session") |
||||
webServerPortOverride = flag.String("webserverport", "", "Force the web server to listen on a specific port") |
||||
webServerIPOverride = flag.String("webserverip", "", "Force web server to listen on this IP address") |
||||
rtmpPortOverride = flag.Int("rtmpport", 0, "Set listen port for the RTMP server") |
||||
config *configservice.Config |
||||
) |
||||
|
||||
var configRepository = configrepository.Get() |
||||
|
||||
// nolint:cyclop
|
||||
func main() { |
||||
flag.Parse() |
||||
|
||||
config = configservice.NewConfig() |
||||
|
||||
if *logDirectory != "" { |
||||
config.LogDirectory = *logDirectory |
||||
} |
||||
|
||||
if *backupDirectory != "" { |
||||
config.BackupDirectory = *backupDirectory |
||||
} |
||||
|
||||
// Create the data directory if needed
|
||||
if !utils.DoesFileExists("data") { |
||||
if err := os.Mkdir("./data", 0o700); err != nil { |
||||
log.Fatalln("Cannot create data directory", err) |
||||
} |
||||
} |
||||
|
||||
// Migrate old (pre 0.1.0) emoji to new location if they exist.
|
||||
utils.MigrateCustomEmojiLocations() |
||||
|
||||
// Otherwise save the default emoji to the data directory.
|
||||
if err := data.SetupEmojiDirectory(); err != nil { |
||||
log.Fatalln("Cannot set up emoji directory", err) |
||||
} |
||||
|
||||
// Recreate the temp dir
|
||||
if utils.DoesFileExists(config.TempDir) { |
||||
err := os.RemoveAll(config.TempDir) |
||||
if err != nil { |
||||
log.Fatalln("Unable to remove temp dir! Check permissions.", config.TempDir, err) |
||||
} |
||||
} |
||||
if err := os.Mkdir(config.TempDir, 0o700); err != nil { |
||||
log.Fatalln("Unable to create temp dir!", err) |
||||
} |
||||
|
||||
configureLogging(*enableDebugOptions, *enableVerboseLogging) |
||||
log.Infoln(config.GetReleaseString()) |
||||
|
||||
// Allows a user to restore a specific database backup
|
||||
if *restoreDatabaseFile != "" { |
||||
databaseFile := config.DatabaseFilePath |
||||
if *dbFile != "" { |
||||
databaseFile = *dbFile |
||||
} |
||||
|
||||
if err := utils.Restore(*restoreDatabaseFile, databaseFile); err != nil { |
||||
log.Fatalln(err) |
||||
} |
||||
|
||||
log.Println("Database has been restored. Restart Owncast.") |
||||
log.Exit(0) |
||||
} |
||||
|
||||
config.EnableDebugFeatures = *enableDebugOptions |
||||
|
||||
if *dbFile != "" { |
||||
config.DatabaseFilePath = *dbFile |
||||
} |
||||
|
||||
if err := data.SetupPersistence(config.DatabaseFilePath); err != nil { |
||||
log.Fatalln("failed to open database", err) |
||||
} |
||||
|
||||
handleCommandLineFlags() |
||||
app := &cmd.Application{} |
||||
app.Start() |
||||
} |
||||
|
||||
// starts the core
|
||||
if err := core.Start(); err != nil { |
||||
log.Fatalln("failed to start the core package", err) |
||||
} |
||||
// var configRepository = configrepository.Get()
|
||||
|
||||
go metrics.Start(core.GetStatus) |
||||
// // nolint:cyclop
|
||||
// func main() {
|
||||
// flag.Parse()
|
||||
|
||||
webserver := webserver.New() |
||||
if err := webserver.Start(config.WebServerIP, config.WebServerPort); err != nil { |
||||
log.Fatalln("failed to start/run the web server", err) |
||||
} |
||||
} |
||||
// config = configservice.NewConfig()
|
||||
|
||||
func handleCommandLineFlags() { |
||||
if *newAdminPassword != "" { |
||||
if err := configRepository.SetAdminPassword(*newAdminPassword); err != nil { |
||||
log.Errorln("Error setting your admin password.", err) |
||||
log.Exit(1) |
||||
} else { |
||||
log.Infoln("Admin password changed") |
||||
} |
||||
} |
||||
// // Otherwise save the default emoji to the data directory.
|
||||
// if err := data.SetupEmojiDirectory(); err != nil {
|
||||
// log.Fatalln("Cannot set up emoji directory", err)
|
||||
// }
|
||||
|
||||
if *newStreamKey != "" { |
||||
log.Println("Temporary stream key is set for this session.") |
||||
config.TemporaryStreamKey = *newStreamKey |
||||
} |
||||
// if err := data.SetupPersistence(config.DatabaseFilePath); err != nil {
|
||||
// log.Fatalln("failed to open database", err)
|
||||
// }
|
||||
|
||||
// Set the web server port
|
||||
if *webServerPortOverride != "" { |
||||
portNumber, err := strconv.Atoi(*webServerPortOverride) |
||||
if err != nil { |
||||
log.Warnln(err) |
||||
return |
||||
} |
||||
// handleCommandLineFlags()
|
||||
|
||||
log.Println("Saving new web server port number to", portNumber) |
||||
if err := configRepository.SetHTTPPortNumber(float64(portNumber)); err != nil { |
||||
log.Errorln(err) |
||||
} |
||||
} |
||||
config.WebServerPort = configRepository.GetHTTPPortNumber() |
||||
// // starts the core
|
||||
// if err := core.Start(); err != nil {
|
||||
// log.Fatalln("failed to start the core package", err)
|
||||
// }
|
||||
|
||||
// Set the web server ip
|
||||
if *webServerIPOverride != "" { |
||||
log.Println("Saving new web server listen IP address to", *webServerIPOverride) |
||||
if err := configRepository.SetHTTPListenAddress(*webServerIPOverride); err != nil { |
||||
log.Errorln(err) |
||||
} |
||||
} |
||||
config.WebServerIP = configRepository.GetHTTPListenAddress() |
||||
// go metrics.Start(core.GetStatus)
|
||||
|
||||
// Set the rtmp server port
|
||||
if *rtmpPortOverride > 0 { |
||||
log.Println("Saving new RTMP server port number to", *rtmpPortOverride) |
||||
if err := configRepository.SetRTMPPortNumber(float64(*rtmpPortOverride)); err != nil { |
||||
log.Errorln(err) |
||||
} |
||||
} |
||||
} |
||||
// webserver := webserver.New()
|
||||
// if err := webserver.Start(config.WebServerIP, config.WebServerPort); err != nil {
|
||||
// log.Fatalln("failed to start/run the web server", err)
|
||||
// }
|
||||
// }
|
||||
|
||||
func configureLogging(enableDebugFeatures bool, enableVerboseLogging bool) { |
||||
logging.Setup(enableDebugFeatures, enableVerboseLogging) |
||||
log.SetFormatter(&log.TextFormatter{ |
||||
FullTimestamp: true, |
||||
}) |
||||
} |
||||
// func handleCommandLineFlags() {
|
||||
// }
|
||||
|
@ -1,4 +1,4 @@
@@ -1,4 +1,4 @@
|
||||
package events |
||||
package models |
||||
|
||||
// ActionEvent represents an action that took place, not a chat message.
|
||||
type ActionEvent struct { |
@ -0,0 +1,12 @@
@@ -0,0 +1,12 @@
|
||||
package models |
||||
|
||||
const ( |
||||
// ScopeCanSendChatMessages will allow sending chat messages as itself.
|
||||
ScopeCanSendChatMessages = "CAN_SEND_MESSAGES" |
||||
// ScopeCanSendSystemMessages will allow sending chat messages as the system.
|
||||
ScopeCanSendSystemMessages = "CAN_SEND_SYSTEM_MESSAGES" |
||||
// ScopeHasAdminAccess will allow performing administrative actions on the server.
|
||||
ScopeHasAdminAccess = "HAS_ADMIN_ACCESS" |
||||
|
||||
ModeratorScopeKey = "MODERATOR" |
||||
) |
@ -1,9 +1,7 @@
@@ -1,9 +1,7 @@
|
||||
package events |
||||
|
||||
import "github.com/owncast/owncast/models" |
||||
package models |
||||
|
||||
// ConnectedClientInfo represents the information about a connected client.
|
||||
type ConnectedClientInfo struct { |
||||
Event |
||||
User *models.User `json:"user"` |
||||
User *User `json:"user"` |
||||
} |
@ -1,29 +0,0 @@
@@ -1,29 +0,0 @@
|
||||
package models |
||||
|
||||
// EventType is the type of a websocket event.
|
||||
type EventType = string |
||||
|
||||
const ( |
||||
// MessageSent is the event sent when a chat event takes place.
|
||||
MessageSent EventType = "CHAT" |
||||
// UserJoined is the event sent when a chat user join action takes place.
|
||||
UserJoined EventType = "USER_JOINED" |
||||
// UserNameChanged is the event sent when a chat username change takes place.
|
||||
UserNameChanged EventType = "NAME_CHANGE" |
||||
// VisibiltyToggled is the event sent when a chat message's visibility changes.
|
||||
VisibiltyToggled EventType = "VISIBILITY-UPDATE" |
||||
// PING is a ping message.
|
||||
PING EventType = "PING" |
||||
// PONG is a pong message.
|
||||
PONG EventType = "PONG" |
||||
// StreamStarted represents a stream started event.
|
||||
StreamStarted EventType = "STREAM_STARTED" |
||||
// StreamStopped represents a stream stopped event.
|
||||
StreamStopped EventType = "STREAM_STOPPED" |
||||
// StreamTitleUpdated is the event sent when a stream's title changes.
|
||||
StreamTitleUpdated EventType = "STREAM_TITLE_UPDATED" |
||||
// SystemMessageSent is the event sent when a system message is sent.
|
||||
SystemMessageSent EventType = "SYSTEM" |
||||
// ChatActionSent is a generic chat action that can be used for anything that doesn't need specific handling or formatting.
|
||||
ChatActionSent EventType = "CHAT_ACTION" |
||||
) |
@ -1,4 +1,4 @@
@@ -1,4 +1,4 @@
|
||||
package events |
||||
package models |
||||
|
||||
// NameChangeEvent is received when a user changes their chat display name.
|
||||
type NameChangeEvent struct { |
@ -1,4 +1,4 @@
@@ -1,4 +1,4 @@
|
||||
package events |
||||
package models |
||||
|
||||
// SetMessageVisibilityEvent is the event fired when one or more message
|
||||
// visibilities are changed.
|
@ -1,4 +1,4 @@
@@ -1,4 +1,4 @@
|
||||
package events |
||||
package models |
||||
|
||||
// UserDisabledEvent is the event fired when a user is banned/blocked and disconnected from chat.
|
||||
type UserDisabledEvent struct { |
@ -1,11 +1,17 @@
@@ -1,11 +1,17 @@
|
||||
package models |
||||
|
||||
import "time" |
||||
|
||||
// UserJoinedEvent represents an event when a user joins the chat.
|
||||
// UserJoinedEvent is the event fired when a user joins chat.
|
||||
type UserJoinedEvent struct { |
||||
Timestamp time.Time `json:"timestamp,omitempty"` |
||||
Username string `json:"username"` |
||||
Type EventType `json:"type"` |
||||
ID string `json:"id"` |
||||
Event |
||||
UserEvent |
||||
} |
||||
|
||||
// GetBroadcastPayload will return the object to send to all chat users.
|
||||
func (e *UserJoinedEvent) GetBroadcastPayload() EventPayload { |
||||
return EventPayload{ |
||||
"type": UserJoined, |
||||
"id": e.ID, |
||||
"timestamp": e.Timestamp, |
||||
"user": e.User, |
||||
} |
||||
} |
||||
|
@ -1,4 +1,4 @@
@@ -1,4 +1,4 @@
|
||||
package events |
||||
package models |
||||
|
||||
// UserMessageEvent is an inbound message from a user.
|
||||
type UserMessageEvent struct { |
@ -0,0 +1,82 @@
@@ -0,0 +1,82 @@
|
||||
package apfederation |
||||
|
||||
import ( |
||||
"github.com/owncast/owncast/services/apfederation/crypto" |
||||
"github.com/owncast/owncast/services/apfederation/outbox" |
||||
|
||||
"github.com/owncast/owncast/services/apfederation/workerpool" |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
"github.com/owncast/owncast/storage/data" |
||||
"github.com/owncast/owncast/storage/federationrepository" |
||||
|
||||
"github.com/owncast/owncast/models" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
type APFederation struct { |
||||
workers *workerpool.WorkerPool |
||||
outbox *outbox.APOutbox |
||||
} |
||||
|
||||
func New() *APFederation { |
||||
ds := data.GetDatastore() |
||||
apf := &APFederation{ |
||||
outbox: outbox.Get(), |
||||
} |
||||
apf.Start(ds) |
||||
return apf |
||||
} |
||||
|
||||
var temporaryGlobalInstance *APFederation |
||||
|
||||
func Get() *APFederation { |
||||
if temporaryGlobalInstance == nil { |
||||
temporaryGlobalInstance = New() |
||||
} |
||||
return temporaryGlobalInstance |
||||
} |
||||
|
||||
// Start will initialize and start the federation support.
|
||||
func (ap *APFederation) Start(datastore *data.Store) { |
||||
configRepository := configrepository.Get() |
||||
|
||||
// workerpool.InitOutboundWorkerPool()
|
||||
// ap.InitInboxWorkerPool()
|
||||
|
||||
// Generate the keys for signing federated activity if needed.
|
||||
if configRepository.GetPrivateKey() == "" { |
||||
privateKey, publicKey, err := crypto.GenerateKeys() |
||||
_ = configRepository.SetPrivateKey(string(privateKey)) |
||||
_ = configRepository.SetPublicKey(string(publicKey)) |
||||
if err != nil { |
||||
log.Errorln("Unable to get private key", err) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// SendLive will send a "Go Live" message to followers.
|
||||
func (ap *APFederation) SendLive() error { |
||||
return ap.SendLive() |
||||
} |
||||
|
||||
// SendPublicFederatedMessage will send an arbitrary provided message to followers.
|
||||
func (ap *APFederation) SendPublicFederatedMessage(message string) error { |
||||
return ap.outbox.SendPublicMessage(message) |
||||
} |
||||
|
||||
// SendDirectFederatedMessage will send a direct message to a single account.
|
||||
func (ap *APFederation) SendDirectFederatedMessage(message, account string) error { |
||||
return ap.outbox.SendDirectMessageToAccount(message, account) |
||||
} |
||||
|
||||
// GetFollowerCount will return the local tracked follower count.
|
||||
func (ap *APFederation) GetFollowerCount() (int64, error) { |
||||
federationRepository := federationrepository.Get() |
||||
return federationRepository.GetFollowerCount() |
||||
} |
||||
|
||||
// GetPendingFollowRequests will return the pending follow requests.
|
||||
func (ap *APFederation) GetPendingFollowRequests() ([]models.Follower, error) { |
||||
federationRepository := federationrepository.Get() |
||||
return federationRepository.GetPendingFollowRequests() |
||||
} |
@ -0,0 +1,59 @@
@@ -0,0 +1,59 @@
|
||||
package inbox |
||||
|
||||
import ( |
||||
"fmt" |
||||
|
||||
"github.com/go-fed/activity/streams/vocab" |
||||
"github.com/owncast/owncast/models" |
||||
) |
||||
|
||||
func (api *APInbox) handleEngagementActivity(eventType models.EventType, isLiveNotification bool, actorReference vocab.ActivityStreamsActorProperty, action string) error { |
||||
// Do nothing if displaying engagement actions has been turned off.
|
||||
if !api.configRepository.GetFederationShowEngagement() { |
||||
return nil |
||||
} |
||||
|
||||
// Do nothing if chat is disabled
|
||||
if api.configRepository.GetChatDisabled() { |
||||
return nil |
||||
} |
||||
|
||||
// Get actor of the action
|
||||
actor, _ := api.resolvers.GetResolvedActorFromActorProperty(actorReference) |
||||
|
||||
// Send chat message
|
||||
actorName := actor.Name |
||||
if actorName == "" { |
||||
actorName = actor.Username |
||||
} |
||||
actorIRI := actorReference.Begin().GetIRI().String() |
||||
|
||||
userPrefix := fmt.Sprintf("%s ", actorName) |
||||
var suffix string |
||||
if isLiveNotification && action == models.FediverseEngagementLike { |
||||
suffix = "liked that this stream went live." |
||||
} else if action == models.FediverseEngagementLike { |
||||
suffix = fmt.Sprintf("liked a post from %s.", api.configRepository.GetServerName()) |
||||
} else if isLiveNotification && action == models.FediverseEngagementRepost { |
||||
suffix = "shared this stream with their followers." |
||||
} else if action == models.FediverseEngagementRepost { |
||||
suffix = fmt.Sprintf("shared a post from %s.", api.configRepository.GetServerName()) |
||||
} else if action == models.FediverseEngagementFollow { |
||||
suffix = "followed this stream." |
||||
} else { |
||||
return fmt.Errorf("could not handle event for sending to chat: %s", action) |
||||
} |
||||
body := fmt.Sprintf("%s %s", userPrefix, suffix) |
||||
|
||||
var image *string |
||||
if actor.Image != nil { |
||||
s := actor.Image.String() |
||||
image = &s |
||||
} |
||||
|
||||
if err := api.chatService.SendFediverseAction(eventType, actor.FullUsername, image, body, actorIRI); err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
@ -0,0 +1,37 @@
@@ -0,0 +1,37 @@
|
||||
package inbox |
||||
|
||||
import ( |
||||
"github.com/owncast/owncast/services/apfederation/requests" |
||||
"github.com/owncast/owncast/services/apfederation/resolvers" |
||||
"github.com/owncast/owncast/services/chat" |
||||
|
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
"github.com/owncast/owncast/storage/federationrepository" |
||||
) |
||||
|
||||
type APInbox struct { |
||||
configRepository configrepository.ConfigRepository |
||||
federationRepository *federationrepository.FederationRepository |
||||
resolvers *resolvers.APResolvers |
||||
requests *requests.Requests |
||||
chatService *chat.Chat |
||||
} |
||||
|
||||
func New() *APInbox { |
||||
return &APInbox{ |
||||
configRepository: configrepository.Get(), |
||||
federationRepository: federationrepository.Get(), |
||||
resolvers: resolvers.Get(), |
||||
requests: requests.Get(), |
||||
chatService: chat.Get(), |
||||
} |
||||
} |
||||
|
||||
var temporaryGlobalInstance *APInbox |
||||
|
||||
func Get() *APInbox { |
||||
if temporaryGlobalInstance == nil { |
||||
temporaryGlobalInstance = New() |
||||
} |
||||
return temporaryGlobalInstance |
||||
} |
@ -0,0 +1,23 @@
@@ -0,0 +1,23 @@
|
||||
package inbox |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/go-fed/activity/streams/vocab" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
func (api *APInbox) handleUpdateRequest(c context.Context, activity vocab.ActivityStreamsUpdate) error { |
||||
// We only care about update events to followers.
|
||||
if !activity.GetActivityStreamsObject().At(0).IsActivityStreamsPerson() { |
||||
return nil |
||||
} |
||||
|
||||
actor, err := api.resolvers.GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor()) |
||||
if err != nil { |
||||
log.Errorln(err) |
||||
return err |
||||
} |
||||
|
||||
return api.federationRepository.UpdateFollower(actor.ActorIri.String(), actor.Inbox.String(), actor.Name, actor.FullUsername, actor.Image.String()) |
||||
} |
@ -0,0 +1,20 @@
@@ -0,0 +1,20 @@
|
||||
package requests |
||||
|
||||
import "github.com/owncast/owncast/services/apfederation/workerpool" |
||||
|
||||
type Requests struct { |
||||
outboundWorkerPool *workerpool.WorkerPool |
||||
} |
||||
|
||||
func New() *Requests { |
||||
return &Requests{} |
||||
} |
||||
|
||||
var temporaryGlobalInstance *Requests |
||||
|
||||
func Get() *Requests { |
||||
if temporaryGlobalInstance == nil { |
||||
temporaryGlobalInstance = New() |
||||
} |
||||
return temporaryGlobalInstance |
||||
} |
@ -0,0 +1,24 @@
@@ -0,0 +1,24 @@
|
||||
package resolvers |
||||
|
||||
import ( |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
) |
||||
|
||||
type APResolvers struct { |
||||
configRepository configrepository.ConfigRepository |
||||
} |
||||
|
||||
func New() *APResolvers { |
||||
return &APResolvers{ |
||||
configRepository: configrepository.Get(), |
||||
} |
||||
} |
||||
|
||||
var temporaryGlobalInstance *APResolvers |
||||
|
||||
func Get() *APResolvers { |
||||
if temporaryGlobalInstance == nil { |
||||
temporaryGlobalInstance = New() |
||||
} |
||||
return temporaryGlobalInstance |
||||
} |
@ -0,0 +1,240 @@
@@ -0,0 +1,240 @@
|
||||
package chat |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
"net/http" |
||||
"sort" |
||||
|
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/storage/chatrepository" |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
"github.com/owncast/owncast/storage/userrepository" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
type Chat struct { |
||||
getStatus func() *models.Status |
||||
server *Server |
||||
configRepository *configrepository.SqlConfigRepository |
||||
} |
||||
|
||||
func New() *Chat { |
||||
return &Chat{ |
||||
configRepository: configrepository.Get(), |
||||
} |
||||
} |
||||
|
||||
var temporaryGlobalInstance *Chat |
||||
|
||||
// GetConfig returns the temporary global instance.
|
||||
// Remove this after dependency injection is implemented.
|
||||
func Get() *Chat { |
||||
if temporaryGlobalInstance == nil { |
||||
temporaryGlobalInstance = New() |
||||
} |
||||
|
||||
return temporaryGlobalInstance |
||||
} |
||||
|
||||
// Start begins the chat server.
|
||||
func (c *Chat) Start(getStatusFunc func() *models.Status) error { |
||||
c.getStatus = getStatusFunc |
||||
c.server = NewChat() |
||||
|
||||
go c.server.Run() |
||||
|
||||
log.Traceln("Chat server started with max connection count of", c.server.maxSocketConnectionLimit) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GetClientsForUser will return chat connections that are owned by a specific user.
|
||||
func (c *Chat) GetClientsForUser(userID string) ([]*Client, error) { |
||||
c.server.mu.Lock() |
||||
defer c.server.mu.Unlock() |
||||
|
||||
clients := map[string][]*Client{} |
||||
|
||||
for _, client := range c.server.clients { |
||||
clients[client.User.ID] = append(clients[client.User.ID], client) |
||||
} |
||||
|
||||
if _, exists := clients[userID]; !exists { |
||||
return nil, errors.New("no connections for user found") |
||||
} |
||||
|
||||
return clients[userID], nil |
||||
} |
||||
|
||||
// FindClientByID will return a single connected client by ID.
|
||||
func (c *Chat) FindClientByID(clientID uint) (*Client, bool) { |
||||
client, found := c.server.clients[clientID] |
||||
return client, found |
||||
} |
||||
|
||||
// GetClients will return all the current chat clients connected.
|
||||
func (c *Chat) GetClients() []*Client { |
||||
clients := []*Client{} |
||||
|
||||
if c.server == nil { |
||||
return clients |
||||
} |
||||
|
||||
// Convert the keyed map to a slice.
|
||||
for _, client := range c.server.clients { |
||||
clients = append(clients, client) |
||||
} |
||||
|
||||
sort.Slice(clients, func(i, j int) bool { |
||||
return clients[i].ConnectedAt.Before(clients[j].ConnectedAt) |
||||
}) |
||||
|
||||
return clients |
||||
} |
||||
|
||||
// SendSystemMessage will send a message string as a system message to all clients.
|
||||
func (c *Chat) SendSystemMessage(text string, ephemeral bool) error { |
||||
message := models.SystemMessageEvent{ |
||||
MessageEvent: models.MessageEvent{ |
||||
Body: text, |
||||
}, |
||||
} |
||||
message.SetDefaults() |
||||
message.RenderBody() |
||||
message.DisplayName = c.configRepository.GetServerName() |
||||
|
||||
if err := c.Broadcast(&message); err != nil { |
||||
log.Errorln("error sending system message", err) |
||||
} |
||||
|
||||
if !ephemeral { |
||||
cr := chatrepository.Get() |
||||
cr.SaveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// SendFediverseAction will send a message indicating some Fediverse engagement took place.
|
||||
func (c *Chat) SendFediverseAction(eventType string, userAccountName string, image *string, body string, link string) error { |
||||
message := models.FediverseEngagementEvent{ |
||||
Event: models.Event{ |
||||
Type: eventType, |
||||
}, |
||||
MessageEvent: models.MessageEvent{ |
||||
Body: body, |
||||
}, |
||||
UserAccountName: userAccountName, |
||||
Image: image, |
||||
Link: link, |
||||
} |
||||
|
||||
message.SetDefaults() |
||||
message.RenderBody() |
||||
|
||||
if err := c.Broadcast(&message); err != nil { |
||||
log.Errorln("error sending system message", err) |
||||
return err |
||||
} |
||||
|
||||
cr := chatrepository.Get() |
||||
cr.SaveFederatedAction(message) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// SendSystemAction will send a system action string as an action event to all clients.
|
||||
func (c *Chat) SendSystemAction(text string, ephemeral bool) error { |
||||
message := models.ActionEvent{ |
||||
MessageEvent: models.MessageEvent{ |
||||
Body: text, |
||||
}, |
||||
} |
||||
|
||||
message.SetDefaults() |
||||
message.RenderBody() |
||||
|
||||
if err := c.Broadcast(&message); err != nil { |
||||
log.Errorln("error sending system chat action") |
||||
} |
||||
|
||||
if !ephemeral { |
||||
cr := chatrepository.Get() |
||||
cr.SaveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// SendAllWelcomeMessage will send the chat message to all connected clients.
|
||||
func (c *Chat) SendAllWelcomeMessage() { |
||||
c.server.sendAllWelcomeMessage() |
||||
} |
||||
|
||||
// SendSystemMessageToClient will send a single message to a single connected chat client.
|
||||
func (c *Chat) SendSystemMessageToClient(clientID uint, text string) { |
||||
if client, foundClient := c.FindClientByID(clientID); foundClient { |
||||
c.server.sendSystemMessageToClient(client, text) |
||||
} |
||||
} |
||||
|
||||
// Broadcast will send all connected clients the outbound object provided.
|
||||
func (c *Chat) Broadcast(event models.OutboundEvent) error { |
||||
return c.server.Broadcast(event.GetBroadcastPayload()) |
||||
} |
||||
|
||||
// HandleClientConnection handles a single inbound websocket connection.
|
||||
func (c *Chat) HandleClientConnection(w http.ResponseWriter, r *http.Request) { |
||||
c.server.HandleClientConnection(w, r) |
||||
} |
||||
|
||||
// DisconnectClients will forcefully disconnect all clients belonging to a user by ID.
|
||||
func (c *Chat) DisconnectClients(clients []*Client) { |
||||
c.server.DisconnectClients(clients) |
||||
} |
||||
|
||||
// SendConnectedClientInfoToUser will find all the connected clients assigned to a user
|
||||
// and re-send each the connected client info.
|
||||
func (c *Chat) SendConnectedClientInfoToUser(userID string) error { |
||||
clients, err := c.GetClientsForUser(userID) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
userRepository := userrepository.Get() |
||||
|
||||
// Get an updated reference to the user.
|
||||
user := userRepository.GetUserByID(userID) |
||||
if user == nil { |
||||
return fmt.Errorf("user not found") |
||||
} |
||||
|
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for _, client := range clients { |
||||
// Update the client's reference to its user.
|
||||
client.User = user |
||||
// Send the update to the client.
|
||||
client.sendConnectedClientInfo() |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// SendActionToUser will send system action text to all connected clients
|
||||
// assigned to a user ID.
|
||||
func (c *Chat) SendActionToUser(userID string, text string) error { |
||||
clients, err := c.GetClientsForUser(userID) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for _, client := range clients { |
||||
c.server.sendActionToClient(client, text) |
||||
} |
||||
|
||||
return nil |
||||
} |
@ -0,0 +1,61 @@
@@ -0,0 +1,61 @@
|
||||
package status |
||||
|
||||
import ( |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/storage/configrepository" |
||||
) |
||||
|
||||
type Status struct { |
||||
models.Stats |
||||
models.Status |
||||
|
||||
broadcast *models.CurrentBroadcast |
||||
broadcaster *models.Broadcaster |
||||
StreamConnected bool |
||||
|
||||
VersionNumber string `json:"versionNumber"` |
||||
StreamTitle string `json:"streamTitle"` |
||||
ViewerCount int `json:"viewerCount"` |
||||
OverallMaxViewerCount int `json:"overallMaxViewerCount"` |
||||
SessionMaxViewerCount int `json:"sessionMaxViewerCount"` |
||||
|
||||
Online bool `json:"online"` |
||||
} |
||||
|
||||
var temporaryGlobalInstance *Status |
||||
|
||||
func New() *Status { |
||||
configRepository := configrepository.Get() |
||||
|
||||
return &Status{ |
||||
StreamTitle: configRepository.GetStreamTitle(), |
||||
} |
||||
} |
||||
|
||||
// Get will return the global instance of the status service.
|
||||
func Get() *Status { |
||||
if temporaryGlobalInstance == nil { |
||||
temporaryGlobalInstance = &Status{} |
||||
} |
||||
|
||||
return temporaryGlobalInstance |
||||
} |
||||
|
||||
// GetCurrentBroadcast will return the currently active broadcast.
|
||||
func (s *Status) GetCurrentBroadcast() *models.CurrentBroadcast { |
||||
return s.broadcast |
||||
} |
||||
|
||||
func (s *Status) SetCurrentBroadcast(broadcast *models.CurrentBroadcast) { |
||||
s.broadcast = broadcast |
||||
} |
||||
|
||||
// SetBroadcaster will store the current inbound broadcasting details.
|
||||
func (s *Status) SetBroadcaster(broadcaster *models.Broadcaster) { |
||||
s.broadcaster = broadcaster |
||||
} |
||||
|
||||
// GetBroadcaster will return the details of the currently active broadcaster.
|
||||
func (s *Status) GetBroadcaster() *models.Broadcaster { |
||||
return s.broadcaster |
||||
} |
@ -1,14 +1,33 @@
@@ -1,14 +1,33 @@
|
||||
package chat |
||||
package chatrepository |
||||
|
||||
import ( |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/owncast/owncast/storage/data" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
const ( |
||||
maxBacklogHours = 2 // Keep backlog max hours worth of messages
|
||||
maxBacklogNumber = 50 // Return max number of messages in history request
|
||||
) |
||||
|
||||
func (cr *ChatRepository) startPruner() { |
||||
chatDataPruner := time.NewTicker(5 * time.Minute) |
||||
go func() { |
||||
cr.runPruner() |
||||
for range chatDataPruner.C { |
||||
cr.runPruner() |
||||
} |
||||
}() |
||||
} |
||||
|
||||
// Only keep recent messages so we don't keep more chat data than needed
|
||||
// for privacy and efficiency reasons.
|
||||
func runPruner() { |
||||
func (cr *ChatRepository) runPruner() { |
||||
_datastore := data.GetDatastore() |
||||
|
||||
_datastore.DbLock.Lock() |
||||
defer _datastore.DbLock.Unlock() |
||||
|
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue