Browse Source

rework hls session state (#1401)

pull/1402/head
Jason Dove 2 years ago committed by GitHub
parent
commit
225b95449c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      ErsatzTV.Application/Streaming/HlsSessionState.cs
  2. 8
      ErsatzTV.Application/Streaming/HlsSessionWorkAheadState.cs
  3. 110
      ErsatzTV.Application/Streaming/HlsSessionWorker.cs
  4. 3
      ErsatzTV.Application/Streaming/PlayoutItemProcessModel.cs
  5. 2
      ErsatzTV.Application/Streaming/Queries/GetConcatProcessByChannelNumberHandler.cs
  6. 2
      ErsatzTV.Application/Streaming/Queries/GetErrorProcessHandler.cs
  7. 10
      ErsatzTV.Application/Streaming/Queries/GetPlayoutItemProcessByChannelNumberHandler.cs
  8. 2
      ErsatzTV.Application/Streaming/Queries/GetWrappedProcessByChannelNumberHandler.cs

10
ErsatzTV.Application/Streaming/HlsSessionState.cs

@ -0,0 +1,10 @@
namespace ErsatzTV.Application.Streaming;
public enum HlsSessionState
{
SeekAndWorkAhead,
ZeroAndWorkAhead,
SeekAndRealtime,
ZeroAndRealtime,
PlayoutUpdated
}

8
ErsatzTV.Application/Streaming/HlsSessionWorkAheadState.cs

@ -1,8 +0,0 @@
namespace ErsatzTV.Application.Streaming;
public enum HlsSessionWorkAheadState
{
MaxSpeed,
SeekAndRealtime,
RealtimeFromZero
}

110
ErsatzTV.Application/Streaming/HlsSessionWorker.cs

@ -27,15 +27,13 @@ public class HlsSessionWorker : IHlsSessionWorker
private readonly IServiceScopeFactory _serviceScopeFactory; private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly object _sync = new(); private readonly object _sync = new();
private string _channelNumber; private string _channelNumber;
private bool _firstProcess;
private bool _hasWrittenSegments; private bool _hasWrittenSegments;
private DateTimeOffset _lastAccess; private DateTimeOffset _lastAccess;
private DateTimeOffset _lastDelete = DateTimeOffset.MinValue; private DateTimeOffset _lastDelete = DateTimeOffset.MinValue;
private bool _seekNextItem;
private Option<int> _targetFramerate; private Option<int> _targetFramerate;
private Timer _timer; private Timer _timer;
private DateTimeOffset _transcodedUntil; private DateTimeOffset _transcodedUntil;
private HlsSessionWorkAheadState _workAheadState; private HlsSessionState _state;
public HlsSessionWorker( public HlsSessionWorker(
IHlsPlaylistFilter hlsPlaylistFilter, IHlsPlaylistFilter hlsPlaylistFilter,
@ -104,9 +102,7 @@ public class HlsSessionWorker : IHlsSessionWorker
public void PlayoutUpdated() public void PlayoutUpdated()
{ {
_firstProcess = true; _state = HlsSessionState.PlayoutUpdated;
_seekNextItem = true;
_workAheadState = HlsSessionWorkAheadState.SeekAndRealtime;
} }
public async Task Run(string channelNumber, TimeSpan idleTimeout, CancellationToken incomingCancellationToken) public async Task Run(string channelNumber, TimeSpan idleTimeout, CancellationToken incomingCancellationToken)
@ -148,12 +144,8 @@ public class HlsSessionWorker : IHlsSessionWorker
_transcodedUntil = DateTimeOffset.Now; _transcodedUntil = DateTimeOffset.Now;
PlaylistStart = _transcodedUntil; PlaylistStart = _transcodedUntil;
_firstProcess = true;
bool initialWorkAhead = Volatile.Read(ref _workAheadCount) < await GetWorkAheadLimit(); bool initialWorkAhead = Volatile.Read(ref _workAheadCount) < await GetWorkAheadLimit();
_workAheadState = initialWorkAhead _state = initialWorkAhead ? HlsSessionState.SeekAndWorkAhead : HlsSessionState.SeekAndRealtime;
? HlsSessionWorkAheadState.MaxSpeed
: HlsSessionWorkAheadState.SeekAndRealtime;
if (!await Transcode(!initialWorkAhead, cancellationToken)) if (!await Transcode(!initialWorkAhead, cancellationToken))
{ {
@ -206,6 +198,42 @@ public class HlsSessionWorker : IHlsSessionWorker
} }
} }
private HlsSessionState NextState(HlsSessionState state, PlayoutItemProcessModel processModel)
{
bool isComplete = processModel?.IsComplete == true;
HlsSessionState result = state switch
{
// playout updates should have the channel start over, transcode method will throttle if needed
HlsSessionState.PlayoutUpdated => HlsSessionState.SeekAndWorkAhead,
// after seeking and NOT completing the item, seek again, transcode method will throttle if needed
HlsSessionState.SeekAndWorkAhead when !isComplete => HlsSessionState.SeekAndWorkAhead,
// after seeking and completing the item, start at zero
HlsSessionState.SeekAndWorkAhead => HlsSessionState.ZeroAndWorkAhead,
// after starting and zero and NOT completing the item, seek, transcode method will throttle if needed
HlsSessionState.ZeroAndWorkAhead when !isComplete => HlsSessionState.SeekAndWorkAhead,
// after starting at zero and completing the item, start at zero again, transcode method will throttle if needed
HlsSessionState.ZeroAndWorkAhead => HlsSessionState.ZeroAndWorkAhead,
// realtime will always complete items, so start next at zero
HlsSessionState.SeekAndRealtime => HlsSessionState.ZeroAndRealtime,
// realtime will always complete items, so start next at zero
HlsSessionState.ZeroAndRealtime => HlsSessionState.ZeroAndRealtime,
// this will never happen with the enum
_ => throw new InvalidOperationException()
};
_logger.LogDebug("HLS session state {Last} => {Next}", state, result);
return result;
}
private async Task<bool> Transcode( private async Task<bool> Transcode(
bool realtime, bool realtime,
CancellationToken cancellationToken) CancellationToken cancellationToken)
@ -226,30 +254,34 @@ public class HlsSessionWorker : IHlsSessionWorker
_channelNumber); _channelNumber);
} }
// throttle to realtime if needed
if (realtime)
{
HlsSessionState nextState = _state switch
{
HlsSessionState.SeekAndWorkAhead => HlsSessionState.SeekAndRealtime,
HlsSessionState.ZeroAndWorkAhead => HlsSessionState.ZeroAndRealtime,
_ => _state
};
if (nextState != _state)
{
_logger.LogDebug("HLS session state throttling {Last} => {Next}", _state, nextState);
_state = nextState;
}
}
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>(); IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
long ptsOffset = await GetPtsOffset(mediator, _channelNumber, cancellationToken); long ptsOffset = await GetPtsOffset(mediator, _channelNumber, cancellationToken);
// _logger.LogInformation("PTS offset: {PtsOffset}", ptsOffset); // _logger.LogInformation("PTS offset: {PtsOffset}", ptsOffset);
// this shouldn't happen, but respect realtime _logger.LogInformation("HLS session state: {State}", _state);
if (realtime && _workAheadState is HlsSessionWorkAheadState.MaxSpeed)
{
_workAheadState = HlsSessionWorkAheadState.SeekAndRealtime;
}
// this happens when we initially transcode (at max speed) insufficient content to work
// in realtime yet, so we need to reset to max speed
if (!realtime && _workAheadState is not HlsSessionWorkAheadState.MaxSpeed)
{
_workAheadState = HlsSessionWorkAheadState.MaxSpeed;
}
_logger.LogInformation("Work ahead state: {State}", _workAheadState);
DateTimeOffset now = (_firstProcess || _workAheadState is HlsSessionWorkAheadState.MaxSpeed) DateTimeOffset now = _state is HlsSessionState.SeekAndWorkAhead
? DateTimeOffset.Now ? DateTimeOffset.Now
: _transcodedUntil.AddSeconds(_workAheadState is HlsSessionWorkAheadState.SeekAndRealtime ? 0 : 1); : _transcodedUntil.AddSeconds(_state is HlsSessionState.SeekAndRealtime ? 0 : 1);
bool startAtZero = _workAheadState is HlsSessionWorkAheadState.RealtimeFromZero && !_firstProcess; bool startAtZero = _state is HlsSessionState.ZeroAndWorkAhead or HlsSessionState.ZeroAndRealtime;
var request = new GetPlayoutItemProcessByChannelNumber( var request = new GetPlayoutItemProcessByChannelNumber(
_channelNumber, _channelNumber,
@ -295,20 +327,7 @@ public class HlsSessionWorker : IHlsSessionWorker
_logger.LogInformation("HLS process has completed for channel {Channel}", _channelNumber); _logger.LogInformation("HLS process has completed for channel {Channel}", _channelNumber);
_logger.LogDebug("Transcoded until: {Until}", processModel.Until); _logger.LogDebug("Transcoded until: {Until}", processModel.Until);
_transcodedUntil = processModel.Until; _transcodedUntil = processModel.Until;
_firstProcess = false; _state = NextState(_state, processModel);
if (_seekNextItem)
{
_firstProcess = true;
_seekNextItem = false;
}
_workAheadState = _workAheadState switch
{
HlsSessionWorkAheadState.MaxSpeed => HlsSessionWorkAheadState.SeekAndRealtime,
HlsSessionWorkAheadState.SeekAndRealtime => HlsSessionWorkAheadState.RealtimeFromZero,
_ => _workAheadState
};
_hasWrittenSegments = true; _hasWrittenSegments = true;
return true; return true;
} }
@ -353,12 +372,7 @@ public class HlsSessionWorker : IHlsSessionWorker
if (commandResult.ExitCode == 0) if (commandResult.ExitCode == 0)
{ {
_firstProcess = false; _state = NextState(_state, null);
if (_seekNextItem)
{
_firstProcess = true;
_seekNextItem = false;
}
_hasWrittenSegments = true; _hasWrittenSegments = true;

3
ErsatzTV.Application/Streaming/PlayoutItemProcessModel.cs

@ -5,4 +5,5 @@ namespace ErsatzTV.Application.Streaming;
public record PlayoutItemProcessModel( public record PlayoutItemProcessModel(
Command Process, Command Process,
Option<TimeSpan> MaybeDuration, Option<TimeSpan> MaybeDuration,
DateTimeOffset Until); DateTimeOffset Until,
bool IsComplete);

2
ErsatzTV.Application/Streaming/Queries/GetConcatProcessByChannelNumberHandler.cs

@ -37,6 +37,6 @@ public class GetConcatProcessByChannelNumberHandler : FFmpegProcessHandler<GetCo
request.Scheme, request.Scheme,
request.Host); request.Host);
return new PlayoutItemProcessModel(process, Option<TimeSpan>.None, DateTimeOffset.MaxValue); return new PlayoutItemProcessModel(process, Option<TimeSpan>.None, DateTimeOffset.MaxValue, true);
} }
} }

