diff --git a/ErsatzTV.Application/Streaming/HlsSessionState.cs b/ErsatzTV.Application/Streaming/HlsSessionState.cs new file mode 100644 index 000000000..5014f464e --- /dev/null +++ b/ErsatzTV.Application/Streaming/HlsSessionState.cs @@ -0,0 +1,10 @@ +namespace ErsatzTV.Application.Streaming; + +public enum HlsSessionState +{ + SeekAndWorkAhead, + ZeroAndWorkAhead, + SeekAndRealtime, + ZeroAndRealtime, + PlayoutUpdated +} diff --git a/ErsatzTV.Application/Streaming/HlsSessionWorkAheadState.cs b/ErsatzTV.Application/Streaming/HlsSessionWorkAheadState.cs deleted file mode 100644 index 7a0ed49c8..000000000 --- a/ErsatzTV.Application/Streaming/HlsSessionWorkAheadState.cs +++ /dev/null @@ -1,8 +0,0 @@ -namespace ErsatzTV.Application.Streaming; - -public enum HlsSessionWorkAheadState -{ - MaxSpeed, - SeekAndRealtime, - RealtimeFromZero -} diff --git a/ErsatzTV.Application/Streaming/HlsSessionWorker.cs b/ErsatzTV.Application/Streaming/HlsSessionWorker.cs index 130e8d35e..cf01b0ff1 100644 --- a/ErsatzTV.Application/Streaming/HlsSessionWorker.cs +++ b/ErsatzTV.Application/Streaming/HlsSessionWorker.cs @@ -27,15 +27,13 @@ public class HlsSessionWorker : IHlsSessionWorker private readonly IServiceScopeFactory _serviceScopeFactory; private readonly object _sync = new(); private string _channelNumber; - private bool _firstProcess; private bool _hasWrittenSegments; private DateTimeOffset _lastAccess; private DateTimeOffset _lastDelete = DateTimeOffset.MinValue; - private bool _seekNextItem; private Option _targetFramerate; private Timer _timer; private DateTimeOffset _transcodedUntil; - private HlsSessionWorkAheadState _workAheadState; + private HlsSessionState _state; public HlsSessionWorker( IHlsPlaylistFilter hlsPlaylistFilter, @@ -104,9 +102,7 @@ public class HlsSessionWorker : IHlsSessionWorker public void PlayoutUpdated() { - _firstProcess = true; - _seekNextItem = true; - _workAheadState = HlsSessionWorkAheadState.SeekAndRealtime; + _state = HlsSessionState.PlayoutUpdated; } public async Task Run(string channelNumber, TimeSpan idleTimeout, CancellationToken incomingCancellationToken) @@ -148,12 +144,8 @@ public class HlsSessionWorker : IHlsSessionWorker _transcodedUntil = DateTimeOffset.Now; PlaylistStart = _transcodedUntil; - _firstProcess = true; - bool initialWorkAhead = Volatile.Read(ref _workAheadCount) < await GetWorkAheadLimit(); - _workAheadState = initialWorkAhead - ? HlsSessionWorkAheadState.MaxSpeed - : HlsSessionWorkAheadState.SeekAndRealtime; + _state = initialWorkAhead ? HlsSessionState.SeekAndWorkAhead : HlsSessionState.SeekAndRealtime; if (!await Transcode(!initialWorkAhead, cancellationToken)) { @@ -206,6 +198,42 @@ public class HlsSessionWorker : IHlsSessionWorker } } + private HlsSessionState NextState(HlsSessionState state, PlayoutItemProcessModel processModel) + { + bool isComplete = processModel?.IsComplete == true; + + HlsSessionState result = state switch + { + // playout updates should have the channel start over, transcode method will throttle if needed + HlsSessionState.PlayoutUpdated => HlsSessionState.SeekAndWorkAhead, + + // after seeking and NOT completing the item, seek again, transcode method will throttle if needed + HlsSessionState.SeekAndWorkAhead when !isComplete => HlsSessionState.SeekAndWorkAhead, + + // after seeking and completing the item, start at zero + HlsSessionState.SeekAndWorkAhead => HlsSessionState.ZeroAndWorkAhead, + + // after starting and zero and NOT completing the item, seek, transcode method will throttle if needed + HlsSessionState.ZeroAndWorkAhead when !isComplete => HlsSessionState.SeekAndWorkAhead, + + // after starting at zero and completing the item, start at zero again, transcode method will throttle if needed + HlsSessionState.ZeroAndWorkAhead => HlsSessionState.ZeroAndWorkAhead, + + // realtime will always complete items, so start next at zero + HlsSessionState.SeekAndRealtime => HlsSessionState.ZeroAndRealtime, + + // realtime will always complete items, so start next at zero + HlsSessionState.ZeroAndRealtime => HlsSessionState.ZeroAndRealtime, + + // this will never happen with the enum + _ => throw new InvalidOperationException() + }; + + _logger.LogDebug("HLS session state {Last} => {Next}", state, result); + + return result; + } + private async Task Transcode( bool realtime, CancellationToken cancellationToken) @@ -226,30 +254,34 @@ public class HlsSessionWorker : IHlsSessionWorker _channelNumber); } + // throttle to realtime if needed + if (realtime) + { + HlsSessionState nextState = _state switch + { + HlsSessionState.SeekAndWorkAhead => HlsSessionState.SeekAndRealtime, + HlsSessionState.ZeroAndWorkAhead => HlsSessionState.ZeroAndRealtime, + _ => _state + }; + + if (nextState != _state) + { + _logger.LogDebug("HLS session state throttling {Last} => {Next}", _state, nextState); + _state = nextState; + } + } + IMediator mediator = scope.ServiceProvider.GetRequiredService(); long ptsOffset = await GetPtsOffset(mediator, _channelNumber, cancellationToken); // _logger.LogInformation("PTS offset: {PtsOffset}", ptsOffset); - // this shouldn't happen, but respect realtime - if (realtime && _workAheadState is HlsSessionWorkAheadState.MaxSpeed) - { - _workAheadState = HlsSessionWorkAheadState.SeekAndRealtime; - } - - // this happens when we initially transcode (at max speed) insufficient content to work - // in realtime yet, so we need to reset to max speed - if (!realtime && _workAheadState is not HlsSessionWorkAheadState.MaxSpeed) - { - _workAheadState = HlsSessionWorkAheadState.MaxSpeed; - } - - _logger.LogInformation("Work ahead state: {State}", _workAheadState); + _logger.LogInformation("HLS session state: {State}", _state); - DateTimeOffset now = (_firstProcess || _workAheadState is HlsSessionWorkAheadState.MaxSpeed) + DateTimeOffset now = _state is HlsSessionState.SeekAndWorkAhead ? DateTimeOffset.Now - : _transcodedUntil.AddSeconds(_workAheadState is HlsSessionWorkAheadState.SeekAndRealtime ? 0 : 1); - bool startAtZero = _workAheadState is HlsSessionWorkAheadState.RealtimeFromZero && !_firstProcess; + : _transcodedUntil.AddSeconds(_state is HlsSessionState.SeekAndRealtime ? 0 : 1); + bool startAtZero = _state is HlsSessionState.ZeroAndWorkAhead or HlsSessionState.ZeroAndRealtime; var request = new GetPlayoutItemProcessByChannelNumber( _channelNumber, @@ -295,20 +327,7 @@ public class HlsSessionWorker : IHlsSessionWorker _logger.LogInformation("HLS process has completed for channel {Channel}", _channelNumber); _logger.LogDebug("Transcoded until: {Until}", processModel.Until); _transcodedUntil = processModel.Until; - _firstProcess = false; - if (_seekNextItem) - { - _firstProcess = true; - _seekNextItem = false; - } - - _workAheadState = _workAheadState switch - { - HlsSessionWorkAheadState.MaxSpeed => HlsSessionWorkAheadState.SeekAndRealtime, - HlsSessionWorkAheadState.SeekAndRealtime => HlsSessionWorkAheadState.RealtimeFromZero, - _ => _workAheadState - }; - + _state = NextState(_state, processModel); _hasWrittenSegments = true; return true; } @@ -353,12 +372,7 @@ public class HlsSessionWorker : IHlsSessionWorker if (commandResult.ExitCode == 0) { - _firstProcess = false; - if (_seekNextItem) - { - _firstProcess = true; - _seekNextItem = false; - } + _state = NextState(_state, null); _hasWrittenSegments = true; diff --git a/ErsatzTV.Application/Streaming/PlayoutItemProcessModel.cs b/ErsatzTV.Application/Streaming/PlayoutItemProcessModel.cs index 38abae52d..66db458b0 100644 --- a/ErsatzTV.Application/Streaming/PlayoutItemProcessModel.cs +++ b/ErsatzTV.Application/Streaming/PlayoutItemProcessModel.cs @@ -5,4 +5,5 @@ namespace ErsatzTV.Application.Streaming; public record PlayoutItemProcessModel( Command Process, Option MaybeDuration, - DateTimeOffset Until); + DateTimeOffset Until, + bool IsComplete); diff --git a/ErsatzTV.Application/Streaming/Queries/GetConcatProcessByChannelNumberHandler.cs b/ErsatzTV.Application/Streaming/Queries/GetConcatProcessByChannelNumberHandler.cs index 7864d5450..8ecc75418 100644 --- a/ErsatzTV.Application/Streaming/Queries/GetConcatProcessByChannelNumberHandler.cs +++ b/ErsatzTV.Application/Streaming/Queries/GetConcatProcessByChannelNumberHandler.cs @@ -37,6 +37,6 @@ public class GetConcatProcessByChannelNumberHandler : FFmpegProcessHandler.None, DateTimeOffset.MaxValue); + return new PlayoutItemProcessModel(process, Option.None, DateTimeOffset.MaxValue, true); } } diff --git a/ErsatzTV.Application/Streaming/Queries/GetErrorProcessHandler.cs b/ErsatzTV.Application/Streaming/Queries/GetErrorProcessHandler.cs index ea87a038e..97fefadca 100644 --- a/ErsatzTV.Application/Streaming/Queries/GetErrorProcessHandler.cs +++ b/ErsatzTV.Application/Streaming/Queries/GetErrorProcessHandler.cs @@ -36,6 +36,6 @@ public class GetErrorProcessHandler : FFmpegProcessHandler channel.FFmpegProfile.VaapiDevice, Optional(channel.FFmpegProfile.QsvExtraHardwareFrames)); - return new PlayoutItemProcessModel(process, request.MaybeDuration, request.Until); + return new PlayoutItemProcessModel(process, request.MaybeDuration, request.Until, true); } } diff --git a/ErsatzTV.Application/Streaming/Queries/GetPlayoutItemProcessByChannelNumberHandler.cs b/ErsatzTV.Application/Streaming/Queries/GetPlayoutItemProcessByChannelNumberHandler.cs index a80c3468d..aaea32783 100644 --- a/ErsatzTV.Application/Streaming/Queries/GetPlayoutItemProcessByChannelNumberHandler.cs +++ b/ErsatzTV.Application/Streaming/Queries/GetPlayoutItemProcessByChannelNumberHandler.cs @@ -178,11 +178,13 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler< TimeSpan outPoint = playoutItemWithPath.PlayoutItem.OutPoint; DateTimeOffset effectiveNow = request.StartAtZero ? start : now; TimeSpan duration = finish - effectiveNow; + var isComplete = true; if (!request.HlsRealtime && duration > TimeSpan.FromMinutes(2)) { finish = effectiveNow + TimeSpan.FromMinutes(2); outPoint = finish - start + TimeSpan.FromMinutes(2); + isComplete = false; } Command process = await _ffmpegProcessService.ForPlayoutItem( @@ -216,7 +218,7 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler< playoutItemWithPath.PlayoutItem.DisableWatermarks, _ => { }); - var result = new PlayoutItemProcessModel(process, duration, finish); + var result = new PlayoutItemProcessModel(process, duration, finish, isComplete); return Right(result); } @@ -253,7 +255,7 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler< channel.FFmpegProfile.VaapiDevice, Optional(channel.FFmpegProfile.QsvExtraHardwareFrames)); - return new PlayoutItemProcessModel(offlineProcess, maybeDuration, finish); + return new PlayoutItemProcessModel(offlineProcess, maybeDuration, finish, true); case PlayoutItemDoesNotExistOnDisk: Command doesNotExistProcess = await _ffmpegProcessService.ForError( ffmpegPath, @@ -266,7 +268,7 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler< channel.FFmpegProfile.VaapiDevice, Optional(channel.FFmpegProfile.QsvExtraHardwareFrames)); - return new PlayoutItemProcessModel(doesNotExistProcess, maybeDuration, finish); + return new PlayoutItemProcessModel(doesNotExistProcess, maybeDuration, finish, true); default: Command errorProcess = await _ffmpegProcessService.ForError( ffmpegPath, @@ -279,7 +281,7 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler< channel.FFmpegProfile.VaapiDevice, Optional(channel.FFmpegProfile.QsvExtraHardwareFrames)); - return new PlayoutItemProcessModel(errorProcess, maybeDuration, finish); + return new PlayoutItemProcessModel(errorProcess, maybeDuration, finish, true); } } diff --git a/ErsatzTV.Application/Streaming/Queries/GetWrappedProcessByChannelNumberHandler.cs b/ErsatzTV.Application/Streaming/Queries/GetWrappedProcessByChannelNumberHandler.cs index 28efe1353..0c1c65810 100644 --- a/ErsatzTV.Application/Streaming/Queries/GetWrappedProcessByChannelNumberHandler.cs +++ b/ErsatzTV.Application/Streaming/Queries/GetWrappedProcessByChannelNumberHandler.cs @@ -37,6 +37,6 @@ public class GetWrappedProcessByChannelNumberHandler : FFmpegProcessHandler.None, DateTimeOffset.MaxValue); + return new PlayoutItemProcessModel(process, Option.None, DateTimeOffset.MaxValue, true); } }