Browse Source

feat(video): first pass at adding clip functionality

pull/3395/head
Gabe Kangas 2 years ago
parent
commit
96fb5ebb82
No known key found for this signature in database
GPG Key ID: 4345B2060657F330
  1. 151
      controllers/clips.go
  2. 26
      core/data/replays.go
  3. 17
      db/models.go
  4. 31
      db/query.sql
  5. 236
      db/query.sql.go
  6. 22
      db/schema.sql
  7. 63
      models/flexibledate.go
  8. 42
      models/flexibledate_test.go
  9. 141
      replays/clips.go
  10. 23
      replays/hlsRecorder.go
  11. 1
      replays/hlsSegment.go
  12. 14
      replays/outputConfiguration.go
  13. 195
      replays/playlistGenerator.go
  14. 2
      replays/stream.go
  15. 142
      replays/streamClipPlaylistGenerator.go
  16. 129
      replays/streamReplayPlaylistGenerator.go
  17. 4
      router/router.go
  18. 21
      utils/utils.go

151
controllers/clips.go

@ -0,0 +1,151 @@
package controllers
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"strings"
"github.com/owncast/owncast/replays"
log "github.com/sirupsen/logrus"
)
// GetAllClips will return all clips that have been previously created.
func GetAllClips(w http.ResponseWriter, r *http.Request) {
clips, err := replays.GetAllClips()
if err != nil {
log.Errorln(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
WriteResponse(w, clips)
}
// AddClip will create a new clip for a given stream and time window.
func AddClip(w http.ResponseWriter, r *http.Request) {
type addClipRequest struct {
StreamId string `json:"streamId"`
ClipTitle string `json:"clipTitle"`
RelativeStartTimeSeconds float32 `json:"relativeStartTimeSeconds"`
RelativeEndTimeSeconds float32 `json:"relativeEndTimeSeconds"`
}
if r.Method != http.MethodPost {
BadRequestHandler(w, nil)
return
}
decoder := json.NewDecoder(r.Body)
var request addClipRequest
if request.RelativeEndTimeSeconds < request.RelativeStartTimeSeconds {
BadRequestHandler(w, errors.New("end time must be after start time"))
return
}
if err := decoder.Decode(&request); err != nil {
log.Errorln(err)
WriteSimpleResponse(w, false, "unable to create clip")
return
}
streamId := request.StreamId
clipTitle := request.ClipTitle
startTime := request.RelativeStartTimeSeconds
endTime := request.RelativeEndTimeSeconds
// Some validation
playlistGenerator := replays.NewPlaylistGenerator()
stream, err := playlistGenerator.GetStream(streamId)
if err != nil {
BadRequestHandler(w, errors.New("stream not found"))
return
}
if stream.StartTime.IsZero() {
BadRequestHandler(w, errors.New("stream start time not found"))
return
}
// Make sure the proposed clip start time and end time are within
// the start and end time of the stream.
finalSegment, err := replays.GetFinalSegmentForStream(streamId)
if err != nil {
InternalErrorHandler(w, err)
return
}
if finalSegment.RelativeTimestamp < startTime {
BadRequestHandler(w, errors.New("start time is after the known end of the stream"))
return
}
clipId, duration, err := replays.AddClipForStream(streamId, clipTitle, "", startTime, endTime)
if err != nil {
log.Errorln(err)
w.WriteHeader(http.StatusInternalServerError)
return
}
WriteSimpleResponse(w, true, "clip "+clipId+" created with duration of "+fmt.Sprint(duration)+" seconds")
}
// GetClip will return playable content for a given clip Id.
func GetClip(w http.ResponseWriter, r *http.Request) {
pathComponents := strings.Split(r.URL.Path, "/")
if len(pathComponents) == 3 {
// Return the master playlist for the requested stream
clipId := pathComponents[2]
getClipMasterPlaylist(clipId, w)
return
} else if len(pathComponents) == 4 {
// Return the media playlist for the requested stream and output config
clipId := pathComponents[2]
outputConfigId := pathComponents[3]
getClipMediaPlaylist(clipId, outputConfigId, w)
return
}
BadRequestHandler(w, nil)
}
// getReplayMasterPlaylist will return a complete replay of a stream
// as a HLS playlist.
func getClipMasterPlaylist(clipId string, w http.ResponseWriter) {
playlistGenerator := replays.NewPlaylistGenerator()
playlist, err := playlistGenerator.GenerateMasterPlaylistForClip(clipId)
if err != nil {
log.Println(err)
}
if playlist == nil {
w.WriteHeader(http.StatusNotFound)
return
}
w.Header().Add("Content-Type", "application/x-mpegURL")
if _, err := w.Write(playlist.Encode().Bytes()); err != nil {
log.Errorln(err)
return
}
}
// getClipMediaPlaylist will return media playlist for a given clip
// and stream output configuration.
func getClipMediaPlaylist(clipId, outputConfigId string, w http.ResponseWriter) {
playlistGenerator := replays.NewPlaylistGenerator()
playlist, err := playlistGenerator.GenerateMediaPlaylistForClipAndConfiguration(clipId, outputConfigId)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
return
}
w.Header().Add("Content-Type", "application/x-mpegURL")
if _, err := w.Write(playlist.Encode().Bytes()); err != nil {
log.Errorln(err)
return
}
}

26
core/data/recording.go → core/data/replays.go

@ -8,7 +8,8 @@ func createRecordingTables(db *sql.DB) {
"stream_id" string NOT NULL, "stream_id" string NOT NULL,
"output_configuration_id" string NOT NULL, "output_configuration_id" string NOT NULL,
"path" TEXT NOT NULL, "path" TEXT NOT NULL,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, "relative_timestamp" REAL NOT NULL,
"timestamp" DATETIME,
PRIMARY KEY (id) PRIMARY KEY (id)
);CREATE INDEX video_segments_stream_id ON video_segments (stream_id);CREATE INDEX video_segments_stream_id_timestamp ON video_segments (stream_id,timestamp);` );CREATE INDEX video_segments_stream_id ON video_segments (stream_id);CREATE INDEX video_segments_stream_id_timestamp ON video_segments (stream_id,timestamp);`
@ -22,15 +23,15 @@ func createRecordingTables(db *sql.DB) {
"framerate" INTEGER NOT NULL, "framerate" INTEGER NOT NULL,
"resolution_width" INTEGER, "resolution_width" INTEGER,
"resolution_height" INTEGER, "resolution_height" INTEGER,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, "timestamp" DATETIME,
PRIMARY KEY (id) PRIMARY KEY (id)
);CREATE INDEX video_segment_output_configuration_stream_id ON video_segment_output_configuration (stream_id);` );CREATE INDEX video_segment_output_configuration_stream_id ON video_segment_output_configuration (stream_id);`
createVideoStreamsTableSQL := `CREATE TABLE IF NOT EXISTS streams ( createVideoStreamsTableSQL := `CREATE TABLE IF NOT EXISTS streams (
"id" string NOT NULL, "id" string NOT NULL,
"stream_title" TEXT, "stream_title" TEXT,
"start_time" DATE NOT NULL, "start_time" DATETIME,
"end_time" DATE, "end_time" DATETIME,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
CREATE INDEX streams_id ON streams (id); CREATE INDEX streams_id ON streams (id);
@ -38,7 +39,24 @@ func createRecordingTables(db *sql.DB) {
CREATE INDEX streams_start_end_time ON streams (start_time,end_time); CREATE INDEX streams_start_end_time ON streams (start_time,end_time);
` `
createClipsTableSQL := `CREATE TABLE IF NOT EXISTS replay_clips (
"id" string NOT NULL,
"stream_id" string NOT NULL,
"clipped_by" string,
"clip_title" TEXT,
"relative_start_time" REAL,
"relative_end_time" REAL,
"timestamp" DATETIME,
PRIMARY KEY (id),
FOREIGN KEY(stream_id) REFERENCES streams(id)
);
CREATE INDEX clip_id ON replay_clips (id);
CREATE INDEX clip_stream_id ON replay_clips (stream_id);
CREATE INDEX clip_start_end_time ON replay_clips (start_time,end_time);
`
MustExec(createSegmentsTableSQL, db) MustExec(createSegmentsTableSQL, db)
MustExec(createVideoOutputConfigsTableSQL, db) MustExec(createVideoOutputConfigsTableSQL, db)
MustExec(createVideoStreamsTableSQL, db) MustExec(createVideoStreamsTableSQL, db)
MustExec(createClipsTableSQL, db)
} }

17
db/models.go

@ -72,10 +72,20 @@ type Notification struct {
CreatedAt sql.NullTime CreatedAt sql.NullTime
} }
type ReplayClip struct {
ID string
StreamID string
ClippedBy sql.NullString
ClipTitle sql.NullString
RelativeStartTime sql.NullFloat64
RelativeEndTime sql.NullFloat64
Timestamp sql.NullTime
}
type Stream struct { type Stream struct {
ID string ID string
StreamTitle sql.NullString StreamTitle sql.NullString
StartTime time.Time StartTime sql.NullTime
EndTime sql.NullTime EndTime sql.NullTime
} }
@ -104,7 +114,8 @@ type VideoSegment struct {
StreamID string StreamID string
OutputConfigurationID string OutputConfigurationID string
Path string Path string
Timestamp time.Time RelativeTimestamp float32
Timestamp sql.NullTime
} }
type VideoSegmentOutputConfiguration struct { type VideoSegmentOutputConfiguration struct {
@ -117,5 +128,5 @@ type VideoSegmentOutputConfiguration struct {
Framerate int32 Framerate int32
ResolutionWidth sql.NullInt32 ResolutionWidth sql.NullInt32
ResolutionHeight sql.NullInt32 ResolutionHeight sql.NullInt32
Timestamp time.Time Timestamp sql.NullTime
} }

31
db/query.sql

@ -127,16 +127,39 @@ SELECT id, stream_id, variant_id, name, segment_duration, bitrate, framerate, re
SELECT id, stream_id, output_configuration_id, path, timestamp FROM video_segments WHERE output_configuration_id = $1 ORDER BY timestamp ASC; SELECT id, stream_id, output_configuration_id, path, timestamp FROM video_segments WHERE output_configuration_id = $1 ORDER BY timestamp ASC;
-- name: GetSegmentsForOutputIdAndWindow :many -- name: GetSegmentsForOutputIdAndWindow :many
SELECT id, stream_id, output_configuration_id, timestamp FROM video_segments WHERE output_configuration_id = $1 AND timestamp >= $2 AND timestamp <= $3 ORDER BY timestamp ASC; SELECT id, stream_id, output_configuration_id, path, relative_timestamp, timestamp FROM video_segments WHERE output_configuration_id = $1 AND (cast ( relative_timestamp as int ) - ( relative_timestamp < cast ( relative_timestamp as int ))) >= @start_seconds::REAL AND (cast ( relative_timestamp as int ) + ( relative_timestamp > cast ( relative_timestamp as int ))) <= @end_seconds::REAL ORDER BY relative_timestamp ASC;
-- name: InsertStream :exec -- name: InsertStream :exec
INSERT INTO streams (id, stream_title, start_time, end_time) VALUES($1, $2, $3, $4); INSERT INTO streams (id, stream_title, start_time, end_time) VALUES($1, $2, $3, $4);
-- name: InsertOutputConfiguration :exec -- name: InsertOutputConfiguration :exec
INSERT INTO video_segment_output_configuration (id, variant_id, stream_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9); INSERT INTO video_segment_output_configuration (id, variant_id, stream_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height, timestamp) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10);
-- name: InsertSegment :exec -- name: InsertSegment :exec
INSERT INTO video_segments (id, stream_id, output_configuration_id, path) VALUES($1, $2, $3, $4); INSERT INTO video_segments (id, stream_id, output_configuration_id, path, relative_timestamp, timestamp) VALUES($1, $2, $3, $4, $5, $6);
-- name: SetStreamEnded :exec -- name: SetStreamEnded :exec
UPDATE streams SET end_time = CURRENT_TIMESTAMP WHERE id = $1; UPDATE streams SET end_time = $1 WHERE id = $2;
-- name: InsertClip :exec
INSERT INTO replay_clips (id, stream_id, clip_title, relative_start_time, relative_end_time, timestamp) VALUES($1, $2, $3, $4, $5, $6);
-- name: GetAllClips :many
SELECT rc.id AS id, rc.clip_title, rc.stream_id, rc.relative_start_time, rc.relative_end_time, (rc.relative_end_time - rc.relative_start_time) AS duration_seconds, rc.timestamp, s.stream_title AS stream_title
FROM replay_clips rc
JOIN streams s ON rc.stream_id = s.id
ORDER BY timestamp DESC;
-- name: GetAllClipsForStream :many
SELECT rc.id AS clip_id, rc.stream_id, rc.clipped_by, rc.clip_title, rc.relative_start_time, rc.relative_end_time, rc.timestamp,
s.id AS stream_id, s.stream_title AS stream_title
FROM replay_clips rc
JOIN streams s ON rc.stream_id = s.id
WHERE rc.stream_id = $1
ORDER BY timestamp DESC;
-- name: GetClip :one
SELECT id AS clip_id, stream_id, clipped_by, clip_title, timestamp AS clip_timestamp, relative_start_time, relative_end_time FROM replay_clips WHERE id = $1;
-- name: GetFinalSegmentForStream :one
SELECT id, stream_id, output_configuration_id, path, relative_timestamp, timestamp FROM video_segments WHERE stream_id = $1 ORDER BY relative_timestamp DESC LIMIT 1;

