126 changed files with 1903 additions and 1610 deletions
@ -1,98 +0,0 @@
@@ -1,98 +0,0 @@
|
||||
package persistence |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/owncast/owncast/db" |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/utils" |
||||
"github.com/pkg/errors" |
||||
) |
||||
|
||||
// GetFollowerCount will return the number of followers we're keeping track of.
|
||||
func GetFollowerCount() (int64, error) { |
||||
ctx := context.Background() |
||||
return _datastore.GetQueries().GetFollowerCount(ctx) |
||||
} |
||||
|
||||
// GetFederationFollowers will return a slice of the followers we keep track of locally.
|
||||
func GetFederationFollowers(limit int, offset int) ([]models.Follower, int, error) { |
||||
ctx := context.Background() |
||||
total, err := _datastore.GetQueries().GetFollowerCount(ctx) |
||||
if err != nil { |
||||
return nil, 0, errors.Wrap(err, "unable to fetch total number of followers") |
||||
} |
||||
|
||||
followersResult, err := _datastore.GetQueries().GetFederationFollowersWithOffset(ctx, db.GetFederationFollowersWithOffsetParams{ |
||||
Limit: int32(limit), |
||||
Offset: int32(offset), |
||||
}) |
||||
if err != nil { |
||||
return nil, 0, err |
||||
} |
||||
|
||||
followers := make([]models.Follower, 0) |
||||
|
||||
for _, row := range followersResult { |
||||
singleFollower := models.Follower{ |
||||
Name: row.Name.String, |
||||
Username: row.Username, |
||||
Image: row.Image.String, |
||||
ActorIRI: row.Iri, |
||||
Inbox: row.Inbox, |
||||
Timestamp: utils.NullTime(row.CreatedAt), |
||||
} |
||||
|
||||
followers = append(followers, singleFollower) |
||||
} |
||||
|
||||
return followers, int(total), nil |
||||
} |
||||
|
||||
// GetPendingFollowRequests will return pending follow requests.
|
||||
func GetPendingFollowRequests() ([]models.Follower, error) { |
||||
pendingFollowersResult, err := _datastore.GetQueries().GetFederationFollowerApprovalRequests(context.Background()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
followers := make([]models.Follower, 0) |
||||
|
||||
for _, row := range pendingFollowersResult { |
||||
singleFollower := models.Follower{ |
||||
Name: row.Name.String, |
||||
Username: row.Username, |
||||
Image: row.Image.String, |
||||
ActorIRI: row.Iri, |
||||
Inbox: row.Inbox, |
||||
Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true}, |
||||
} |
||||
followers = append(followers, singleFollower) |
||||
} |
||||
|
||||
return followers, nil |
||||
} |
||||
|
||||
// GetBlockedAndRejectedFollowers will return blocked and rejected followers.
|
||||
func GetBlockedAndRejectedFollowers() ([]models.Follower, error) { |
||||
pendingFollowersResult, err := _datastore.GetQueries().GetRejectedAndBlockedFollowers(context.Background()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
followers := make([]models.Follower, 0) |
||||
|
||||
for _, row := range pendingFollowersResult { |
||||
singleFollower := models.Follower{ |
||||
Name: row.Name.String, |
||||
Username: row.Username, |
||||
Image: row.Image.String, |
||||
ActorIRI: row.Iri, |
||||
DisabledAt: utils.NullTime{Time: row.DisabledAt.Time, Valid: true}, |
||||
Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true}, |
||||
} |
||||
followers = append(followers, singleFollower) |
||||
} |
||||
|
||||
return followers, nil |
||||
} |
@ -1,319 +0,0 @@
@@ -1,319 +0,0 @@
|
||||
package persistence |
||||
|
||||
import ( |
||||
"context" |
||||
"database/sql" |
||||
"fmt" |
||||
"net/url" |
||||
"time" |
||||
|
||||
"github.com/go-fed/activity/streams" |
||||
"github.com/go-fed/activity/streams/vocab" |
||||
"github.com/owncast/owncast/activitypub/apmodels" |
||||
"github.com/owncast/owncast/activitypub/resolvers" |
||||
"github.com/owncast/owncast/db" |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/pkg/errors" |
||||
|
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
var _datastore *data.Datastore |
||||
|
||||
// Setup will initialize the ActivityPub persistence layer with the provided datastore.
|
||||
func Setup(datastore *data.Datastore) { |
||||
_datastore = datastore |
||||
createFederationFollowersTable() |
||||
createFederationOutboxTable() |
||||
createFederatedActivitiesTable() |
||||
} |
||||
|
||||
// AddFollow will save a follow to the datastore.
|
||||
func AddFollow(follow apmodels.ActivityPubActor, approved bool) error { |
||||
log.Traceln("Saving", follow.ActorIri, "as a follower.") |
||||
var image string |
||||
if follow.Image != nil { |
||||
image = follow.Image.String() |
||||
} |
||||
|
||||
followRequestObject, err := apmodels.Serialize(follow.RequestObject) |
||||
if err != nil { |
||||
return errors.Wrap(err, "error serializing follow request object") |
||||
} |
||||
|
||||
return createFollow(follow.ActorIri.String(), follow.Inbox.String(), follow.FollowRequestIri.String(), follow.Name, follow.Username, image, followRequestObject, approved) |
||||
} |
||||
|
||||
// RemoveFollow will remove a follow from the datastore.
|
||||
func RemoveFollow(unfollow apmodels.ActivityPubActor) error { |
||||
log.Traceln("Removing", unfollow.ActorIri, "as a follower.") |
||||
return removeFollow(unfollow.ActorIri) |
||||
} |
||||
|
||||
// GetFollower will return a single follower/request given an IRI.
|
||||
func GetFollower(iri string) (*apmodels.ActivityPubActor, error) { |
||||
result, err := _datastore.GetQueries().GetFollowerByIRI(context.Background(), iri) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
followIRI, err := url.Parse(result.Request) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "error parsing follow request IRI") |
||||
} |
||||
|
||||
iriURL, err := url.Parse(result.Iri) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "error parsing actor IRI") |
||||
} |
||||
|
||||
inbox, err := url.Parse(result.Inbox) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "error parsing acting inbox") |
||||
} |
||||
|
||||
image, _ := url.Parse(result.Image.String) |
||||
|
||||
var disabledAt *time.Time |
||||
if result.DisabledAt.Valid { |
||||
disabledAt = &result.DisabledAt.Time |
||||
} |
||||
|
||||
follower := apmodels.ActivityPubActor{ |
||||
ActorIri: iriURL, |
||||
Inbox: inbox, |
||||
Name: result.Name.String, |
||||
Username: result.Username, |
||||
Image: image, |
||||
FollowRequestIri: followIRI, |
||||
DisabledAt: disabledAt, |
||||
} |
||||
|
||||
return &follower, nil |
||||
} |
||||
|
||||
// ApprovePreviousFollowRequest will approve a follow request.
|
||||
func ApprovePreviousFollowRequest(iri string) error { |
||||
return _datastore.GetQueries().ApproveFederationFollower(context.Background(), db.ApproveFederationFollowerParams{ |
||||
Iri: iri, |
||||
ApprovedAt: sql.NullTime{ |
||||
Time: time.Now(), |
||||
Valid: true, |
||||
}, |
||||
}) |
||||
} |
||||
|
||||
// BlockOrRejectFollower will block an existing follower or reject a follow request.
|
||||
func BlockOrRejectFollower(iri string) error { |
||||
return _datastore.GetQueries().RejectFederationFollower(context.Background(), db.RejectFederationFollowerParams{ |
||||
Iri: iri, |
||||
DisabledAt: sql.NullTime{ |
||||
Time: time.Now(), |
||||
Valid: true, |
||||
}, |
||||
}) |
||||
} |
||||
|
||||
func createFollow(actor, inbox, request, name, username, image string, requestObject []byte, approved bool) error { |
||||
tx, err := _datastore.DB.Begin() |
||||
if err != nil { |
||||
log.Debugln(err) |
||||
} |
||||
defer func() { |
||||
_ = tx.Rollback() |
||||
}() |
||||
|
||||
var approvedAt sql.NullTime |
||||
if approved { |
||||
approvedAt = sql.NullTime{ |
||||
Time: time.Now(), |
||||
Valid: true, |
||||
} |
||||
} |
||||
|
||||
if err = _datastore.GetQueries().WithTx(tx).AddFollower(context.Background(), db.AddFollowerParams{ |
||||
Iri: actor, |
||||
Inbox: inbox, |
||||
Name: sql.NullString{String: name, Valid: true}, |
||||
Username: username, |
||||
Image: sql.NullString{String: image, Valid: true}, |
||||
ApprovedAt: approvedAt, |
||||
Request: request, |
||||
RequestObject: requestObject, |
||||
}); err != nil { |
||||
log.Errorln("error creating new federation follow: ", err) |
||||
} |
||||
|
||||
return tx.Commit() |
||||
} |
||||
|
||||
// UpdateFollower will update the details of a stored follower given an IRI.
|
||||
func UpdateFollower(actorIRI string, inbox string, name string, username string, image string) error { |
||||
_datastore.DbLock.Lock() |
||||
defer _datastore.DbLock.Unlock() |
||||
|
||||
tx, err := _datastore.DB.Begin() |
||||
if err != nil { |
||||
log.Debugln(err) |
||||
} |
||||
defer func() { |
||||
_ = tx.Rollback() |
||||
}() |
||||
|
||||
if err = _datastore.GetQueries().WithTx(tx).UpdateFollowerByIRI(context.Background(), db.UpdateFollowerByIRIParams{ |
||||
Inbox: inbox, |
||||
Name: sql.NullString{String: name, Valid: true}, |
||||
Username: username, |
||||
Image: sql.NullString{String: image, Valid: true}, |
||||
Iri: actorIRI, |
||||
}); err != nil { |
||||
return fmt.Errorf("error updating follower %s %s", actorIRI, err) |
||||
} |
||||
|
||||
return tx.Commit() |
||||
} |
||||
|
||||
func removeFollow(actor *url.URL) error { |
||||
_datastore.DbLock.Lock() |
||||
defer _datastore.DbLock.Unlock() |
||||
|
||||
tx, err := _datastore.DB.Begin() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer func() { |
||||
_ = tx.Rollback() |
||||
}() |
||||
|
||||
if err := _datastore.GetQueries().WithTx(tx).RemoveFollowerByIRI(context.Background(), actor.String()); err != nil { |
||||
return err |
||||
} |
||||
|
||||
return tx.Commit() |
||||
} |
||||
|
||||
// GetOutboxPostCount will return the number of posts in the outbox.
|
||||
func GetOutboxPostCount() (int64, error) { |
||||
ctx := context.Background() |
||||
return _datastore.GetQueries().GetLocalPostCount(ctx) |
||||
} |
||||
|
||||
// GetOutbox will return an instance of the outbox populated by stored items.
|
||||
func GetOutbox(limit int, offset int) (vocab.ActivityStreamsOrderedCollection, error) { |
||||
collection := streams.NewActivityStreamsOrderedCollection() |
||||
orderedItems := streams.NewActivityStreamsOrderedItemsProperty() |
||||
rows, err := _datastore.GetQueries().GetOutboxWithOffset( |
||||
context.Background(), |
||||
db.GetOutboxWithOffsetParams{Limit: int32(limit), Offset: int32(offset)}, |
||||
) |
||||
if err != nil { |
||||
return collection, err |
||||
} |
||||
|
||||
for _, value := range rows { |
||||
createCallback := func(c context.Context, activity vocab.ActivityStreamsCreate) error { |
||||
orderedItems.AppendActivityStreamsCreate(activity) |
||||
return nil |
||||
} |
||||
if err := resolvers.Resolve(context.Background(), value, createCallback); err != nil { |
||||
return collection, err |
||||
} |
||||
} |
||||
|
||||
return collection, nil |
||||
} |
||||
|
||||
// AddToOutbox will store a single payload to the persistence layer.
|
||||
func AddToOutbox(iri string, itemData []byte, typeString string, isLiveNotification bool) error { |
||||
tx, err := _datastore.DB.Begin() |
||||
if err != nil { |
||||
log.Debugln(err) |
||||
} |
||||
defer func() { |
||||
_ = tx.Rollback() |
||||
}() |
||||
|
||||
if err = _datastore.GetQueries().WithTx(tx).AddToOutbox(context.Background(), db.AddToOutboxParams{ |
||||
Iri: iri, |
||||
Value: itemData, |
||||
Type: typeString, |
||||
LiveNotification: sql.NullBool{Bool: isLiveNotification, Valid: true}, |
||||
}); err != nil { |
||||
return fmt.Errorf("error creating new item in federation outbox %s", err) |
||||
} |
||||
|
||||
return tx.Commit() |
||||
} |
||||
|
||||
// GetObjectByIRI will return a string representation of a single object by the IRI.
|
||||
func GetObjectByIRI(iri string) (string, bool, time.Time, error) { |
||||
row, err := _datastore.GetQueries().GetObjectFromOutboxByIRI(context.Background(), iri) |
||||
return string(row.Value), row.LiveNotification.Bool, row.CreatedAt.Time, err |
||||
} |
||||
|
||||
// GetLocalPostCount will return the number of posts existing locally.
|
||||
func GetLocalPostCount() (int64, error) { |
||||
ctx := context.Background() |
||||
return _datastore.GetQueries().GetLocalPostCount(ctx) |
||||
} |
||||
|
||||
// SaveInboundFediverseActivity will save an event to the ap_inbound_activities table.
|
||||
func SaveInboundFediverseActivity(objectIRI string, actorIRI string, eventType string, timestamp time.Time) error { |
||||
if err := _datastore.GetQueries().AddToAcceptedActivities(context.Background(), db.AddToAcceptedActivitiesParams{ |
||||
Iri: objectIRI, |
||||
Actor: actorIRI, |
||||
Type: eventType, |
||||
Timestamp: timestamp, |
||||
}); err != nil { |
||||
return errors.Wrap(err, "error saving event "+objectIRI) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GetInboundActivities will return a collection of saved, federated activities
|
||||
// limited and offset by the values provided to support pagination.
|
||||
func GetInboundActivities(limit int, offset int) ([]models.FederatedActivity, int, error) { |
||||
ctx := context.Background() |
||||
rows, err := _datastore.GetQueries().GetInboundActivitiesWithOffset(ctx, db.GetInboundActivitiesWithOffsetParams{ |
||||
Limit: int32(limit), |
||||
Offset: int32(offset), |
||||
}) |
||||
if err != nil { |
||||
return nil, 0, err |
||||
} |
||||
|
||||
activities := make([]models.FederatedActivity, 0) |
||||
|
||||
total, err := _datastore.GetQueries().GetInboundActivityCount(context.Background()) |
||||
if err != nil { |
||||
return nil, 0, errors.Wrap(err, "unable to fetch total activity count") |
||||
} |
||||
|
||||
for _, row := range rows { |
||||
singleActivity := models.FederatedActivity{ |
||||
IRI: row.Iri, |
||||
ActorIRI: row.Actor, |
||||
Type: row.Type, |
||||
Timestamp: row.Timestamp, |
||||
} |
||||
activities = append(activities, singleActivity) |
||||
} |
||||
|
||||
return activities, int(total), nil |
||||
} |
||||
|
||||
// HasPreviouslyHandledInboundActivity will return if we have previously handled
|
||||
// an inbound federated activity.
|
||||
func HasPreviouslyHandledInboundActivity(iri string, actorIRI string, eventType string) (bool, error) { |
||||
exists, err := _datastore.GetQueries().DoesInboundActivityExist(context.Background(), db.DoesInboundActivityExistParams{ |
||||
Iri: iri, |
||||
Actor: actorIRI, |
||||
Type: eventType, |
||||
}) |
||||
if err != nil { |
||||
return false, err |
||||
} |
||||
|
||||
return exists > 0, nil |
||||
} |
@ -1,11 +1,11 @@
@@ -1,11 +1,11 @@
|
||||
package auth |
||||
package models |
||||
|
||||
// Type represents a form of authentication.
|
||||
type Type string |
||||
type AuthType string |
||||
|
||||
// The different auth types we support.
|
||||
const ( |
||||
// IndieAuth https://indieauth.spec.indieweb.org/.
|
||||
IndieAuth Type = "indieauth" |
||||
Fediverse Type = "fediverse" |
||||
IndieAuth AuthType = "indieauth" |
||||
Fediverse AuthType = "fediverse" |
||||
) |
@ -1,66 +0,0 @@
@@ -1,66 +0,0 @@
|
||||
package auth |
||||
|
||||
import ( |
||||
"context" |
||||
"strings" |
||||
|
||||
"github.com/owncast/owncast/models" |
||||
|
||||
"github.com/owncast/owncast/db" |
||||
) |
||||
|
||||
var _datastore *data.Datastore |
||||
|
||||
// Setup will initialize auth persistence.
|
||||
func Setup(db *data.Datastore) { |
||||
_datastore = db |
||||
|
||||
createTableSQL := `CREATE TABLE IF NOT EXISTS auth ( |
||||
"id" INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, |
||||
"user_id" TEXT NOT NULL, |
||||
"token" TEXT NOT NULL, |
||||
"type" TEXT NOT NULL, |
||||
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, |
||||
FOREIGN KEY(user_id) REFERENCES users(id) |
||||
);` |
||||
_datastore.MustExec(createTableSQL) |
||||
_datastore.MustExec(`CREATE INDEX IF NOT EXISTS idx_auth_token ON auth (token);`) |
||||
} |
||||
|
||||
// AddAuth will add an external authentication token and type for a user.
|
||||
func AddAuth(userID, authToken string, authType Type) error { |
||||
return _datastore.GetQueries().AddAuthForUser(context.Background(), db.AddAuthForUserParams{ |
||||
UserID: userID, |
||||
Token: authToken, |
||||
Type: string(authType), |
||||
}) |
||||
} |
||||
|
||||
// GetUserByAuth will return an existing user given auth details if a user
|
||||
// has previously authenticated with that method.
|
||||
func GetUserByAuth(authToken string, authType Type) *models.User { |
||||
u, err := _datastore.GetQueries().GetUserByAuth(context.Background(), db.GetUserByAuthParams{ |
||||
Token: authToken, |
||||
Type: string(authType), |
||||
}) |
||||
if err != nil { |
||||
return nil |
||||
} |
||||
|
||||
var scopes []string |
||||
if u.Scopes.Valid { |
||||
scopes = strings.Split(u.Scopes.String, ",") |
||||
} |
||||
|
||||
return &models.User{ |
||||
ID: u.ID, |
||||
DisplayName: u.DisplayName, |
||||
DisplayColor: int(u.DisplayColor), |
||||
CreatedAt: u.CreatedAt.Time, |
||||
DisabledAt: &u.DisabledAt.Time, |
||||
PreviousNames: strings.Split(u.PreviousNames.String, ","), |
||||
NameChangedAt: &u.NamechangedAt.Time, |
||||
AuthenticatedAt: &u.AuthenticatedAt.Time, |
||||
Scopes: scopes, |
||||
} |
||||
} |
@ -1,37 +0,0 @@
@@ -1,37 +0,0 @@
|
||||
package notifications |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/owncast/owncast/db" |
||||
"github.com/pkg/errors" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
// AddNotification saves a new user notification destination.
|
||||
func AddNotification(channel, destination string) error { |
||||
return data.GetDatastore().GetQueries().AddNotification(context.Background(), db.AddNotificationParams{ |
||||
Channel: channel, |
||||
Destination: destination, |
||||
}) |
||||
} |
||||
|
||||
// RemoveNotificationForChannel removes a notification destination.
|
||||
func RemoveNotificationForChannel(channel, destination string) error { |
||||
log.Debugln("Removing notification for channel", channel) |
||||
return data.GetDatastore().GetQueries().RemoveNotificationDestinationForChannel(context.Background(), db.RemoveNotificationDestinationForChannelParams{ |
||||
Channel: channel, |
||||
Destination: destination, |
||||
}) |
||||
} |
||||
|
||||
// GetNotificationDestinationsForChannel will return a collection of
|
||||
// destinations to notify for a given channel.
|
||||
func GetNotificationDestinationsForChannel(channel string) ([]string, error) { |
||||
result, err := data.GetDatastore().GetQueries().GetNotificationDestinationsForChannel(context.Background(), channel) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "unable to query notification destinations for channel "+channel) |
||||
} |
||||
|
||||
return result, nil |
||||
} |
@ -1,6 +1,6 @@
@@ -1,6 +1,6 @@
|
||||
version: 1 |
||||
packages: |
||||
- path: db |
||||
name: db |
||||
schema: 'db/schema.sql' |
||||
queries: 'db/query.sql' |
||||
- path: storage/sqlstorage |
||||
name: sqlstorage |
||||
schema: 'storage/sqlstorage/schema.sql' |
||||
queries: 'storage/sqlstorage/query.sql' |
||||
|
@ -0,0 +1,27 @@
@@ -0,0 +1,27 @@
|
||||
package chatrepository |
||||
|
||||
import "github.com/owncast/owncast/storage/data" |
||||
|
||||
type ChatRepository struct { |
||||
datastore *data.Store |
||||
} |
||||
|
||||
func New(datastore *data.Store) *ChatRepository { |
||||
r := &ChatRepository{ |
||||
datastore: datastore, |
||||
} |
||||
|
||||
return r |
||||
} |
||||
|
||||
// NOTE: This is temporary during the transition period.
|
||||
var temporaryGlobalInstance *ChatRepository |
||||
|
||||
// GetUserRepository will return the user repository.
|
||||
func GetChatRepository() *ChatRepository { |
||||
if temporaryGlobalInstance == nil { |
||||
i := New(data.GetDatastore()) |
||||
temporaryGlobalInstance = i |
||||
} |
||||
return temporaryGlobalInstance |
||||
} |
@ -0,0 +1,7 @@
@@ -0,0 +1,7 @@
|
||||
# Config Repository |
||||
|
||||
The configuration repository represents all the getters, setters and storage logic for user-defined configuration values. This includes things like the server name, enabled/disabled flags, etc. See `keys.go` to see the full list of keys that are used for accessing these values in the database. |
||||
|
||||
## Migrations |
||||
|
||||
Add migrations to `migrations.go` and you can use the datastore and config repository to make your required changes between datastore versions. |
@ -0,0 +1,22 @@
@@ -0,0 +1,22 @@
|
||||
package data |
||||
|
||||
import ( |
||||
"testing" |
||||
) |
||||
|
||||
func TestCachedString(t *testing.T) { |
||||
const testKey = "test string key" |
||||
const testValue = "test string value" |
||||
|
||||
_datastore.SetCachedValue(testKey, []byte(testValue)) |
||||
|
||||
// Get the config entry from the database
|
||||
stringTestResult, err := _datastore.GetCachedValue(testKey) |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
|
||||
if string(stringTestResult) != testValue { |
||||
t.Error("expected", testValue, "but test returned", stringTestResult) |
||||
} |
||||
} |
@ -0,0 +1,126 @@
@@ -0,0 +1,126 @@
|
||||
package data |
||||
|
||||
import ( |
||||
"bytes" |
||||
"database/sql" |
||||
"encoding/gob" |
||||
"sync" |
||||
|
||||
// sqlite requires a blank import.
|
||||
_ "github.com/mattn/go-sqlite3" |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/services/config" |
||||
"github.com/owncast/owncast/storage/sqlstorage" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
const ( |
||||
schemaVersion = 7 |
||||
) |
||||
|
||||
// Store is the global key/value store for configuration values.
|
||||
type Store struct { |
||||
DB *sql.DB |
||||
cache map[string][]byte |
||||
DbLock *sync.Mutex |
||||
} |
||||
|
||||
// NewStore creates a new datastore.
|
||||
func NewStore(file string) (*Store, error) { |
||||
s := &Store{ |
||||
cache: make(map[string][]byte), |
||||
DbLock: &sync.Mutex{}, |
||||
} |
||||
|
||||
db, err := sqlstorage.InitializeDatabase(file, schemaVersion) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
s.DB = db |
||||
s.warmCache() |
||||
temporaryGlobalDatastoreInstance = s |
||||
return s, nil |
||||
} |
||||
|
||||
var temporaryGlobalDatastoreInstance *Store |
||||
|
||||
// GetDatastore returns the shared instance of the owncast datastore.
|
||||
func GetDatastore() *Store { |
||||
if temporaryGlobalDatastoreInstance == nil { |
||||
c := config.GetConfig() |
||||
i, err := NewStore(c.DatabaseFilePath) |
||||
if err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
temporaryGlobalDatastoreInstance = i |
||||
} |
||||
return temporaryGlobalDatastoreInstance |
||||
} |
||||
|
||||
// GetQueries will return the shared instance of the SQL query generator.
|
||||
func (ds *Store) GetQueries() *sqlstorage.Queries { |
||||
return sqlstorage.New(ds.DB) |
||||
} |
||||
|
||||
// Get will query the database for the key and return the entry.
|
||||
func (ds *Store) Get(key string) (models.ConfigEntry, error) { |
||||
cachedValue, err := ds.GetCachedValue(key) |
||||
if err == nil { |
||||
return models.ConfigEntry{ |
||||
Key: key, |
||||
Value: cachedValue, |
||||
}, nil |
||||
} |
||||
|
||||
var resultKey string |
||||
var resultValue []byte |
||||
|
||||
row := ds.DB.QueryRow("SELECT key, value FROM datastore WHERE key = ? LIMIT 1", key) |
||||
if err := row.Scan(&resultKey, &resultValue); err != nil { |
||||
return models.ConfigEntry{}, err |
||||
} |
||||
|
||||
result := models.ConfigEntry{ |
||||
Key: resultKey, |
||||
Value: resultValue, |
||||
} |
||||
ds.SetCachedValue(resultKey, resultValue) |
||||
|
||||
return result, nil |
||||
} |
||||
|
||||
// Save will save the models.ConfigEntry to the database.
|
||||
func (ds *Store) Save(e models.ConfigEntry) error { |
||||
ds.DbLock.Lock() |
||||
defer ds.DbLock.Unlock() |
||||
|
||||
var dataGob bytes.Buffer |
||||
enc := gob.NewEncoder(&dataGob) |
||||
if err := enc.Encode(e.Value); err != nil { |
||||
return err |
||||
} |
||||
|
||||
tx, err := ds.DB.Begin() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
var stmt *sql.Stmt |
||||
stmt, err = tx.Prepare("INSERT INTO datastore (key, value) VALUES(?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value") |
||||
if err != nil { |
||||
return err |
||||
} |
||||
_, err = stmt.Exec(e.Key, dataGob.Bytes()) |
||||
|
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer stmt.Close() |
||||
|
||||
if err = tx.Commit(); err != nil { |
||||
log.Fatalln(err) |
||||
} |
||||
|
||||
ds.SetCachedValue(e.Key, dataGob.Bytes()) |
||||
|
||||
return nil |
||||
} |
@ -1,4 +1,4 @@
@@ -1,4 +1,4 @@
|
||||
package datastore |
||||
package data |
||||
|
||||
import ( |
||||
"database/sql" |
@ -1,229 +0,0 @@
@@ -1,229 +0,0 @@
|
||||
package datastore |
||||
|
||||
import ( |
||||
"bytes" |
||||
"database/sql" |
||||
"encoding/gob" |
||||
"fmt" |
||||
"os" |
||||
"path/filepath" |
||||
"sync" |
||||
"time" |
||||
|
||||
// sqlite requires a blank import.
|
||||
_ "github.com/mattn/go-sqlite3" |
||||
"github.com/owncast/owncast/db" |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/services/config" |
||||
"github.com/owncast/owncast/utils" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
const ( |
||||
schemaVersion = 7 |
||||
) |
||||
|
||||
// Datastore is the global key/value store for configuration values.
|
||||
type Datastore struct { |
||||
DB *sql.DB |
||||
cache map[string][]byte |
||||
DbLock *sync.Mutex |
||||
} |
||||
|
||||
var temporaryGlobalDatastoreInstance *Datastore |
||||
|
||||
// NewDatastore creates a new datastore.
|
||||
func NewDatastore(file string) (*Datastore, error) { |
||||
r := &Datastore{ |
||||
cache: make(map[string][]byte), |
||||
DbLock: &sync.Mutex{}, |
||||
} |
||||
|
||||
if err := r.InitializeDatabase(file); err != nil { |
||||
return nil, err |
||||
} |
||||
return r, nil |
||||
} |
||||
|
||||
// GetDatastore returns the shared instance of the owncast datastore.
|
||||
func GetDatastore() *Datastore { |
||||
if temporaryGlobalDatastoreInstance == nil { |
||||
c := config.GetConfig() |
||||
i, err := NewDatastore(c.DatabaseFilePath) |
||||
if err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
temporaryGlobalDatastoreInstance = i |
||||
} |
||||
return temporaryGlobalDatastoreInstance |
||||
} |
||||
|
||||
// GetQueries will return the shared instance of the SQL query generator.
|
||||
func (ds *Datastore) GetQueries() *db.Queries { |
||||
return db.New(ds.DB) |
||||
} |
||||
|
||||
// Get will query the database for the key and return the entry.
|
||||
func (ds *Datastore) Get(key string) (models.ConfigEntry, error) { |
||||
cachedValue, err := ds.GetCachedValue(key) |
||||
if err == nil { |
||||
return models.ConfigEntry{ |
||||
Key: key, |
||||
Value: cachedValue, |
||||
}, nil |
||||
} |
||||
|
||||
var resultKey string |
||||
var resultValue []byte |
||||
|
||||
row := ds.DB.QueryRow("SELECT key, value FROM datastore WHERE key = ? LIMIT 1", key) |
||||
if err := row.Scan(&resultKey, &resultValue); err != nil { |
||||
return models.ConfigEntry{}, err |
||||
} |
||||
|
||||
result := models.ConfigEntry{ |
||||
Key: resultKey, |
||||
Value: resultValue, |
||||
} |
||||
ds.SetCachedValue(resultKey, resultValue) |
||||
|
||||
return result, nil |
||||
} |
||||
|
||||
// Save will save the models.ConfigEntry to the database.
|
||||
func (ds *Datastore) Save(e models.ConfigEntry) error { |
||||
ds.DbLock.Lock() |
||||
defer ds.DbLock.Unlock() |
||||
|
||||
var dataGob bytes.Buffer |
||||
enc := gob.NewEncoder(&dataGob) |
||||
if err := enc.Encode(e.Value); err != nil { |
||||
return err |
||||
} |
||||
|
||||
tx, err := ds.DB.Begin() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
var stmt *sql.Stmt |
||||
stmt, err = tx.Prepare("INSERT INTO datastore (key, value) VALUES(?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value") |
||||
if err != nil { |
||||
return err |
||||
} |
||||
_, err = stmt.Exec(e.Key, dataGob.Bytes()) |
||||
|
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer stmt.Close() |
||||
|
||||
if err = tx.Commit(); err != nil { |
||||
log.Fatalln(err) |
||||
} |
||||
|
||||
ds.SetCachedValue(e.Key, dataGob.Bytes()) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// MustExec will execute a SQL statement on a provided database instance.
|
||||
func (ds *Datastore) MustExec(s string) { |
||||
stmt, err := ds.DB.Prepare(s) |
||||
if err != nil { |
||||
log.Panic(err) |
||||
} |
||||
defer stmt.Close() |
||||
_, err = stmt.Exec() |
||||
if err != nil { |
||||
log.Warnln(err) |
||||
} |
||||
} |
||||
|
||||
// InitializeDatabase will open the datastore and make it available.
|
||||
func (ds *Datastore) InitializeDatabase(file string) error { |
||||
// Allow support for in-memory databases for tests.
|
||||
|
||||
var db *sql.DB |
||||
|
||||
if file == ":memory:" { |
||||
inMemoryDb, err := sql.Open("sqlite3", file) |
||||
if err != nil { |
||||
log.Fatal(err.Error()) |
||||
} |
||||
db = inMemoryDb |
||||
} else { |
||||
// Create empty DB file if it doesn't exist.
|
||||
if !utils.DoesFileExists(file) { |
||||
log.Traceln("Creating new database at", file) |
||||
|
||||
_, err := os.Create(file) //nolint:gosec
|
||||
if err != nil { |
||||
log.Fatal(err.Error()) |
||||
} |
||||
} |
||||
|
||||
onDiskDb, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_cache_size=10000&cache=shared&_journal_mode=WAL", file)) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
db = onDiskDb |
||||
db.SetMaxOpenConns(1) |
||||
} |
||||
|
||||
ds.DB = db |
||||
|
||||
// Some SQLite optimizations
|
||||
_, _ = db.Exec("pragma journal_mode = WAL") |
||||
_, _ = db.Exec("pragma synchronous = normal") |
||||
_, _ = db.Exec("pragma temp_store = memory") |
||||
_, _ = db.Exec("pragma wal_checkpoint(full)") |
||||
|
||||
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS config ( |
||||
"key" string NOT NULL PRIMARY KEY, |
||||
"value" TEXT |
||||
);`); err != nil { |
||||
return err |
||||
} |
||||
|
||||
ds.createTables() |
||||
|
||||
var version int |
||||
err := db.QueryRow("SELECT value FROM config WHERE key='version'"). |
||||
Scan(&version) |
||||
if err != nil { |
||||
if err != sql.ErrNoRows { |
||||
return err |
||||
} |
||||
|
||||
// fresh database: initialize it with the current schema version
|
||||
_, err := db.Exec("INSERT INTO config(key, value) VALUES(?, ?)", "version", schemaVersion) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
version = schemaVersion |
||||
} |
||||
|
||||
// is database from a newer Owncast version?
|
||||
if version > schemaVersion { |
||||
return fmt.Errorf("incompatible database version %d (versions up to %d are supported)", |
||||
version, schemaVersion) |
||||
} |
||||
|
||||
// is database schema outdated?
|
||||
if version < schemaVersion { |
||||
if err := migrateDatabaseSchema(db, version, schemaVersion); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
dbBackupTicker := time.NewTicker(1 * time.Hour) |
||||
go func() { |
||||
c := config.GetConfig() |
||||
backupFile := filepath.Join(c.BackupDirectory, "owncastdb.bak") |
||||
for range dbBackupTicker.C { |
||||
utils.Backup(db, backupFile) |
||||
} |
||||
}() |
||||
|
||||
return nil |
||||
} |
@ -0,0 +1,423 @@
@@ -0,0 +1,423 @@
|
||||
package federationrepository |
||||
|
||||
import ( |
||||
"context" |
||||
"database/sql" |
||||
"fmt" |
||||
"net/url" |
||||
"time" |
||||
|
||||
"github.com/go-fed/activity/streams" |
||||
"github.com/go-fed/activity/streams/vocab" |
||||
"github.com/owncast/owncast/activitypub/apmodels" |
||||
"github.com/owncast/owncast/activitypub/resolvers" |
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/storage/data" |
||||
"github.com/owncast/owncast/storage/sqlstorage" |
||||
"github.com/owncast/owncast/utils" |
||||
"github.com/pkg/errors" |
||||
|
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
type FederationRepository struct { |
||||
datastore *data.Store |
||||
} |
||||
|
||||
func New(datastore *data.Store) *FederationRepository { |
||||
r := &FederationRepository{ |
||||
datastore: datastore, |
||||
} |
||||
|
||||
return r |
||||
} |
||||
|
||||
// NOTE: This is temporary during the transition period.
|
||||
var temporaryGlobalInstance *FederationRepository |
||||
|
||||
// GetUserRepository will return the user repository.
|
||||
func Get() *FederationRepository { |
||||
if temporaryGlobalInstance == nil { |
||||
i := New(data.GetDatastore()) |
||||
temporaryGlobalInstance = i |
||||
} |
||||
return temporaryGlobalInstance |
||||
} |
||||
|
||||
// GetFollowerCount will return the number of followers we're keeping track of.
|
||||
func (f *FederationRepository) GetFollowerCount() (int64, error) { |
||||
ctx := context.Background() |
||||
return f.datastore.GetQueries().GetFollowerCount(ctx) |
||||
} |
||||
|
||||
// GetFederationFollowers will return a slice of the followers we keep track of locally.
|
||||
func (f *FederationRepository) GetFederationFollowers(limit int, offset int) ([]models.Follower, int, error) { |
||||
ctx := context.Background() |
||||
total, err := f.datastore.GetQueries().GetFollowerCount(ctx) |
||||
if err != nil { |
||||
return nil, 0, errors.Wrap(err, "unable to fetch total number of followers") |
||||
} |
||||
|
||||
followersResult, err := f.datastore.GetQueries().GetFederationFollowersWithOffset(ctx, sqlstorage.GetFederationFollowersWithOffsetParams{ |
||||
Limit: int32(limit), |
||||
Offset: int32(offset), |
||||
}) |
||||
if err != nil { |
||||
return nil, 0, err |
||||
} |
||||
|
||||
followers := make([]models.Follower, 0) |
||||
|
||||
for _, row := range followersResult { |
||||
singleFollower := models.Follower{ |
||||
Name: row.Name.String, |
||||
Username: row.Username, |
||||
Image: row.Image.String, |
||||
ActorIRI: row.Iri, |
||||
Inbox: row.Inbox, |
||||
Timestamp: utils.NullTime(row.CreatedAt), |
||||
} |
||||
|
||||
followers = append(followers, singleFollower) |
||||
} |
||||
|
||||
return followers, int(total), nil |
||||
} |
||||
|
||||
// GetPendingFollowRequests will return pending follow requests.
|
||||
func (f *FederationRepository) GetPendingFollowRequests() ([]models.Follower, error) { |
||||
pendingFollowersResult, err := f.datastore.GetQueries().GetFederationFollowerApprovalRequests(context.Background()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
followers := make([]models.Follower, 0) |
||||
|
||||
for _, row := range pendingFollowersResult { |
||||
singleFollower := models.Follower{ |
||||
Name: row.Name.String, |
||||
Username: row.Username, |
||||
Image: row.Image.String, |
||||
ActorIRI: row.Iri, |
||||
Inbox: row.Inbox, |
||||
Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true}, |
||||
} |
||||
followers = append(followers, singleFollower) |
||||
} |
||||
|
||||
return followers, nil |
||||
} |
||||
|
||||
// GetBlockedAndRejectedFollowers will return blocked and rejected followers.
|
||||
func (f *FederationRepository) GetBlockedAndRejectedFollowers() ([]models.Follower, error) { |
||||
pendingFollowersResult, err := f.datastore.GetQueries().GetRejectedAndBlockedFollowers(context.Background()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
followers := make([]models.Follower, 0) |
||||
|
||||
for _, row := range pendingFollowersResult { |
||||
singleFollower := models.Follower{ |
||||
Name: row.Name.String, |
||||
Username: row.Username, |
||||
Image: row.Image.String, |
||||
ActorIRI: row.Iri, |
||||
DisabledAt: utils.NullTime{Time: row.DisabledAt.Time, Valid: true}, |
||||
Timestamp: utils.NullTime{Time: row.CreatedAt.Time, Valid: true}, |
||||
} |
||||
followers = append(followers, singleFollower) |
||||
} |
||||
|
||||
return followers, nil |
||||
} |
||||
|
||||
// AddFollow will save a follow to the datastore.
|
||||
func (f *FederationRepository) AddFollow(follow apmodels.ActivityPubActor, approved bool) error { |
||||
log.Traceln("Saving", follow.ActorIri, "as a follower.") |
||||
var image string |
||||
if follow.Image != nil { |
||||
image = follow.Image.String() |
||||
} |
||||
|
||||
followRequestObject, err := apmodels.Serialize(follow.RequestObject) |
||||
if err != nil { |
||||
return errors.Wrap(err, "error serializing follow request object") |
||||
} |
||||
|
||||
return f.createFollow(follow.ActorIri.String(), follow.Inbox.String(), follow.FollowRequestIri.String(), follow.Name, follow.Username, image, followRequestObject, approved) |
||||
} |
||||
|
||||
// RemoveFollow will remove a follow from the datastore.
|
||||
func (f *FederationRepository) RemoveFollow(unfollow apmodels.ActivityPubActor) error { |
||||
log.Traceln("Removing", unfollow.ActorIri, "as a follower.") |
||||
return f.removeFollow(unfollow.ActorIri) |
||||
} |
||||
|
||||
// GetFollower will return a single follower/request given an IRI.
|
||||
func (f *FederationRepository) GetFollower(iri string) (*apmodels.ActivityPubActor, error) { |
||||
result, err := f.datastore.GetQueries().GetFollowerByIRI(context.Background(), iri) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
followIRI, err := url.Parse(result.Request) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "error parsing follow request IRI") |
||||
} |
||||
|
||||
iriURL, err := url.Parse(result.Iri) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "error parsing actor IRI") |
||||
} |
||||
|
||||
inbox, err := url.Parse(result.Inbox) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "error parsing acting inbox") |
||||
} |
||||
|
||||
image, _ := url.Parse(result.Image.String) |
||||
|
||||
var disabledAt *time.Time |
||||
if result.DisabledAt.Valid { |
||||
disabledAt = &result.DisabledAt.Time |
||||
} |
||||
|
||||
follower := apmodels.ActivityPubActor{ |
||||
ActorIri: iriURL, |
||||
Inbox: inbox, |
||||
Name: result.Name.String, |
||||
Username: result.Username, |
||||
Image: image, |
||||
FollowRequestIri: followIRI, |
||||
DisabledAt: disabledAt, |
||||
} |
||||
|
||||
return &follower, nil |
||||
} |
||||
|
||||
// ApprovePreviousFollowRequest will approve a follow request.
|
||||
func (f *FederationRepository) ApprovePreviousFollowRequest(iri string) error { |
||||
return f.datastore.GetQueries().ApproveFederationFollower(context.Background(), sqlstorage.ApproveFederationFollowerParams{ |
||||
Iri: iri, |
||||
ApprovedAt: sql.NullTime{ |
||||
Time: time.Now(), |
||||
Valid: true, |
||||
}, |
||||
}) |
||||
} |
||||
|
||||
// BlockOrRejectFollower will block an existing follower or reject a follow request.
|
||||
func (f *FederationRepository) BlockOrRejectFollower(iri string) error { |
||||
return f.datastore.GetQueries().RejectFederationFollower(context.Background(), sqlstorage.RejectFederationFollowerParams{ |
||||
Iri: iri, |
||||
DisabledAt: sql.NullTime{ |
||||
Time: time.Now(), |
||||
Valid: true, |
||||
}, |
||||
}) |
||||
} |
||||
|
||||
func (f *FederationRepository) createFollow(actor, inbox, request, name, username, image string, requestObject []byte, approved bool) error { |
||||
tx, err := f.datastore.DB.Begin() |
||||
if err != nil { |
||||
log.Debugln(err) |
||||
} |
||||
defer func() { |
||||
_ = tx.Rollback() |
||||
}() |
||||
|
||||
var approvedAt sql.NullTime |
||||
if approved { |
||||
approvedAt = sql.NullTime{ |
||||
Time: time.Now(), |
||||
Valid: true, |
||||
} |
||||
} |
||||
|
||||
if err = f.datastore.GetQueries().WithTx(tx).AddFollower(context.Background(), sqlstorage.AddFollowerParams{ |
||||
Iri: actor, |
||||
Inbox: inbox, |
||||
Name: sql.NullString{String: name, Valid: true}, |
||||
Username: username, |
||||
Image: sql.NullString{String: image, Valid: true}, |
||||
ApprovedAt: approvedAt, |
||||
Request: request, |
||||
RequestObject: requestObject, |
||||
}); err != nil { |
||||
log.Errorln("error creating new federation follow: ", err) |
||||
} |
||||
|
||||
return tx.Commit() |
||||
} |
||||
|
||||
// UpdateFollower will update the details of a stored follower given an IRI.
|
||||
func (f *FederationRepository) UpdateFollower(actorIRI string, inbox string, name string, username string, image string) error { |
||||
f.datastore.DbLock.Lock() |
||||
defer f.datastore.DbLock.Unlock() |
||||
|
||||
tx, err := f.datastore.DB.Begin() |
||||
if err != nil { |
||||
log.Debugln(err) |
||||
} |
||||
defer func() { |
||||
_ = tx.Rollback() |
||||
}() |
||||
|
||||
if err = f.datastore.GetQueries().WithTx(tx).UpdateFollowerByIRI(context.Background(), sqlstorage.UpdateFollowerByIRIParams{ |
||||
Inbox: inbox, |
||||
Name: sql.NullString{String: name, Valid: true}, |
||||
Username: username, |
||||
Image: sql.NullString{String: image, Valid: true}, |
||||
Iri: actorIRI, |
||||
}); err != nil { |
||||
return fmt.Errorf("error updating follower %s %s", actorIRI, err) |
||||
} |
||||
|
||||
return tx.Commit() |
||||
} |
||||
|
||||
func (f *FederationRepository) removeFollow(actor *url.URL) error { |
||||
f.datastore.DbLock.Lock() |
||||
defer f.datastore.DbLock.Unlock() |
||||
|
||||
tx, err := f.datastore.DB.Begin() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
defer func() { |
||||
_ = tx.Rollback() |
||||
}() |
||||
|
||||
if err := f.datastore.GetQueries().WithTx(tx).RemoveFollowerByIRI(context.Background(), actor.String()); err != nil { |
||||
return err |
||||
} |
||||
|
||||
return tx.Commit() |
||||
} |
||||
|
||||
// GetOutboxPostCount will return the number of posts in the outbox.
|
||||
func (f *FederationRepository) GetOutboxPostCount() (int64, error) { |
||||
ctx := context.Background() |
||||
return f.datastore.GetQueries().GetLocalPostCount(ctx) |
||||
} |
||||
|
||||
// GetOutbox will return an instance of the outbox populated by stored items.
|
||||
func (f *FederationRepository) GetOutbox(limit int, offset int) (vocab.ActivityStreamsOrderedCollection, error) { |
||||
collection := streams.NewActivityStreamsOrderedCollection() |
||||
orderedItems := streams.NewActivityStreamsOrderedItemsProperty() |
||||
rows, err := f.datastore.GetQueries().GetOutboxWithOffset( |
||||
context.Background(), |
||||
sqlstorage.GetOutboxWithOffsetParams{Limit: int32(limit), Offset: int32(offset)}, |
||||
) |
||||
if err != nil { |
||||
return collection, err |
||||
} |
||||
|
||||
for _, value := range rows { |
||||
createCallback := func(c context.Context, activity vocab.ActivityStreamsCreate) error { |
||||
orderedItems.AppendActivityStreamsCreate(activity) |
||||
return nil |
||||
} |
||||
if err := resolvers.Resolve(context.Background(), value, createCallback); err != nil { |
||||
return collection, err |
||||
} |
||||
} |
||||
|
||||
return collection, nil |
||||
} |
||||
|
||||
// AddToOutbox will store a single payload to the persistence layer.
|
||||
func (f *FederationRepository) AddToOutbox(iri string, itemData []byte, typeString string, isLiveNotification bool) error { |
||||
tx, err := f.datastore.DB.Begin() |
||||
if err != nil { |
||||
log.Debugln(err) |
||||
} |
||||
defer func() { |
||||
_ = tx.Rollback() |
||||
}() |
||||
|
||||
if err = f.datastore.GetQueries().WithTx(tx).AddToOutbox(context.Background(), sqlstorage.AddToOutboxParams{ |
||||
Iri: iri, |
||||
Value: itemData, |
||||
Type: typeString, |
||||
LiveNotification: sql.NullBool{Bool: isLiveNotification, Valid: true}, |
||||
}); err != nil { |
||||
return fmt.Errorf("error creating new item in federation outbox %s", err) |
||||
} |
||||
|
||||
return tx.Commit() |
||||
} |
||||
|
||||
// GetObjectByIRI will return a string representation of a single object by the IRI.
|
||||
func (f *FederationRepository) GetObjectByIRI(iri string) (string, bool, time.Time, error) { |
||||
row, err := f.datastore.GetQueries().GetObjectFromOutboxByIRI(context.Background(), iri) |
||||
return string(row.Value), row.LiveNotification.Bool, row.CreatedAt.Time, err |
||||
} |
||||
|
||||
// GetLocalPostCount will return the number of posts existing locally.
|
||||
func (f *FederationRepository) GetLocalPostCount() (int64, error) { |
||||
ctx := context.Background() |
||||
return f.datastore.GetQueries().GetLocalPostCount(ctx) |
||||
} |
||||
|
||||
// SaveInboundFediverseActivity will save an event to the ap_inbound_activities table.
|
||||
func (f *FederationRepository) SaveInboundFediverseActivity(objectIRI string, actorIRI string, eventType string, timestamp time.Time) error { |
||||
if err := f.datastore.GetQueries().AddToAcceptedActivities(context.Background(), sqlstorage.AddToAcceptedActivitiesParams{ |
||||
Iri: objectIRI, |
||||
Actor: actorIRI, |
||||
Type: eventType, |
||||
Timestamp: timestamp, |
||||
}); err != nil { |
||||
return errors.Wrap(err, "error saving event "+objectIRI) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GetInboundActivities will return a collection of saved, federated activities
|
||||
// limited and offset by the values provided to support pagination.
|
||||
func (f *FederationRepository) GetInboundActivities(limit int, offset int) ([]models.FederatedActivity, int, error) { |
||||
ctx := context.Background() |
||||
rows, err := f.datastore.GetQueries().GetInboundActivitiesWithOffset(ctx, sqlstorage.GetInboundActivitiesWithOffsetParams{ |
||||
Limit: int32(limit), |
||||
Offset: int32(offset), |
||||
}) |
||||
if err != nil { |
||||
return nil, 0, err |
||||
} |
||||
|
||||
activities := make([]models.FederatedActivity, 0) |
||||
|
||||
total, err := f.datastore.GetQueries().GetInboundActivityCount(context.Background()) |
||||
if err != nil { |
||||
return nil, 0, errors.Wrap(err, "unable to fetch total activity count") |
||||
} |
||||
|
||||
for _, row := range rows { |
||||
singleActivity := models.FederatedActivity{ |
||||
IRI: row.Iri, |
||||
ActorIRI: row.Actor, |
||||
Type: row.Type, |
||||
Timestamp: row.Timestamp, |
||||
} |
||||
activities = append(activities, singleActivity) |
||||
} |
||||
|
||||
return activities, int(total), nil |
||||
} |
||||
|
||||
// HasPreviouslyHandledInboundActivity will return if we have previously handled
|
||||
// an inbound federated activity.
|
||||
func (f *FederationRepository) HasPreviouslyHandledInboundActivity(iri string, actorIRI string, eventType string) (bool, error) { |
||||
exists, err := f.datastore.GetQueries().DoesInboundActivityExist(context.Background(), sqlstorage.DoesInboundActivityExistParams{ |
||||
Iri: iri, |
||||
Actor: actorIRI, |
||||
Type: eventType, |
||||
}) |
||||
if err != nil { |
||||
return false, err |
||||
} |
||||
|
||||
return exists > 0, nil |
||||
} |
@ -0,0 +1,57 @@
@@ -0,0 +1,57 @@
|
||||
package notificationsrepository |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/owncast/owncast/storage/data" |
||||
"github.com/owncast/owncast/storage/sqlstorage" |
||||
"github.com/pkg/errors" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
type NotificationsRepository interface{} |
||||
|
||||
type SqlNotificationsRepository struct { |
||||
datastore *data.Store |
||||
} |
||||
|
||||
func New(datastore *data.Store) *SqlNotificationsRepository { |
||||
return &SqlNotificationsRepository{datastore} |
||||
} |
||||
|
||||
var temporaryGlobalInstance *SqlNotificationsRepository |
||||
|
||||
func Get() *SqlNotificationsRepository { |
||||
if temporaryGlobalInstance == nil { |
||||
temporaryGlobalInstance = &SqlNotificationsRepository{} |
||||
} |
||||
return temporaryGlobalInstance |
||||
} |
||||
|
||||
// AddNotification saves a new user notification destination.
|
||||
func (r *SqlNotificationsRepository) AddNotification(channel, destination string) error { |
||||
return data.GetDatastore().GetQueries().AddNotification(context.Background(), sqlstorage.AddNotificationParams{ |
||||
Channel: channel, |
||||
Destination: destination, |
||||
}) |
||||
} |
||||
|
||||
// RemoveNotificationForChannel removes a notification destination.
|
||||
func (r *SqlNotificationsRepository) RemoveNotificationForChannel(channel, destination string) error { |
||||
log.Debugln("Removing notification for channel", channel) |
||||
return data.GetDatastore().GetQueries().RemoveNotificationDestinationForChannel(context.Background(), sqlstorage.RemoveNotificationDestinationForChannelParams{ |
||||
Channel: channel, |
||||
Destination: destination, |
||||
}) |
||||
} |
||||
|
||||
// GetNotificationDestinationsForChannel will return a collection of
|
||||
// destinations to notify for a given channel.
|
||||
func (r *SqlNotificationsRepository) GetNotificationDestinationsForChannel(channel string) ([]string, error) { |
||||
result, err := data.GetDatastore().GetQueries().GetNotificationDestinationsForChannel(context.Background(), channel) |
||||
if err != nil { |
||||
return nil, errors.Wrap(err, "unable to query notification destinations for channel "+channel) |
||||
} |
||||
|
||||
return result, nil |
||||
} |
@ -1,6 +1,14 @@
@@ -1,6 +1,14 @@
|
||||
# SQL Queries |
||||
# SQL Storage |
||||
|
||||
sqlc generates **type-safe code** from SQL. Here's how it works: |
||||
This package contains the base SQL schema, migrations and queries. It should not need to be imported by any package other than the datstore. |
||||
|
||||
## SQL Migrations |
||||
|
||||
Add migrations to `migrations.go` and use raw SQL make your required changes between schema versions. |
||||
|
||||
## SQL Queries |
||||
|
||||
_sqlc_ generates **type-safe code** from SQL. Here's how it works: |
||||
|
||||
1. You define the schema in `schema.sql`. |
||||
1. You write your queries in `query.sql` using regular SQL. |
@ -1,8 +1,8 @@
@@ -1,8 +1,8 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.15.0
|
||||
// sqlc v1.18.0
|
||||
|
||||
package db |
||||
package sqlstorage |
||||
|
||||
import ( |
||||
"context" |
@ -0,0 +1,102 @@
@@ -0,0 +1,102 @@
|
||||
package sqlstorage |
||||
|
||||
import ( |
||||
"database/sql" |
||||
"fmt" |
||||
"os" |
||||
"path/filepath" |
||||
"time" |
||||
|
||||
"github.com/owncast/owncast/services/config" |
||||
"github.com/owncast/owncast/utils" |
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
// InitializeDatabase will open the datastore and make it available.
|
||||
func InitializeDatabase(file string, schemaVersion int) (*sql.DB, error) { |
||||
// Allow support for in-memory databases for tests.
|
||||
|
||||
var db *sql.DB |
||||
|
||||
if file == ":memory:" { |
||||
inMemoryDb, err := sql.Open("sqlite3", file) |
||||
if err != nil { |
||||
log.Fatal(err.Error()) |
||||
} |
||||
db = inMemoryDb |
||||
} else { |
||||
// Create empty DB file if it doesn't exist.
|
||||
if !utils.DoesFileExists(file) { |
||||
log.Traceln("Creating new database at", file) |
||||
|
||||
_, err := os.Create(file) //nolint:gosec
|
||||
if err != nil { |
||||
log.Fatal(err.Error()) |
||||
} |
||||
} |
||||
|
||||
onDiskDb, err := sql.Open("sqlite3", fmt.Sprintf("file:%s?_cache_size=10000&cache=shared&_journal_mode=WAL", file)) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
db = onDiskDb |
||||
db.SetMaxOpenConns(1) |
||||
} |
||||
|
||||
// Some SQLite optimizations
|
||||
_, _ = db.Exec("pragma journal_mode = WAL") |
||||
_, _ = db.Exec("pragma synchronous = normal") |
||||
_, _ = db.Exec("pragma temp_store = memory") |
||||
_, _ = db.Exec("pragma wal_checkpoint(full)") |
||||
|
||||
if _, err := db.Exec(`CREATE TABLE IF NOT EXISTS config ( |
||||
"key" string NOT NULL PRIMARY KEY, |
||||
"value" TEXT |
||||
);`); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
CreateAllTables(db) |
||||
|
||||
var version int |
||||
err := db.QueryRow("SELECT value FROM config WHERE key='version'"). |
||||
Scan(&version) |
||||
if err != nil { |
||||
if err != sql.ErrNoRows { |
||||
return nil, err |
||||
} |
||||
|
||||
// fresh database: initialize it with the current schema version
|
||||
_, err := db.Exec("INSERT INTO config(key, value) VALUES(?, ?)", "version", schemaVersion) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
version = schemaVersion |
||||
} |
||||
|
||||
// is database from a newer Owncast version?
|
||||
if version > schemaVersion { |
||||
return nil, fmt.Errorf("incompatible database version %d (versions up to %d are supported)", |
||||
version, schemaVersion) |
||||
} |
||||
|
||||
// is database schema outdated?
|
||||
if version < schemaVersion { |
||||
migrations := NewSqlMigrations(db) |
||||
|
||||
if err := migrations.MigrateDatabaseSchema(db, version, schemaVersion); err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
|
||||
dbBackupTicker := time.NewTicker(1 * time.Hour) |
||||
go func() { |
||||
c := config.GetConfig() |
||||
backupFile := filepath.Join(c.BackupDirectory, "owncastdb.bak") |
||||
for range dbBackupTicker.C { |
||||
utils.Backup(db, backupFile) |
||||
} |
||||
}() |
||||
|
||||
return db, nil |
||||
} |
@ -1,8 +1,8 @@
@@ -1,8 +1,8 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.15.0
|
||||
// sqlc v1.18.0
|
||||
|
||||
package db |
||||
package sqlstorage |
||||
|
||||
import ( |
||||
"database/sql" |
@ -1,9 +1,9 @@
@@ -1,9 +1,9 @@
|
||||
// Code generated by sqlc. DO NOT EDIT.
|
||||
// versions:
|
||||
// sqlc v1.15.0
|
||||
// sqlc v1.18.0
|
||||
// source: query.sql
|
||||
|
||||
package db |
||||
package sqlstorage |
||||
|
||||
import ( |
||||
"context" |
@ -0,0 +1,20 @@
@@ -0,0 +1,20 @@
|
||||
package sqlstorage |
||||
|
||||
import ( |
||||
"database/sql" |
||||
|
||||
log "github.com/sirupsen/logrus" |
||||
) |
||||
|
||||
// MustExec will execute a SQL statement on a provided database instance.
|
||||
func MustExec(s string, db *sql.DB) { |
||||
stmt, err := db.Prepare(s) |
||||
if err != nil { |
||||
log.Panic(err) |
||||
} |
||||
defer stmt.Close() |
||||
_, err = stmt.Exec() |
||||
if err != nil { |
||||
log.Warnln(err) |
||||
} |
||||
} |
@ -0,0 +1,47 @@
@@ -0,0 +1,47 @@
|
||||
package userrepository |
||||
|
||||
import ( |
||||
"context" |
||||
"strings" |
||||
|
||||
"github.com/owncast/owncast/models" |
||||
"github.com/owncast/owncast/storage/sqlstorage" |
||||
) |
||||
|
||||
// AddAuth will add an external authentication token and type for a user.
|
||||
func (r *SqlUserRepository) AddAuth(userID, authToken string, authType models.AuthType) error { |
||||
return r.datastore.GetQueries().AddAuthForUser(context.Background(), sqlstorage.AddAuthForUserParams{ |
||||
UserID: userID, |
||||
Token: authToken, |
||||
Type: string(authType), |
||||
}) |
||||
} |
||||
|
||||
// GetUserByAuth will return an existing user given auth details if a user
|
||||
// has previously authenticated with that method.
|
||||
func (r *SqlUserRepository) GetUserByAuth(authToken string, authType models.AuthType) *models.User { |
||||
u, err := r.datastore.GetQueries().GetUserByAuth(context.Background(), sqlstorage.GetUserByAuthParams{ |
||||
Token: authToken, |
||||
Type: string(authType), |
||||
}) |
||||
if err != nil { |
||||
return nil |
||||
} |
||||
|
||||
var scopes []string |
||||
if u.Scopes.Valid { |
||||
scopes = strings.Split(u.Scopes.String, ",") |
||||
} |
||||
|
||||
return &models.User{ |
||||
ID: u.ID, |
||||
DisplayName: u.DisplayName, |
||||
DisplayColor: int(u.DisplayColor), |
||||
CreatedAt: u.CreatedAt.Time, |
||||
DisabledAt: &u.DisabledAt.Time, |
||||
PreviousNames: strings.Split(u.PreviousNames.String, ","), |
||||
NameChangedAt: &u.NamechangedAt.Time, |
||||
AuthenticatedAt: &u.AuthenticatedAt.Time, |
||||
Scopes: scopes, |
||||
} |
||||
} |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue