From 73e58a7801c0300949960db3d444290fbb92fbfe Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Fri, 26 Nov 2021 20:53:27 -0800 Subject: [PATCH] Refactor the offline clip handling. More stable, reduced function complexity. --- core/core.go | 27 ++++---- core/offlineState.go | 119 ++++++++++++++++++++++++++++++++++ core/streamState.go | 81 ++--------------------- core/transcoder/transcoder.go | 25 +++++-- 4 files changed, 158 insertions(+), 94 deletions(-) create mode 100644 core/offlineState.go diff --git a/core/core.go b/core/core.go index 5f1e37f56..4926eab61 100644 --- a/core/core.go +++ b/core/core.go @@ -1,7 +1,6 @@ package core import ( - "io" "os" "path" "path/filepath" @@ -16,7 +15,6 @@ import ( "github.com/owncast/owncast/core/user" "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" - "github.com/owncast/owncast/static" "github.com/owncast/owncast/utils" "github.com/owncast/owncast/yp" ) @@ -29,8 +27,10 @@ var ( _broadcaster *models.Broadcaster ) -var handler transcoder.HLSHandler -var fileWriter = transcoder.FileWriterReceiverService{} +var ( + handler transcoder.HLSHandler + fileWriter = transcoder.FileWriterReceiverService{} +) // Start starts up the core processing. func Start() error { @@ -95,23 +95,22 @@ func createInitialOfflineState() error { func transitionToOfflineVideoStreamContent() { log.Traceln("Firing transcoder with offline stream state") - r, w := io.Pipe() - _transcoder := transcoder.NewTranscoder() - _transcoder.SetInput("pipe:0") - _transcoder.SetStdin(r) _transcoder.SetIdentifier("offline") - go _transcoder.Start() + _transcoder.SetLatencyLevel(models.GetLatencyLevel(4)) + _transcoder.SetIsEvent(true) - d := static.GetOfflineSegment() - if _, err := w.Write(d); err != nil { - log.Errorln(err) + offlineFilePath, err := saveOfflineClipToDisk("offline.ts") + if err != nil { + log.Fatalln("unable to save offline clip:", err) } + _transcoder.SetInput(offlineFilePath) + go _transcoder.Start() + // Copy the logo to be the thumbnail logo := data.GetLogoPath() - err := utils.Copy(filepath.Join("data", logo), "webroot/thumbnail.jpg") - if err != nil { + if err = utils.Copy(filepath.Join("data", logo), "webroot/thumbnail.jpg"); err != nil { log.Warnln(err) } diff --git a/core/offlineState.go b/core/offlineState.go new file mode 100644 index 000000000..47ff34958 --- /dev/null +++ b/core/offlineState.go @@ -0,0 +1,119 @@ +package core + +import ( + "bufio" + "fmt" + "os" + "path/filepath" + + "github.com/grafov/m3u8" + "github.com/owncast/owncast/config" + "github.com/owncast/owncast/static" + "github.com/owncast/owncast/utils" + log "github.com/sirupsen/logrus" +) + +func appendOfflineToVariantPlaylist(index int, playlistFilePath string) { + f, err := os.OpenFile(playlistFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm) //nolint + if err != nil { + log.Fatalln(err) + } + + playlist, _, err := m3u8.DecodeFrom(bufio.NewReader(f), true) + if err != nil { + log.Fatalln(err) + } + + if err := f.Close(); err != nil { + log.Errorln("error closing playlist file", err) + } + + variantPlaylist := playlist.(*m3u8.MediaPlaylist) + variantPlaylist.MediaType = m3u8.EVENT + + tmpFileName := fmt.Sprintf("tmp-stream-%d.m3u8", index) + atomicWriteTmpPlaylistFile, err := os.CreateTemp(os.TempDir(), tmpFileName) + if err != nil { + log.Errorln("error creating tmp playlist file to write to", playlistFilePath, err) + return + } + + if _, err := atomicWriteTmpPlaylistFile.Write(variantPlaylist.Encode().Bytes()); err != nil { + log.Errorln(err) + } + + // Manually add the offline clip to the end of the media playlist. + _, _ = atomicWriteTmpPlaylistFile.WriteString("#EXT-X-DISCONTINUITY\n") + // If "offline" content gets changed then change the duration below + _, _ = atomicWriteTmpPlaylistFile.WriteString("#EXTINF:8.000000,\n") + _, _ = atomicWriteTmpPlaylistFile.WriteString("offline.ts\n") + _, _ = atomicWriteTmpPlaylistFile.WriteString("#EXT-X-ENDLIST\n") + + if err := atomicWriteTmpPlaylistFile.Close(); err != nil { + log.Errorln(err) + } + + if err := utils.Move(atomicWriteTmpPlaylistFile.Name(), playlistFilePath); err != nil { + log.Errorln("error moving temp playlist to overwrite existing one", err) + } +} + +func makeVariantIndexOffline(index int, offlineFilePath string, offlineFilename string) { + playlistFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/stream.m3u8"), index) + segmentFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/%s"), index, offlineFilename) + + if err := utils.Copy(offlineFilePath, segmentFilePath); err != nil { + log.Warnln(err) + } + + if _, err := _storage.Save(segmentFilePath, 0); err != nil { + log.Warnln(err) + } + + if utils.DoesFileExists(playlistFilePath) { + appendOfflineToVariantPlaylist(index, playlistFilePath) + } else { + createEmptyOfflinePlaylist(playlistFilePath, offlineFilename) + } + if _, err := _storage.Save(playlistFilePath, 0); err != nil { + log.Warnln(err) + } +} + +func createEmptyOfflinePlaylist(playlistFilePath string, offlineFilename string) { + p, err := m3u8.NewMediaPlaylist(1, 1) + if err != nil { + log.Errorln(err) + } + + // If "offline" content gets changed then change the duration below + if err := p.Append(offlineFilename, 8.0, ""); err != nil { + log.Errorln(err) + } + + p.Close() + f, err := os.Create(playlistFilePath) + if err != nil { + log.Errorln(err) + } + defer f.Close() + if _, err := f.Write(p.Encode().Bytes()); err != nil { + log.Errorln(err) + } +} + +func saveOfflineClipToDisk(offlineFilename string) (string, error) { + offlineFileData := static.GetOfflineSegment() + offlineTmpFile, err := os.CreateTemp(os.TempDir(), offlineFilename) + if err != nil { + log.Errorln("unable to create temp file for offline video segment", err) + } + + if _, err = offlineTmpFile.Write(offlineFileData); err != nil { + return "", fmt.Errorf("unable to write offline segment to disk: %s", err) + } + + offlineFilePath := offlineTmpFile.Name() + + return offlineFilePath, nil +} diff --git a/core/streamState.go b/core/streamState.go index 6f057f95d..4b4d32db3 100644 --- a/core/streamState.go +++ b/core/streamState.go @@ -1,11 +1,7 @@ package core import ( - "bufio" - "fmt" "io" - "os" - "path/filepath" "time" log "github.com/sirupsen/logrus" @@ -17,10 +13,7 @@ import ( "github.com/owncast/owncast/core/transcoder" "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" - "github.com/owncast/owncast/static" "github.com/owncast/owncast/utils" - - "github.com/grafov/m3u8" ) // After the stream goes offline this timer fires a full cleanup after N min. @@ -85,19 +78,14 @@ func SetStreamAsDisconnected() { _stats.LastConnectTime = nil _broadcaster = nil - offlineFileData := static.GetOfflineSegment() offlineFilename := "offline.ts" - offlineTmpFile, err := os.CreateTemp(os.TempDir(), offlineFilename) - if err != nil { - log.Errorln("unable to create temp file for offline video segment") - } - if _, err = offlineTmpFile.Write(offlineFileData); err != nil { - log.Errorln("unable to write offline segment to disk", err) + offlineFilePath, err := saveOfflineClipToDisk(offlineFilename) + if err != nil { + log.Errorln(err) + return } - offlineFilePath := offlineTmpFile.Name() - transcoder.StopThumbnailGenerator() rtmp.Disconnect() @@ -111,69 +99,12 @@ func SetStreamAsDisconnected() { if _currentBroadcast == nil { stopOnlineCleanupTimer() transitionToOfflineVideoStreamContent() + log.Errorln("unexpected nil _currentBroadcast") return } for index := range _currentBroadcast.OutputSettings { - playlistFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/stream.m3u8"), index) - segmentFilePath := fmt.Sprintf(filepath.Join(config.HLSStoragePath, "%d/%s"), index, offlineFilename) - - if err := utils.Copy(offlineFilePath, segmentFilePath); err != nil { - log.Warnln(err) - } - if _, err := _storage.Save(segmentFilePath, 0); err != nil { - log.Warnln(err) - } - if utils.DoesFileExists(playlistFilePath) { - f, err := os.OpenFile(playlistFilePath, os.O_CREATE|os.O_RDWR, os.ModePerm) //nolint - if err != nil { - log.Errorln(err) - } - defer f.Close() - - playlist, _, err := m3u8.DecodeFrom(bufio.NewReader(f), true) - if err != nil { - log.Fatalln(err) - } - - variantPlaylist := playlist.(*m3u8.MediaPlaylist) - if len(variantPlaylist.Segments) > data.GetStreamLatencyLevel().SegmentCount { - variantPlaylist.Segments = variantPlaylist.Segments[:len(variantPlaylist.Segments)] - } - - if err := variantPlaylist.Append(offlineFilename, 8.0, ""); err != nil { - log.Fatalln(err) - } - if err := variantPlaylist.SetDiscontinuity(); err != nil { - log.Fatalln(err) - } - if _, err := f.WriteAt(variantPlaylist.Encode().Bytes(), 0); err != nil { - log.Errorln(err) - } - } else { - p, err := m3u8.NewMediaPlaylist(1, 1) - if err != nil { - log.Errorln(err) - } - - // If "offline" content gets changed then change the duration below - if err := p.Append(offlineFilename, 8.0, ""); err != nil { - log.Errorln(err) - } - - p.Close() - f, err := os.Create(playlistFilePath) - if err != nil { - log.Errorln(err) - } - defer f.Close() - if _, err := f.Write(p.Encode().Bytes()); err != nil { - log.Errorln(err) - } - } - if _, err := _storage.Save(playlistFilePath, 0); err != nil { - log.Warnln(err) - } + makeVariantIndexOffline(index, offlineFilePath, offlineFilename) } StartOfflineCleanupTimer() diff --git a/core/transcoder/transcoder.go b/core/transcoder/transcoder.go index 0c6f257a9..71544910d 100644 --- a/core/transcoder/transcoder.go +++ b/core/transcoder/transcoder.go @@ -35,6 +35,7 @@ type Transcoder struct { currentStreamOutputSettings []models.StreamOutputVariant currentLatencyLevel models.LatencyLevel + isEvent bool TranscoderCompleted func(error) } @@ -153,6 +154,16 @@ func (t *Transcoder) Start() { } } +// SetLatencyLevel will set the latency level for the instance of the transcoder. +func (t *Transcoder) SetLatencyLevel(level models.LatencyLevel) { + t.currentLatencyLevel = level +} + +// SetIsEvent will allow you to set a stream as an "event". +func (t *Transcoder) SetIsEvent(isEvent bool) { + t.isEvent = isEvent +} + func (t *Transcoder) getString() string { port := t.internalListenerPort localListenerAddress := "http://127.0.0.1:" + port @@ -170,6 +181,14 @@ func (t *Transcoder) getString() string { t.segmentIdentifier = shortid.MustGenerate() } + hlsEventString := "" + if t.isEvent { + hlsEventString = "-hls_playlist_type event" + } else { + // Don't let the transcoder close the playlist. We do it manually. + hlsOptionFlags = append(hlsOptionFlags, "omit_endlist") + } + hlsOptionsString := "" if len(hlsOptionFlags) > 0 { hlsOptionsString = "-hls_flags " + strings.Join(hlsOptionFlags, "+") @@ -191,6 +210,7 @@ func (t *Transcoder) getString() string { "-hls_time", strconv.Itoa(t.currentLatencyLevel.SecondsPerSegment), // Length of each segment "-hls_list_size", strconv.Itoa(t.currentLatencyLevel.SegmentCount), // Max # in variant playlist hlsOptionsString, + hlsEventString, "-segment_format_options", "mpegts_flags=mpegts_copyts=1", // Video settings @@ -411,11 +431,6 @@ func (t *Transcoder) SetOutputPath(output string) { t.segmentOutputPath = output } -// SetAppendToStream enables appending to the HLS stream instead of overwriting. -func (t *Transcoder) SetAppendToStream(append bool) { - t.appendToStream = append -} - // SetIdentifier enables appending a unique identifier to segment file name. func (t *Transcoder) SetIdentifier(output string) { t.segmentIdentifier = output