236
db/query.sql.go

@ -205,6 +205,139 @@ func (q *Queries) DoesInboundActivityExist(ctx context.Context, arg DoesInboundA
return count, err return count, err
} }
const getAllClips = `-- name: GetAllClips :many
SELECT rc.id AS id, rc.clip_title, rc.stream_id, rc.relative_start_time, rc.relative_end_time, (rc.relative_end_time - rc.relative_start_time) AS duration_seconds, rc.timestamp, s.stream_title AS stream_title
FROM replay_clips rc
JOIN streams s ON rc.stream_id = s.id
ORDER BY timestamp DESC
`
type GetAllClipsRow struct {
ID string
ClipTitle sql.NullString
StreamID string
RelativeStartTime sql.NullFloat64
RelativeEndTime sql.NullFloat64
DurationSeconds int32
Timestamp sql.NullTime
StreamTitle sql.NullString
}
func (q *Queries) GetAllClips(ctx context.Context) ([]GetAllClipsRow, error) {
rows, err := q.db.QueryContext(ctx, getAllClips)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetAllClipsRow
for rows.Next() {
var i GetAllClipsRow
if err := rows.Scan(
&i.ID,
&i.ClipTitle,
&i.StreamID,
&i.RelativeStartTime,
&i.RelativeEndTime,
&i.DurationSeconds,
&i.Timestamp,
&i.StreamTitle,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getAllClipsForStream = `-- name: GetAllClipsForStream :many
SELECT rc.id AS clip_id, rc.stream_id, rc.clipped_by, rc.clip_title, rc.relative_start_time, rc.relative_end_time, rc.timestamp,
s.id AS stream_id, s.stream_title AS stream_title
FROM replay_clips rc
JOIN streams s ON rc.stream_id = s.id
WHERE rc.stream_id = $1
ORDER BY timestamp DESC
`
type GetAllClipsForStreamRow struct {
ClipID string
StreamID string
ClippedBy sql.NullString
ClipTitle sql.NullString
RelativeStartTime sql.NullFloat64
RelativeEndTime sql.NullFloat64
Timestamp sql.NullTime
StreamID_2 string
StreamTitle sql.NullString
}
func (q *Queries) GetAllClipsForStream(ctx context.Context, streamID string) ([]GetAllClipsForStreamRow, error) {
rows, err := q.db.QueryContext(ctx, getAllClipsForStream, streamID)
if err != nil {
return nil, err
}
defer rows.Close()
var items []GetAllClipsForStreamRow
for rows.Next() {
var i GetAllClipsForStreamRow
if err := rows.Scan(
&i.ClipID,
&i.StreamID,
&i.ClippedBy,
&i.ClipTitle,
&i.RelativeStartTime,
&i.RelativeEndTime,
&i.Timestamp,
&i.StreamID_2,
&i.StreamTitle,
); err != nil {
return nil, err
}
items = append(items, i)
}
if err := rows.Close(); err != nil {
return nil, err
}
if err := rows.Err(); err != nil {
return nil, err
}
return items, nil
}
const getClip = `-- name: GetClip :one
SELECT id AS clip_id, stream_id, clipped_by, clip_title, timestamp AS clip_timestamp, relative_start_time, relative_end_time FROM replay_clips WHERE id = $1
`
type GetClipRow struct {
ClipID string
StreamID string
ClippedBy sql.NullString
ClipTitle sql.NullString
ClipTimestamp sql.NullTime
RelativeStartTime sql.NullFloat64
RelativeEndTime sql.NullFloat64
}
func (q *Queries) GetClip(ctx context.Context, id string) (GetClipRow, error) {
row := q.db.QueryRowContext(ctx, getClip, id)
var i GetClipRow
err := row.Scan(
&i.ClipID,
&i.StreamID,
&i.ClippedBy,
&i.ClipTitle,
&i.ClipTimestamp,
&i.RelativeStartTime,
&i.RelativeEndTime,
)
return i, err
}
const getFederationFollowerApprovalRequests = `-- name: GetFederationFollowerApprovalRequests :many const getFederationFollowerApprovalRequests = `-- name: GetFederationFollowerApprovalRequests :many
SELECT iri, inbox, name, username, image, created_at FROM ap_followers WHERE approved_at IS null AND disabled_at is null SELECT iri, inbox, name, username, image, created_at FROM ap_followers WHERE approved_at IS null AND disabled_at is null
` `
@ -296,6 +429,24 @@ func (q *Queries) GetFederationFollowersWithOffset(ctx context.Context, arg GetF
return items, nil return items, nil
} }
const getFinalSegmentForStream = `-- name: GetFinalSegmentForStream :one
SELECT id, stream_id, output_configuration_id, path, relative_timestamp, timestamp FROM video_segments WHERE stream_id = $1 ORDER BY relative_timestamp DESC LIMIT 1
`
func (q *Queries) GetFinalSegmentForStream(ctx context.Context, streamID string) (VideoSegment, error) {
row := q.db.QueryRowContext(ctx, getFinalSegmentForStream, streamID)
var i VideoSegment
err := row.Scan(
&i.ID,
&i.StreamID,
&i.OutputConfigurationID,
&i.Path,
&i.RelativeTimestamp,
&i.Timestamp,
)
return i, err
}
const getFollowerByIRI = `-- name: GetFollowerByIRI :one const getFollowerByIRI = `-- name: GetFollowerByIRI :one
SELECT iri, inbox, name, username, image, request, request_object, created_at, approved_at, disabled_at FROM ap_followers WHERE iri = $1 SELECT iri, inbox, name, username, image, request, request_object, created_at, approved_at, disabled_at FROM ap_followers WHERE iri = $1
` `
@ -670,15 +821,23 @@ const getSegmentsForOutputId = `-- name: GetSegmentsForOutputId :many
SELECT id, stream_id, output_configuration_id, path, timestamp FROM video_segments WHERE output_configuration_id = $1 ORDER BY timestamp ASC SELECT id, stream_id, output_configuration_id, path, timestamp FROM video_segments WHERE output_configuration_id = $1 ORDER BY timestamp ASC
` `
func (q *Queries) GetSegmentsForOutputId(ctx context.Context, outputConfigurationID string) ([]VideoSegment, error) { type GetSegmentsForOutputIdRow struct {
ID string
StreamID string
OutputConfigurationID string
Path string
Timestamp sql.NullTime
}
func (q *Queries) GetSegmentsForOutputId(ctx context.Context, outputConfigurationID string) ([]GetSegmentsForOutputIdRow, error) {
rows, err := q.db.QueryContext(ctx, getSegmentsForOutputId, outputConfigurationID) rows, err := q.db.QueryContext(ctx, getSegmentsForOutputId, outputConfigurationID)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
var items []VideoSegment var items []GetSegmentsForOutputIdRow
for rows.Next() { for rows.Next() {
var i VideoSegment var i GetSegmentsForOutputIdRow
if err := rows.Scan( if err := rows.Scan(
&i.ID, &i.ID,
&i.StreamID, &i.StreamID,
@ -700,35 +859,30 @@ func (q *Queries) GetSegmentsForOutputId(ctx context.Context, outputConfiguratio
} }
const getSegmentsForOutputIdAndWindow = `-- name: GetSegmentsForOutputIdAndWindow :many const getSegmentsForOutputIdAndWindow = `-- name: GetSegmentsForOutputIdAndWindow :many
SELECT id, stream_id, output_configuration_id, timestamp FROM video_segments WHERE output_configuration_id = $1 AND timestamp >= $2 AND timestamp <= $3 ORDER BY timestamp ASC SELECT id, stream_id, output_configuration_id, path, relative_timestamp, timestamp FROM video_segments WHERE output_configuration_id = $1 AND (cast ( relative_timestamp as int ) - ( relative_timestamp < cast ( relative_timestamp as int ))) >= $2::REAL AND (cast ( relative_timestamp as int ) + ( relative_timestamp > cast ( relative_timestamp as int ))) <= $3::REAL ORDER BY relative_timestamp ASC
` `
type GetSegmentsForOutputIdAndWindowParams struct { type GetSegmentsForOutputIdAndWindowParams struct {
OutputConfigurationID string OutputConfigurationID string
Timestamp time.Time StartSeconds float32
Timestamp_2 time.Time EndSeconds float32
} }
type GetSegmentsForOutputIdAndWindowRow struct { func (q *Queries) GetSegmentsForOutputIdAndWindow(ctx context.Context, arg GetSegmentsForOutputIdAndWindowParams) ([]VideoSegment, error) {
ID string rows, err := q.db.QueryContext(ctx, getSegmentsForOutputIdAndWindow, arg.OutputConfigurationID, arg.StartSeconds, arg.EndSeconds)
StreamID string
OutputConfigurationID string
Timestamp time.Time
}
func (q *Queries) GetSegmentsForOutputIdAndWindow(ctx context.Context, arg GetSegmentsForOutputIdAndWindowParams) ([]GetSegmentsForOutputIdAndWindowRow, error) {
rows, err := q.db.QueryContext(ctx, getSegmentsForOutputIdAndWindow, arg.OutputConfigurationID, arg.Timestamp, arg.Timestamp_2)
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
var items []GetSegmentsForOutputIdAndWindowRow var items []VideoSegment
for rows.Next() { for rows.Next() {
var i GetSegmentsForOutputIdAndWindowRow var i VideoSegment
if err := rows.Scan( if err := rows.Scan(
&i.ID, &i.ID,
&i.StreamID, &i.StreamID,
&i.OutputConfigurationID, &i.OutputConfigurationID,
&i.Path,
&i.RelativeTimestamp,
&i.Timestamp, &i.Timestamp,
); err != nil { ); err != nil {
return nil, err return nil, err
@ -876,8 +1030,33 @@ func (q *Queries) GetUserDisplayNameByToken(ctx context.Context, token string) (
return display_name, err return display_name, err
} }
const insertClip = `-- name: InsertClip :exec
INSERT INTO replay_clips (id, stream_id, clip_title, relative_start_time, relative_end_time, timestamp) VALUES($1, $2, $3, $4, $5, $6)
`
type InsertClipParams struct {
ID string
StreamID string
ClipTitle sql.NullString
RelativeStartTime sql.NullFloat64
RelativeEndTime sql.NullFloat64
Timestamp sql.NullTime
}
func (q *Queries) InsertClip(ctx context.Context, arg InsertClipParams) error {
_, err := q.db.ExecContext(ctx, insertClip,
arg.ID,
arg.StreamID,
arg.ClipTitle,
arg.RelativeStartTime,
arg.RelativeEndTime,
arg.Timestamp,
)
return err
}
const insertOutputConfiguration = `-- name: InsertOutputConfiguration :exec const insertOutputConfiguration = `-- name: InsertOutputConfiguration :exec
INSERT INTO video_segment_output_configuration (id, variant_id, stream_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9) INSERT INTO video_segment_output_configuration (id, variant_id, stream_id, name, segment_duration, bitrate, framerate, resolution_width, resolution_height, timestamp) VALUES($1, $2, $3, $4, $5, $6, $7, $8, $9, $10)
` `
type InsertOutputConfigurationParams struct { type InsertOutputConfigurationParams struct {
@ -890,6 +1069,7 @@ type InsertOutputConfigurationParams struct {
Framerate int32 Framerate int32
ResolutionWidth sql.NullInt32 ResolutionWidth sql.NullInt32
ResolutionHeight sql.NullInt32 ResolutionHeight sql.NullInt32
Timestamp sql.NullTime
} }
func (q *Queries) InsertOutputConfiguration(ctx context.Context, arg InsertOutputConfigurationParams) error { func (q *Queries) InsertOutputConfiguration(ctx context.Context, arg InsertOutputConfigurationParams) error {
@ -903,12 +1083,13 @@ func (q *Queries) InsertOutputConfiguration(ctx context.Context, arg InsertOutpu
arg.Framerate, arg.Framerate,
arg.ResolutionWidth, arg.ResolutionWidth,
arg.ResolutionHeight, arg.ResolutionHeight,
arg.Timestamp,
) )
return err return err
} }
const insertSegment = `-- name: InsertSegment :exec const insertSegment = `-- name: InsertSegment :exec
INSERT INTO video_segments (id, stream_id, output_configuration_id, path) VALUES($1, $2, $3, $4) INSERT INTO video_segments (id, stream_id, output_configuration_id, path, relative_timestamp, timestamp) VALUES($1, $2, $3, $4, $5, $6)
` `
type InsertSegmentParams struct { type InsertSegmentParams struct {
@ -916,6 +1097,8 @@ type InsertSegmentParams struct {
StreamID string StreamID string
OutputConfigurationID string OutputConfigurationID string
Path string Path string
RelativeTimestamp float32
Timestamp sql.NullTime
} }
func (q *Queries) InsertSegment(ctx context.Context, arg InsertSegmentParams) error { func (q *Queries) InsertSegment(ctx context.Context, arg InsertSegmentParams) error {
@ -924,6 +1107,8 @@ func (q *Queries) InsertSegment(ctx context.Context, arg InsertSegmentParams) er
arg.StreamID, arg.StreamID,
arg.OutputConfigurationID, arg.OutputConfigurationID,
arg.Path, arg.Path,
arg.RelativeTimestamp,
arg.Timestamp,
) )
return err return err
} }
@ -935,7 +1120,7 @@ INSERT INTO streams (id, stream_title, start_time, end_time) VALUES($1, $2, $3,
type InsertStreamParams struct { type InsertStreamParams struct {
ID string ID string
StreamTitle sql.NullString StreamTitle sql.NullString
StartTime time.Time StartTime sql.NullTime
EndTime sql.NullTime EndTime sql.NullTime
} }
@ -1032,11 +1217,16 @@ func (q *Queries) SetAccessTokenToOwner(ctx context.Context, arg SetAccessTokenT
} }
const setStreamEnded = `-- name: SetStreamEnded :exec const setStreamEnded = `-- name: SetStreamEnded :exec
UPDATE streams SET end_time = CURRENT_TIMESTAMP WHERE id = $1 UPDATE streams SET end_time = $1 WHERE id = $2
` `
func (q *Queries) SetStreamEnded(ctx context.Context, id string) error { type SetStreamEndedParams struct {
_, err := q.db.ExecContext(ctx, setStreamEnded, id) EndTime sql.NullTime
ID string
}
func (q *Queries) SetStreamEnded(ctx context.Context, arg SetStreamEndedParams) error {
_, err := q.db.ExecContext(ctx, setStreamEnded, arg.EndTime, arg.ID)
return err return err
} }

22
db/schema.sql

@ -102,7 +102,7 @@ CREATE TABLE IF NOT EXISTS messages (
CREATE TABLE IF NOT EXISTS streams ( CREATE TABLE IF NOT EXISTS streams (
"id" string NOT NULL PRIMARY KEY, "id" string NOT NULL PRIMARY KEY,
"stream_title" TEXT, "stream_title" TEXT,
"start_time" DATE NOT NULL, "start_time" DATE,
"end_time" DATE, "end_time" DATE,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
@ -121,7 +121,7 @@ CREATE TABLE IF NOT EXISTS video_segment_output_configuration (
"framerate" INTEGER NOT NULL, "framerate" INTEGER NOT NULL,
"resolution_width" INTEGER, "resolution_width" INTEGER,
"resolution_height" INTEGER, "resolution_height" INTEGER,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, "timestamp" DATE,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
CREATE INDEX video_segment_output_configuration_stream_id ON video_segment_output_configuration (stream_id); CREATE INDEX video_segment_output_configuration_stream_id ON video_segment_output_configuration (stream_id);
@ -133,9 +133,25 @@ CREATE TABLE IF NOT EXISTS video_segments (
"stream_id" string NOT NULL, "stream_id" string NOT NULL,
"output_configuration_id" string NOT NULL, "output_configuration_id" string NOT NULL,
"path" TEXT NOT NULL, "path" TEXT NOT NULL,
"timestamp" DATE DEFAULT CURRENT_TIMESTAMP NOT NULL, "relative_timestamp" REAL NOT NULL,
"timestamp" DATE,
PRIMARY KEY (id) PRIMARY KEY (id)
); );
CREATE INDEX video_segments_stream_id ON video_segments (stream_id); CREATE INDEX video_segments_stream_id ON video_segments (stream_id);
CREATE INDEX video_segments_stream_id_timestamp ON video_segments (stream_id,timestamp); CREATE INDEX video_segments_stream_id_timestamp ON video_segments (stream_id,timestamp);
-- Record the details of a replayable clip.
CREATE TABLE IF NOT EXISTS replay_clips (
"id" string NOT NULL,
"stream_id" string NOT NULL,
"clipped_by" string,
"clip_title" TEXT,
"relative_start_time" REAL,
"relative_end_time" REAL,
"timestamp" DATE,
PRIMARY KEY (id),
FOREIGN KEY(stream_id) REFERENCES streams(id)
);
CREATE INDEX clip_id ON replay_clips (id);
CREATE INDEX clip_stream_id ON replay_clips (stream_id);
CREATE INDEX clip_start_end_time ON replay_clips (start_time,end_time);

63
models/flexibledate.go

@ -0,0 +1,63 @@
package models
import (
"database/sql"
"errors"
"time"
)
type FlexibleDate struct {
time.Time
}
func (self *FlexibleDate) UnmarshalJSON(b []byte) (err error) {
s := string(b)
// Get rid of the quotes "" around the value.
s = s[1 : len(s)-1]
result, err := FlexibleDateParse(s)
if err != nil {
return err
}
self.Time = result
return
}
// FlexibleDateParse is a convinience function to parse a date that could be
// a string, a time.Time, or a sql.NullTime.
func FlexibleDateParse(date interface{}) (time.Time, error) {
// If it's within a sql.NullTime wrapper, return the time from that.
nulltime, ok := date.(sql.NullTime)
if ok {
return nulltime.Time, nil
}
// Parse as string
datestring, ok := date.(string)
if ok {
t, err := time.Parse(time.RFC3339Nano, datestring)
if err == nil {
return t, nil
}
t, err = time.Parse("2006-01-02T15:04:05.999999999Z0700", datestring)
if err == nil {
return t, nil
}
t, err = time.Parse("2006-01-02 15:04:05.999999999-07:00", datestring)
if err == nil {
return t, nil
}
}
dateobject, ok := date.(time.Time)
if ok {
return dateobject, nil
}
return time.Time{}, errors.New("unable to parse date")
}

42
models/flexibledate_test.go

@ -0,0 +1,42 @@
package models
import (
"database/sql"
"encoding/json"
"testing"
"time"
)
func TestFlexibleDateParsing(t *testing.T) {
type testJson struct {
Testdate FlexibleDate `json:"testdate"`
}
nullTime := sql.NullTime{Time: time.Unix(1591614434, 0), Valid: true}
testNullTime, err := FlexibleDateParse(nullTime)
if err != nil {
t.Error(err)
}
if testNullTime.Unix() != nullTime.Time.Unix() {
t.Errorf("Expected %d but got %d", nullTime.Time.Unix(), testNullTime.Unix())
}
testStrings := map[string]time.Time{
"2023-08-10 17:40:15.376736475-07:00": time.Unix(1691714415, 0),
}
for testString, expectedTime := range testStrings {
testJsonString := `{"testdate":"` + testString + `"}`
response := testJson{}
err := json.Unmarshal([]byte(testJsonString), &response)
if err != nil {
t.Error(err)
}
if response.Testdate.Time.Unix() != expectedTime.Unix() {
t.Errorf("Expected %d but got %d", expectedTime.Unix(), response.Testdate.Time.Unix())
}
}
}

141
replays/clips.go

@ -0,0 +1,141 @@
package replays
import (
"context"
"database/sql"
"fmt"
"time"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/db"
"github.com/owncast/owncast/utils"
"github.com/pkg/errors"
"github.com/teris-io/shortid"
)
// Clip represents a clip that has been created from a stream.
// A clip is a subset of a stream that has has start and end seconds
// relative to the start of the stream.
type Clip struct {
ID string `json:"id"`
StreamId string `json:"stream_id"`
ClippedBy string `json:"clipped_by,omitempty"`
ClipTitle string `json:"title,omitempty"`
StreamTitle string `json:"stream_title,omitempty"`
RelativeStartTime float32 `json:"relativeStartTime"`
RelativeEndTime float32 `json:"relativeEndTime"`
DurationSeconds int `json:"durationSeconds"`
Manifest string `json:"manifest,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// GetClips will return all clips that have been recorded.
func GetAllClips() ([]*Clip, error) {
clips, err := data.GetDatastore().GetQueries().GetAllClips(context.Background())
if err != nil {
return nil, errors.WithMessage(err, "failure to get clips")
}
response := []*Clip{}
for _, clip := range clips {
s := Clip{
ID: clip.ID,
ClipTitle: clip.ClipTitle.String,
StreamId: clip.StreamID,
StreamTitle: clip.StreamTitle.String,
RelativeStartTime: float32(clip.RelativeStartTime.Float64),
RelativeEndTime: float32(clip.RelativeEndTime.Float64),
DurationSeconds: int(clip.DurationSeconds),
Timestamp: clip.Timestamp.Time,
Manifest: fmt.Sprintf("/clip/%s", clip.ID),
}
response = append(response, &s)
}
return response, nil
}
// GetAllClipsForStream will return all clips that have been recorded for a stream.
func GetAllClipsForStream(streamId string) ([]*Clip, error) {
clips, err := data.GetDatastore().GetQueries().GetAllClipsForStream(context.Background(), streamId)
if err != nil {
return nil, errors.WithMessage(err, "failure to get clips")
}
response := []*Clip{}
for _, clip := range clips {
s := Clip{
ID: clip.ClipID,
ClipTitle: clip.ClipTitle.String,
StreamTitle: clip.StreamTitle.String,
RelativeStartTime: float32(clip.RelativeStartTime.Float64),
RelativeEndTime: float32(clip.RelativeEndTime.Float64),
Timestamp: clip.Timestamp.Time,
Manifest: fmt.Sprintf("/clips/%s", clip.ClipID),
}
response = append(response, &s)
}
return response, nil
}
// AddClipForStream will save a new clip for a stream.
func AddClipForStream(streamId, clipTitle, clippedBy string, relativeStartTimeSeconds, relativeEndTimeSeconds float32) (string, int, error) {
playlistGenerator := NewPlaylistGenerator()
// Verify this stream exists
if _, err := playlistGenerator.GetStream(streamId); err != nil {
return "", 0, errors.WithMessage(err, "stream not found")
}
// Verify this stream has at least one output configuration.
configs, err := playlistGenerator.GetConfigurationsForStream(streamId)
if err != nil {
return "", 0, errors.WithMessage(err, "unable to get configurations for stream")
}
if len(configs) == 0 {
return "", 0, errors.New("no configurations found for stream")
}
// We want the start and end seconds to be aligned to the segment so
// round up and down the values to get a fully inclusive segment range.
config := configs[0]
segmentDuration := int(config.SegmentDuration)
updatedRelativeStartTimeSeconds := utils.RoundDownToNearest(relativeStartTimeSeconds, segmentDuration)
updatedRelativeEndTimeSeconds := utils.RoundUpToNearest(relativeEndTimeSeconds, segmentDuration)
clipId := shortid.MustGenerate()
duration := updatedRelativeEndTimeSeconds - updatedRelativeStartTimeSeconds
err = data.GetDatastore().GetQueries().InsertClip(context.Background(), db.InsertClipParams{
ID: clipId,
StreamID: streamId,
ClipTitle: sql.NullString{String: clipTitle, Valid: clipTitle != ""},
RelativeStartTime: sql.NullFloat64{Float64: float64(updatedRelativeStartTimeSeconds), Valid: true},
RelativeEndTime: sql.NullFloat64{Float64: float64(updatedRelativeEndTimeSeconds), Valid: true},
Timestamp: sql.NullTime{Time: time.Now(), Valid: true},
})
if err != nil {
return "", 0, errors.WithMessage(err, "failure to add clip")
}
return clipId, duration, nil
}
// GetFinalSegmentForStream will return the final known segment for a stream.
func GetFinalSegmentForStream(streamId string) (*HLSSegment, error) {
segmentResponse, err := data.GetDatastore().GetQueries().GetFinalSegmentForStream(context.Background(), streamId)
if err != nil {
return nil, errors.Wrap(err, "unable to get final segment for stream")
}
segment := HLSSegment{
ID: segmentResponse.ID,
StreamID: segmentResponse.StreamID,
OutputConfigurationID: segmentResponse.OutputConfigurationID,
Path: segmentResponse.Path,
RelativeTimestamp: segmentResponse.RelativeTimestamp,
Timestamp: segmentResponse.Timestamp.Time,
}
return &segment, nil
}

23
replays/hlsRecorder.go

@ -46,7 +46,7 @@ func NewRecording(streamID string) *HLSRecorder {
if err := h.datastore.GetQueries().InsertStream(context.Background(), db.InsertStreamParams{ if err := h.datastore.GetQueries().InsertStream(context.Background(), db.InsertStreamParams{
ID: streamID, ID: streamID,
StartTime: h.startTime, StartTime: sql.NullTime{Time: h.startTime, Valid: true},
StreamTitle: sql.NullString{String: streamTitle, Valid: validTitle}, StreamTitle: sql.NullString{String: streamTitle, Valid: validTitle},
}); err != nil { }); err != nil {
log.Panicln(err) log.Panicln(err)
@ -66,6 +66,7 @@ func NewRecording(streamID string) *HLSRecorder {
Framerate: int32(o.Framerate), Framerate: int32(o.Framerate),
ResolutionWidth: sql.NullInt32{Int32: int32(o.ScaledWidth), Valid: true}, ResolutionWidth: sql.NullInt32{Int32: int32(o.ScaledWidth), Valid: true},
ResolutionHeight: sql.NullInt32{Int32: int32(o.ScaledHeight), Valid: true}, ResolutionHeight: sql.NullInt32{Int32: int32(o.ScaledHeight), Valid: true},
Timestamp: sql.NullTime{Time: time.Now(), Valid: true},
}); err != nil { }); err != nil {
log.Panicln(err) log.Panicln(err)
} }
@ -104,18 +105,15 @@ func (h *HLSRecorder) SegmentWritten(path string) {
} }
p := strings.ReplaceAll(path, "data/", "") p := strings.ReplaceAll(path, "data/", "")
relativeTimestamp := time.Since(h.startTime)
segment := HLSSegment{
ID: shortid.MustGenerate(),
StreamID: h.streamID,
Path: p,
}
if err := h.datastore.GetQueries().InsertSegment(context.Background(), db.InsertSegmentParams{ if err := h.datastore.GetQueries().InsertSegment(context.Background(), db.InsertSegmentParams{
ID: segment.ID, ID: shortid.MustGenerate(),
StreamID: segment.StreamID, StreamID: h.streamID,
OutputConfigurationID: h.outputConfigurations[outputConfigurationIndex].ID, OutputConfigurationID: h.outputConfigurations[outputConfigurationIndex].ID,
Path: segment.Path, Path: p,
RelativeTimestamp: float32(relativeTimestamp.Seconds()),
Timestamp: sql.NullTime{Time: time.Now(), Valid: true},
}); err != nil { }); err != nil {
log.Errorln(err) log.Errorln(err)
} }
@ -124,7 +122,10 @@ func (h *HLSRecorder) SegmentWritten(path string) {
// StreamEnded is called when a stream is ended so the end time can be noted // StreamEnded is called when a stream is ended so the end time can be noted
// in the stream's metadata. // in the stream's metadata.
func (h *HLSRecorder) StreamEnded() { func (h *HLSRecorder) StreamEnded() {
if err := h.datastore.GetQueries().SetStreamEnded(context.Background(), h.streamID); err != nil { if err := h.datastore.GetQueries().SetStreamEnded(context.Background(), db.SetStreamEndedParams{
ID: h.streamID,
EndTime: sql.NullTime{Time: time.Now(), Valid: true},
}); err != nil {
log.Errorln(err) log.Errorln(err)
} }
} }

1
replays/hlsSegment.go

@ -7,6 +7,7 @@ type HLSSegment struct {
ID string ID string
StreamID string StreamID string
Timestamp time.Time Timestamp time.Time
RelativeTimestamp float32
OutputConfigurationID string OutputConfigurationID string
Path string Path string
} }

14
replays/outputConfiguration.go

@ -1,5 +1,7 @@
package replays package replays
import "github.com/pkg/errors"
type HLSOutputConfiguration struct { type HLSOutputConfiguration struct {
ID string ID string
StreamId string StreamId string
@ -11,3 +13,15 @@ type HLSOutputConfiguration struct {
Framerate int Framerate int
SegmentDuration float64 SegmentDuration float64
} }
func (config *HLSOutputConfiguration) Validate() error {
if config.VideoBitrate == 0 {
return errors.New("video bitrate is unavailable")
}
if config.Framerate == 0 {
return errors.New("video framerate is unavailable")
}
return nil
}

195
replays/playlistGenerator.go

@ -3,135 +3,12 @@ package replays
import ( import (
"context" "context"
"fmt" "fmt"
"strings"
"time" "time"
"github.com/grafov/m3u8" "github.com/grafov/m3u8"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/db"
"github.com/pkg/errors" "github.com/pkg/errors"
) )
/*
The PlaylistGenerator is responsible for creating the master and media
playlists, in order to replay a stream in whole, or part. It requires detailed
metadata about how the initial live stream was configured, as well as a
access to every segment that was created during the live stream.
*/
type PlaylistGenerator struct {
datastore *data.Datastore
}
func NewPlaylistGenerator() *PlaylistGenerator {
return &PlaylistGenerator{
datastore: data.GetDatastore(),
}
}
func (p *PlaylistGenerator) GenerateMasterPlaylistForStream(streamId string) (*m3u8.MasterPlaylist, error) {
// Determine the different output configurations for this stream.
configs, err := p.GetConfigurationsForStream(streamId)
if err != nil {
return nil, errors.Wrap(err, "failed to get configurations for stream")
}
// Create the master playlist that will hold the different media playlists.
masterPlaylist := p.createNewMasterPlaylist()
// Create the media playlists for each output configuration.
for _, config := range configs {
// Verify the validity of the configuration.
if config.VideoBitrate == 0 {
return nil, errors.New("video bitrate is unavailable")
}
if config.Framerate == 0 {
return nil, errors.New("video framerate is unavailable")
}
mediaPlaylist, err := p.GenerateMediaPlaylistForStreamAndConfiguration(streamId, config.ID)
if err != nil {
return nil, errors.Wrap(err, "failed to create media playlist")
}
// Append the media playlist to the master playlist.
params := m3u8.VariantParams{
ProgramId: 1,
Name: config.Name,
FrameRate: float64(config.Framerate),
Bandwidth: uint32(config.VideoBitrate * 1000),
// Match what is generated in our live playlists.
Codecs: "avc1.64001f,mp4a.40.2",
}
// If both the width and height are set then we can set that as
// the resolution in the media playlist.
if config.ScaledHeight > 0 && config.ScaledWidth > 0 {
params.Resolution = fmt.Sprintf("%dx%d", config.ScaledWidth, config.ScaledHeight)
}
// Add the media playlist to the master playlist.
publicPlaylistPath := strings.Join([]string{"/replay", streamId, config.ID}, "/")
masterPlaylist.Append(publicPlaylistPath, mediaPlaylist, params)
}
// Return the final master playlist that contains all the media playlists.
return masterPlaylist, nil
}
func (p *PlaylistGenerator) GenerateMediaPlaylistForStreamAndConfiguration(streamId, outputConfigurationId string) (*m3u8.MediaPlaylist, error) {
stream, err := p.GetStream(streamId)
if err != nil {
return nil, errors.Wrap(err, "failed to get stream")
}
config, err := p.GetOutputConfig(outputConfigurationId)
if err != nil {
return nil, errors.Wrap(err, "failed to get output configuration")
}
// Fetch all the segments for this configuration.
segments, err := p.GetAllSegmentsForOutputConfiguration(outputConfigurationId)
if err != nil {
return nil, errors.Wrap(err, "failed to get all segments for output configuration")
}
// Create the media playlist for this configuration and add the segments.
mediaPlaylist, err := p.createMediaPlaylistForConfigurationAndSegments(config, stream.StartTime, stream.InProgress, segments)
if err != nil {
return nil, errors.Wrap(err, "failed to create media playlist")
}
return mediaPlaylist, nil
}
func (p *PlaylistGenerator) GetStream(streamId string) (*Stream, error) {
stream, err := p.datastore.GetQueries().GetStreamById(context.Background(), streamId)
if stream.ID == "" {
return nil, errors.Wrap(err, "failed to get stream")
}
s := Stream{
ID: stream.ID,
Title: stream.StreamTitle.String,
StartTime: stream.StartTime,
EndTime: stream.EndTime.Time,
InProgress: !stream.EndTime.Valid,
}
return &s, nil
}
func (p *PlaylistGenerator) GetOutputConfig(outputConfigId string) (*HLSOutputConfiguration, error) {
config, err := p.datastore.GetQueries().GetOutputConfigurationForId(context.Background(), outputConfigId)
if err != nil {
return nil, errors.Wrap(err, "failed to get output configuration")
}
return createConfigFromConfigRow(config), nil
}
// GetConfigurationsForStream returns the output configurations for a given stream. // GetConfigurationsForStream returns the output configurations for a given stream.
func (p *PlaylistGenerator) GetConfigurationsForStream(streamId string) ([]*HLSOutputConfiguration, error) { func (p *PlaylistGenerator) GetConfigurationsForStream(streamId string) ([]*HLSOutputConfiguration, error) {
outputConfigRows, err := p.datastore.GetQueries().GetOutputConfigurationsForStreamId(context.Background(), streamId) outputConfigRows, err := p.datastore.GetQueries().GetOutputConfigurationsForStreamId(context.Background(), streamId)
@ -158,28 +35,6 @@ func (p *PlaylistGenerator) GetConfigurationsForStream(streamId string) ([]*HLSO
return outputConfigs, nil return outputConfigs, nil
} }
// GetAllSegmentsForOutputConfiguration returns all the segments for a given output config.
func (p *PlaylistGenerator) GetAllSegmentsForOutputConfiguration(outputId string) ([]HLSSegment, error) {
segmentRows, err := p.datastore.GetQueries().GetSegmentsForOutputId(context.Background(), outputId)
if err != nil {
return nil, errors.Wrap(err, "failed to get segments for output config")
}
segments := []HLSSegment{}
for _, row := range segmentRows {
segment := HLSSegment{
ID: row.ID,
StreamID: row.StreamID,
OutputConfigurationID: row.OutputConfigurationID,
Timestamp: row.Timestamp,
Path: row.Path,
}
segments = append(segments, segment)
}
return segments, nil
}
func (p *PlaylistGenerator) createMediaPlaylistForConfigurationAndSegments(configuration *HLSOutputConfiguration, startTime time.Time, inProgress bool, segments []HLSSegment) (*m3u8.MediaPlaylist, error) { func (p *PlaylistGenerator) createMediaPlaylistForConfigurationAndSegments(configuration *HLSOutputConfiguration, startTime time.Time, inProgress bool, segments []HLSSegment) (*m3u8.MediaPlaylist, error) {
playlistSize := len(segments) playlistSize := len(segments)
segmentDuration := configuration.SegmentDuration segmentDuration := configuration.SegmentDuration
@ -242,17 +97,43 @@ func (p *PlaylistGenerator) createNewMasterPlaylist() *m3u8.MasterPlaylist {
return playlist return playlist
} }
func createConfigFromConfigRow(row db.GetOutputConfigurationForIdRow) *HLSOutputConfiguration { // GetAllSegmentsForOutputConfiguration returns all the segments for a given output config.
config := HLSOutputConfiguration{ func (p *PlaylistGenerator) GetAllSegmentsForOutputConfiguration(outputId string) ([]HLSSegment, error) {
ID: row.ID, segmentRows, err := p.datastore.GetQueries().GetSegmentsForOutputId(context.Background(), outputId)
StreamId: row.StreamID, if err != nil {
VariantId: row.VariantID, return nil, errors.Wrap(err, "failed to get segments for output config")
Name: row.Name,
VideoBitrate: int(row.Bitrate),
Framerate: int(row.Framerate),
ScaledHeight: int(row.ResolutionWidth.Int32),
ScaledWidth: int(row.ResolutionHeight.Int32),
SegmentDuration: float64(row.SegmentDuration),
} }
return &config
segments := []HLSSegment{}
for _, row := range segmentRows {
segment := HLSSegment{
ID: row.ID,
StreamID: row.StreamID,
OutputConfigurationID: row.OutputConfigurationID,
Timestamp: row.Timestamp.Time,
Path: row.Path,
}
segments = append(segments, segment)
}
return segments, nil
}
func (p *PlaylistGenerator) getMediaPlaylistParamsForConfig(config *HLSOutputConfiguration) m3u8.VariantParams {
params := m3u8.VariantParams{
ProgramId: 1,
Name: config.Name,
FrameRate: float64(config.Framerate),
Bandwidth: uint32(config.VideoBitrate * 1000),
// Match what is generated in our live playlists.
Codecs: "avc1.64001f,mp4a.40.2",
}
// If both the width and height are set then we can set that as
// the resolution in the media playlist.
if config.ScaledHeight > 0 && config.ScaledWidth > 0 {
params.Resolution = fmt.Sprintf("%dx%d", config.ScaledWidth, config.ScaledHeight)
}
return params
} }

2
replays/stream.go

@ -30,7 +30,7 @@ func GetStreams() ([]*Stream, error) {
s := Stream{ s := Stream{
ID: stream.ID, ID: stream.ID,
Title: stream.StreamTitle.String, Title: stream.StreamTitle.String,
StartTime: stream.StartTime, StartTime: stream.StartTime.Time,
EndTime: stream.EndTime.Time, EndTime: stream.EndTime.Time,
InProgress: !stream.EndTime.Valid, InProgress: !stream.EndTime.Valid,
Manifest: fmt.Sprintf("/replay/%s", stream.ID), Manifest: fmt.Sprintf("/replay/%s", stream.ID),

142
replays/streamClipPlaylistGenerator.go

@ -0,0 +1,142 @@
package replays
import (
"context"
"strings"
"github.com/grafov/m3u8"
"github.com/owncast/owncast/db"
"github.com/owncast/owncast/models"
"github.com/pkg/errors"
)
// GenerateMasterPlaylistForClip returns a master playlist for a given clip Id.
// It includes references to the media playlists for each output configuration.
func (p *PlaylistGenerator) GenerateMasterPlaylistForClip(clipId string) (*m3u8.MasterPlaylist, error) {
clip, err := p.datastore.GetQueries().GetClip(context.Background(), clipId)
if err != nil {
return nil, errors.Wrap(err, "unable to fetch requested clip")
}
streamId := clip.StreamID
configs, err := p.GetConfigurationsForStream(streamId)
if err != nil {
return nil, errors.Wrap(err, "failed to get configurations for stream")
}
// Create the master playlist that will hold the different media playlists.
masterPlaylist := p.createNewMasterPlaylist()
// Create the media playlists for each output configuration.
for _, config := range configs {
// Verify the validity of the configuration.
if err := config.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid output configuration")
}
mediaPlaylist, err := p.GenerateMediaPlaylistForClipAndConfiguration(clipId, config.ID)
if err != nil {
return nil, errors.Wrap(err, "failed to create clip media playlist")
}
// Append the media playlist to the master playlist.
params := p.getMediaPlaylistParamsForConfig(config)
// Add the media playlist to the master playlist.
publicPlaylistPath := strings.Join([]string{"/clip", clipId, config.ID}, "/")
masterPlaylist.Append(publicPlaylistPath, mediaPlaylist, params)
}
// Return the final master playlist that contains all the media playlists.
return masterPlaylist, nil
}
// GenerateMediaPlaylistForClipAndConfiguration returns a media playlist for a
// given clip Id and output configuration.
func (p *PlaylistGenerator) GenerateMediaPlaylistForClipAndConfiguration(clipId, outputConfigurationId string) (*m3u8.MediaPlaylist, error) {
clip, err := p.GetClip(clipId)
if err != nil {
return nil, errors.Wrap(err, "failed to get stream")
}
config, err := p.GetOutputConfig(outputConfigurationId)
if err != nil {
return nil, errors.Wrap(err, "failed to get output configuration")
}
clipStartSeconds := clip.RelativeStartTime
clipEndSeconds := clip.RelativeEndTime
// Fetch all the segments for this configuration.
segments, err := p.GetAllSegmentsForOutputConfigurationAndWindow(outputConfigurationId, clipStartSeconds, clipEndSeconds)
if err != nil {
return nil, errors.Wrap(err, "failed to get all clip segments for output configuration")
}
// Create the media playlist for this configuration and add the segments.
mediaPlaylist, err := p.createMediaPlaylistForConfigurationAndSegments(config, clip.Timestamp, false, segments)
if err != nil {
return nil, errors.Wrap(err, "failed to create clip media playlist")
}
return mediaPlaylist, nil
}
// GetClip returns a clip by its ID.
func (p *PlaylistGenerator) GetClip(clipId string) (*Clip, error) {
clip, err := p.datastore.GetQueries().GetClip(context.Background(), clipId)
if err != nil {
return nil, errors.Wrap(err, "failed to get clip")
}
if clip.ClipID == "" {
return nil, errors.Wrap(err, "failed to get clip")
}
if !clip.RelativeEndTime.Valid {
return nil, errors.Wrap(err, "failed to get clip")
}
timestamp, err := models.FlexibleDateParse(clip.ClipTimestamp)
if err != nil {
return nil, errors.Wrap(err, "failed to parse clip timestamp")
}
c := Clip{
ID: clip.ClipID,
StreamId: clip.StreamID,
ClipTitle: clip.ClipTitle.String,
RelativeStartTime: float32(clip.RelativeStartTime.Float64),
RelativeEndTime: float32(clip.RelativeEndTime.Float64),
Timestamp: timestamp,
}
return &c, nil
}
// GetAllSegmentsForOutputConfigurationAndWindow returns all the segments for a
// given output config and time window.
func (p *PlaylistGenerator) GetAllSegmentsForOutputConfigurationAndWindow(configId string, startSeconds, endSeconds float32) ([]HLSSegment, error) {
segmentRows, err := p.datastore.GetQueries().GetSegmentsForOutputIdAndWindow(context.Background(), db.GetSegmentsForOutputIdAndWindowParams{
OutputConfigurationID: configId,
StartSeconds: startSeconds,
EndSeconds: endSeconds,
})
if err != nil {
return nil, errors.Wrap(err, "failed to get clip segments for output config")
}
segments := []HLSSegment{}
for _, row := range segmentRows {
segment := HLSSegment{
ID: row.ID,
StreamID: row.StreamID,
OutputConfigurationID: row.OutputConfigurationID,
Timestamp: row.Timestamp.Time,
Path: row.Path,
}
segments = append(segments, segment)
}
return segments, nil
}

129
replays/streamReplayPlaylistGenerator.go

@ -0,0 +1,129 @@
package replays
import (
"context"
"strings"
"github.com/grafov/m3u8"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/db"
"github.com/pkg/errors"
)
/*
The PlaylistGenerator is responsible for creating the master and media
playlists, in order to replay a stream in whole, or part. It requires detailed
metadata about how the initial live stream was configured, as well as a
access to every segment that was created during the live stream.
*/
type PlaylistGenerator struct {
datastore *data.Datastore
}
func NewPlaylistGenerator() *PlaylistGenerator {
return &PlaylistGenerator{
datastore: data.GetDatastore(),
}
}
func (p *PlaylistGenerator) GenerateMasterPlaylistForStream(streamId string) (*m3u8.MasterPlaylist, error) {
// Determine the different output configurations for this stream.
configs, err := p.GetConfigurationsForStream(streamId)
if err != nil {
return nil, errors.Wrap(err, "failed to get configurations for stream")
}
// Create the master playlist that will hold the different media playlists.
masterPlaylist := p.createNewMasterPlaylist()
// Create the media playlists for each output configuration.
for _, config := range configs {
// Verify the validity of the configuration.
if err := config.Validate(); err != nil {
return nil, errors.Wrap(err, "invalid output configuration")
}
mediaPlaylist, err := p.GenerateMediaPlaylistForStreamAndConfiguration(streamId, config.ID)
if err != nil {
return nil, errors.Wrap(err, "failed to create media playlist")
}
// Append the media playlist to the master playlist.
params := p.getMediaPlaylistParamsForConfig(config)
// Add the media playlist to the master playlist.
publicPlaylistPath := strings.Join([]string{"/replay", streamId, config.ID}, "/")
masterPlaylist.Append(publicPlaylistPath, mediaPlaylist, params)
}
// Return the final master playlist that contains all the media playlists.
return masterPlaylist, nil
}
func (p *PlaylistGenerator) GenerateMediaPlaylistForStreamAndConfiguration(streamId, outputConfigurationId string) (*m3u8.MediaPlaylist, error) {
stream, err := p.GetStream(streamId)
if err != nil {
return nil, errors.Wrap(err, "failed to get stream")
}
config, err := p.GetOutputConfig(outputConfigurationId)
if err != nil {
return nil, errors.Wrap(err, "failed to get output configuration")
}
// Fetch all the segments for this configuration.
segments, err := p.GetAllSegmentsForOutputConfiguration(outputConfigurationId)
if err != nil {
return nil, errors.Wrap(err, "failed to get all segments for output configuration")
}
// Create the media playlist for this configuration and add the segments.
mediaPlaylist, err := p.createMediaPlaylistForConfigurationAndSegments(config, stream.StartTime, stream.InProgress, segments)
if err != nil {
return nil, errors.Wrap(err, "failed to create media playlist")
}
return mediaPlaylist, nil
}
func (p *PlaylistGenerator) GetStream(streamId string) (*Stream, error) {
stream, err := p.datastore.GetQueries().GetStreamById(context.Background(), streamId)
if stream.ID == "" {
return nil, errors.Wrap(err, "failed to get stream")
}
s := Stream{
ID: stream.ID,
Title: stream.StreamTitle.String,
StartTime: stream.StartTime.Time,
EndTime: stream.EndTime.Time,
InProgress: !stream.EndTime.Valid,
}
return &s, nil
}
func (p *PlaylistGenerator) GetOutputConfig(outputConfigId string) (*HLSOutputConfiguration, error) {
config, err := p.datastore.GetQueries().GetOutputConfigurationForId(context.Background(), outputConfigId)
if err != nil {
return nil, errors.Wrap(err, "failed to get output configuration")
}
return createConfigFromConfigRow(config), nil
}
func createConfigFromConfigRow(row db.GetOutputConfigurationForIdRow) *HLSOutputConfiguration {
config := HLSOutputConfiguration{
ID: row.ID,
StreamId: row.StreamID,
VariantId: row.VariantID,
Name: row.Name,
VideoBitrate: int(row.Bitrate),
Framerate: int(row.Framerate),
ScaledHeight: int(row.ResolutionWidth.Int32),
ScaledWidth: int(row.ResolutionHeight.Int32),
SegmentDuration: float64(row.SegmentDuration),
}
return &config
}

4
router/router.go

@ -397,6 +397,10 @@ func Start() error {
http.HandleFunc("/api/replays", controllers.GetReplays) http.HandleFunc("/api/replays", controllers.GetReplays)
http.HandleFunc("/replay/", controllers.GetReplay) http.HandleFunc("/replay/", controllers.GetReplay)
http.HandleFunc("/api/clips", controllers.GetAllClips)
http.HandleFunc("/api/clip", controllers.AddClip)
http.HandleFunc("/clip/", controllers.GetClip)
// ActivityPub has its own router // ActivityPub has its own router
activitypub.Start(data.GetDatastore()) activitypub.Start(data.GetDatastore())

21
utils/utils.go

@ -6,6 +6,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"math"
"math/rand" "math/rand"
"net/url" "net/url"
"os" "os"
@ -411,3 +412,23 @@ func DecodeBase64Image(url string) (bytes []byte, extension string, err error) {
return bytes, extension, nil return bytes, extension, nil
} }
// RoundUpToNearest rounds up to the nearest number divisible.
func RoundUpToNearest(x float32, to int) int {
xInt := int(math.Ceil(float64(x)))
if xInt%to == 0 {
return xInt
}
return xInt + to - xInt%to
}
// RoundDownToNearest rounds down to the nearest number divisible.
func RoundDownToNearest(x float32, to int) int {
xInt := int(math.Floor(float64(x)))
if xInt%to == 0 {
return xInt
}
return xInt - xInt%to
}

Loading…
Cancel
Save