2
ErsatzTV.Application/Streaming/Queries/GetErrorProcessHandler.cs

@ -36,6 +36,6 @@ public class GetErrorProcessHandler : FFmpegProcessHandler<GetErrorProcess>
channel.FFmpegProfile.VaapiDevice, channel.FFmpegProfile.VaapiDevice,
Optional(channel.FFmpegProfile.QsvExtraHardwareFrames)); Optional(channel.FFmpegProfile.QsvExtraHardwareFrames));
return new PlayoutItemProcessModel(process, request.MaybeDuration, request.Until); return new PlayoutItemProcessModel(process, request.MaybeDuration, request.Until, true);
} }
} }

10
ErsatzTV.Application/Streaming/Queries/GetPlayoutItemProcessByChannelNumberHandler.cs

@ -178,11 +178,13 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler<
TimeSpan outPoint = playoutItemWithPath.PlayoutItem.OutPoint; TimeSpan outPoint = playoutItemWithPath.PlayoutItem.OutPoint;
DateTimeOffset effectiveNow = request.StartAtZero ? start : now; DateTimeOffset effectiveNow = request.StartAtZero ? start : now;
TimeSpan duration = finish - effectiveNow; TimeSpan duration = finish - effectiveNow;
var isComplete = true;
if (!request.HlsRealtime && duration > TimeSpan.FromMinutes(2)) if (!request.HlsRealtime && duration > TimeSpan.FromMinutes(2))
{ {
finish = effectiveNow + TimeSpan.FromMinutes(2); finish = effectiveNow + TimeSpan.FromMinutes(2);
outPoint = finish - start + TimeSpan.FromMinutes(2); outPoint = finish - start + TimeSpan.FromMinutes(2);
isComplete = false;
} }
Command process = await _ffmpegProcessService.ForPlayoutItem( Command process = await _ffmpegProcessService.ForPlayoutItem(
@ -216,7 +218,7 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler<
playoutItemWithPath.PlayoutItem.DisableWatermarks, playoutItemWithPath.PlayoutItem.DisableWatermarks,
_ => { }); _ => { });
var result = new PlayoutItemProcessModel(process, duration, finish); var result = new PlayoutItemProcessModel(process, duration, finish, isComplete);
return Right<BaseError, PlayoutItemProcessModel>(result); return Right<BaseError, PlayoutItemProcessModel>(result);
} }
@ -253,7 +255,7 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler<
channel.FFmpegProfile.VaapiDevice, channel.FFmpegProfile.VaapiDevice,
Optional(channel.FFmpegProfile.QsvExtraHardwareFrames)); Optional(channel.FFmpegProfile.QsvExtraHardwareFrames));
return new PlayoutItemProcessModel(offlineProcess, maybeDuration, finish); return new PlayoutItemProcessModel(offlineProcess, maybeDuration, finish, true);
case PlayoutItemDoesNotExistOnDisk: case PlayoutItemDoesNotExistOnDisk:
Command doesNotExistProcess = await _ffmpegProcessService.ForError( Command doesNotExistProcess = await _ffmpegProcessService.ForError(
ffmpegPath, ffmpegPath,
@ -266,7 +268,7 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler<
channel.FFmpegProfile.VaapiDevice, channel.FFmpegProfile.VaapiDevice,
Optional(channel.FFmpegProfile.QsvExtraHardwareFrames)); Optional(channel.FFmpegProfile.QsvExtraHardwareFrames));
return new PlayoutItemProcessModel(doesNotExistProcess, maybeDuration, finish); return new PlayoutItemProcessModel(doesNotExistProcess, maybeDuration, finish, true);
default: default:
Command errorProcess = await _ffmpegProcessService.ForError( Command errorProcess = await _ffmpegProcessService.ForError(
ffmpegPath, ffmpegPath,
@ -279,7 +281,7 @@ public class GetPlayoutItemProcessByChannelNumberHandler : FFmpegProcessHandler<
channel.FFmpegProfile.VaapiDevice, channel.FFmpegProfile.VaapiDevice,
Optional(channel.FFmpegProfile.QsvExtraHardwareFrames)); Optional(channel.FFmpegProfile.QsvExtraHardwareFrames));
return new PlayoutItemProcessModel(errorProcess, maybeDuration, finish); return new PlayoutItemProcessModel(errorProcess, maybeDuration, finish, true);
} }
} }

2
ErsatzTV.Application/Streaming/Queries/GetWrappedProcessByChannelNumberHandler.cs

@ -37,6 +37,6 @@ public class GetWrappedProcessByChannelNumberHandler : FFmpegProcessHandler<GetW
request.Scheme, request.Scheme,
request.Host); request.Host);
return new PlayoutItemProcessModel(process, Option<TimeSpan>.None, DateTimeOffset.MaxValue); return new PlayoutItemProcessModel(process, Option<TimeSpan>.None, DateTimeOffset.MaxValue, true);
} }
} }

Loading…
Cancel
Save