Browse Source

fix hls session worker lifetime (#1414)

pull/1415/head
Jason Dove 2 years ago committed by GitHub
parent
commit
82de3136cd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 42
      ErsatzTV.Application/Streaming/Commands/StartFFmpegSessionHandler.cs
  2. 57
      ErsatzTV.Application/Streaming/HlsSessionWorker.cs
  3. 1
      ErsatzTV/Startup.cs

42
ErsatzTV.Application/Streaming/Commands/StartFFmpegSessionHandler.cs

@ -1,5 +1,7 @@
using System.Diagnostics; using System.Diagnostics;
using System.Threading.Channels; using System.Threading.Channels;
using Bugsnag;
using ErsatzTV.Application.Channels;
using ErsatzTV.Application.Maintenance; using ErsatzTV.Application.Maintenance;
using ErsatzTV.Core; using ErsatzTV.Core;
using ErsatzTV.Core.Domain; using ErsatzTV.Core.Domain;
@ -17,22 +19,34 @@ public class StartFFmpegSessionHandler : IRequestHandler<StartFFmpegSession, Eit
{ {
private readonly IConfigElementRepository _configElementRepository; private readonly IConfigElementRepository _configElementRepository;
private readonly IFFmpegSegmenterService _ffmpegSegmenterService; private readonly IFFmpegSegmenterService _ffmpegSegmenterService;
private readonly IHlsPlaylistFilter _hlsPlaylistFilter;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly IMediator _mediator;
private readonly IClient _client;
private readonly ILocalFileSystem _localFileSystem; private readonly ILocalFileSystem _localFileSystem;
private readonly ILogger<StartFFmpegSessionHandler> _logger; private readonly ILogger<StartFFmpegSessionHandler> _logger;
private readonly IServiceScopeFactory _serviceScopeFactory; private readonly ILogger<HlsSessionWorker> _sessionWorkerLogger;
private readonly ChannelWriter<IBackgroundServiceRequest> _workerChannel; private readonly ChannelWriter<IBackgroundServiceRequest> _workerChannel;
public StartFFmpegSessionHandler( public StartFFmpegSessionHandler(
IHlsPlaylistFilter hlsPlaylistFilter,
IServiceScopeFactory serviceScopeFactory,
IMediator mediator,
IClient client,
ILocalFileSystem localFileSystem, ILocalFileSystem localFileSystem,
ILogger<StartFFmpegSessionHandler> logger, ILogger<StartFFmpegSessionHandler> logger,
IServiceScopeFactory serviceScopeFactory, ILogger<HlsSessionWorker> sessionWorkerLogger,
IFFmpegSegmenterService ffmpegSegmenterService, IFFmpegSegmenterService ffmpegSegmenterService,
IConfigElementRepository configElementRepository, IConfigElementRepository configElementRepository,
ChannelWriter<IBackgroundServiceRequest> workerChannel) ChannelWriter<IBackgroundServiceRequest> workerChannel)
{ {
_hlsPlaylistFilter = hlsPlaylistFilter;
_serviceScopeFactory = serviceScopeFactory;
_mediator = mediator;
_client = client;
_localFileSystem = localFileSystem; _localFileSystem = localFileSystem;
_logger = logger; _logger = logger;
_serviceScopeFactory = serviceScopeFactory; _sessionWorkerLogger = sessionWorkerLogger;
_ffmpegSegmenterService = ffmpegSegmenterService; _ffmpegSegmenterService = ffmpegSegmenterService;
_configElementRepository = configElementRepository; _configElementRepository = configElementRepository;
_workerChannel = workerChannel; _workerChannel = workerChannel;
@ -52,8 +66,18 @@ public class StartFFmpegSessionHandler : IRequestHandler<StartFFmpegSession, Eit
.GetValue<int>(ConfigElementKey.FFmpegSegmenterTimeout) .GetValue<int>(ConfigElementKey.FFmpegSegmenterTimeout)
.Map(maybeTimeout => maybeTimeout.Match(i => TimeSpan.FromSeconds(i), () => TimeSpan.FromMinutes(1))); .Map(maybeTimeout => maybeTimeout.Match(i => TimeSpan.FromSeconds(i), () => TimeSpan.FromMinutes(1)));
using IServiceScope scope = _serviceScopeFactory.CreateScope(); Option<int> targetFramerate = await _mediator.Send(
HlsSessionWorker worker = scope.ServiceProvider.GetRequiredService<HlsSessionWorker>(); new GetChannelFramerate(request.ChannelNumber),
cancellationToken);
var worker = new HlsSessionWorker(
_serviceScopeFactory,
_client,
_hlsPlaylistFilter,
_configElementRepository,
_localFileSystem,
_sessionWorkerLogger,
targetFramerate);
_ffmpegSegmenterService.SessionWorkers.AddOrUpdate(request.ChannelNumber, _ => worker, (_, _) => worker); _ffmpegSegmenterService.SessionWorkers.AddOrUpdate(request.ChannelNumber, _ => worker, (_, _) => worker);
// fire and forget worker // fire and forget worker
@ -63,7 +87,9 @@ public class StartFFmpegSessionHandler : IRequestHandler<StartFFmpegSession, Eit
{ {
_ffmpegSegmenterService.SessionWorkers.TryRemove( _ffmpegSegmenterService.SessionWorkers.TryRemove(
request.ChannelNumber, request.ChannelNumber,
out IHlsSessionWorker _); out IHlsSessionWorker inactiveWorker);
inactiveWorker?.Dispose();
_workerChannel.TryWrite(new ReleaseMemory(false)); _workerChannel.TryWrite(new ReleaseMemory(false));
}, },
@ -74,8 +100,8 @@ public class StartFFmpegSessionHandler : IRequestHandler<StartFFmpegSession, Eit
request.ChannelNumber, request.ChannelNumber,
"live.m3u8"); "live.m3u8");
IConfigElementRepository repo = scope.ServiceProvider.GetRequiredService<IConfigElementRepository>(); int initialSegmentCount = await _configElementRepository
int initialSegmentCount = await repo.GetValue<int>(ConfigElementKey.FFmpegInitialSegmentCount) .GetValue<int>(ConfigElementKey.FFmpegInitialSegmentCount)
.Map(maybeCount => maybeCount.Match(identity, () => 1)); .Map(maybeCount => maybeCount.Match(identity, () => 1));
await WaitForPlaylistSegments(playlistFileName, initialSegmentCount, worker, cancellationToken); await WaitForPlaylistSegments(playlistFileName, initialSegmentCount, worker, cancellationToken);

57
ErsatzTV.Application/Streaming/HlsSessionWorker.cs

@ -5,7 +5,6 @@ using System.Timers;
using Bugsnag; using Bugsnag;
using CliWrap; using CliWrap;
using CliWrap.Buffered; using CliWrap.Buffered;
using ErsatzTV.Application.Channels;
using ErsatzTV.Core; using ErsatzTV.Core;
using ErsatzTV.Core.Domain; using ErsatzTV.Core.Domain;
using ErsatzTV.Core.FFmpeg; using ErsatzTV.Core.FFmpeg;
@ -22,10 +21,13 @@ public class HlsSessionWorker : IHlsSessionWorker
{ {
private static readonly SemaphoreSlim Slim = new(1, 1); private static readonly SemaphoreSlim Slim = new(1, 1);
private static int _workAheadCount; private static int _workAheadCount;
private readonly IMediator _mediator;
private readonly IClient _client;
private readonly IHlsPlaylistFilter _hlsPlaylistFilter; private readonly IHlsPlaylistFilter _hlsPlaylistFilter;
private readonly IConfigElementRepository _configElementRepository;
private readonly ILocalFileSystem _localFileSystem; private readonly ILocalFileSystem _localFileSystem;
private readonly ILogger<HlsSessionWorker> _logger; private readonly ILogger<HlsSessionWorker> _logger;
private readonly IServiceScopeFactory _serviceScopeFactory; private readonly Option<int> _targetFramerate;
private readonly object _sync = new(); private readonly object _sync = new();
private string _channelNumber; private string _channelNumber;
private bool _disposedValue; private bool _disposedValue;
@ -33,20 +35,27 @@ public class HlsSessionWorker : IHlsSessionWorker
private DateTimeOffset _lastAccess; private DateTimeOffset _lastAccess;
private DateTimeOffset _lastDelete = DateTimeOffset.MinValue; private DateTimeOffset _lastDelete = DateTimeOffset.MinValue;
private HlsSessionState _state; private HlsSessionState _state;
private Option<int> _targetFramerate;
private Timer _timer; private Timer _timer;
private DateTimeOffset _transcodedUntil; private DateTimeOffset _transcodedUntil;
private IServiceScope _serviceScope;
public HlsSessionWorker( public HlsSessionWorker(
IHlsPlaylistFilter hlsPlaylistFilter,
IServiceScopeFactory serviceScopeFactory, IServiceScopeFactory serviceScopeFactory,
IClient client,
IHlsPlaylistFilter hlsPlaylistFilter,
IConfigElementRepository configElementRepository,
ILocalFileSystem localFileSystem, ILocalFileSystem localFileSystem,
ILogger<HlsSessionWorker> logger) ILogger<HlsSessionWorker> logger,
Option<int> targetFramerate)
{ {
_serviceScope = serviceScopeFactory.CreateScope();
_mediator = _serviceScope.ServiceProvider.GetRequiredService<IMediator>();
_client = client;
_hlsPlaylistFilter = hlsPlaylistFilter; _hlsPlaylistFilter = hlsPlaylistFilter;
_serviceScopeFactory = serviceScopeFactory; _configElementRepository = configElementRepository;
_localFileSystem = localFileSystem; _localFileSystem = localFileSystem;
_logger = logger; _logger = logger;
_targetFramerate = targetFramerate;
} }
public DateTimeOffset PlaylistStart { get; private set; } public DateTimeOffset PlaylistStart { get; private set; }
@ -133,17 +142,11 @@ public class HlsSessionWorker : IHlsSessionWorker
_logger.LogInformation("Starting HLS session for channel {Channel}", channelNumber); _logger.LogInformation("Starting HLS session for channel {Channel}", channelNumber);
using IServiceScope scope = _serviceScopeFactory.CreateScope(); if (_localFileSystem.ListFiles(Path.Combine(FileSystemLayout.TranscodeFolder, _channelNumber)).Any())
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
ILocalFileSystem localFileSystem = scope.ServiceProvider.GetRequiredService<ILocalFileSystem>();
if (localFileSystem.ListFiles(Path.Combine(FileSystemLayout.TranscodeFolder, _channelNumber)).Any())
{ {
_logger.LogError("Transcode folder is NOT empty!"); _logger.LogError("Transcode folder is NOT empty!");
} }
_targetFramerate = await mediator.Send(
new GetChannelFramerate(channelNumber),
cancellationToken);
Touch(); Touch();
_transcodedUntil = DateTimeOffset.Now; _transcodedUntil = DateTimeOffset.Now;
@ -210,6 +213,10 @@ public class HlsSessionWorker : IHlsSessionWorker
if (disposing) if (disposing)
{ {
_timer.Dispose(); _timer.Dispose();
_timer = null;
_serviceScope.Dispose();
_serviceScope = null;
} }
_disposedValue = true; _disposedValue = true;
@ -256,8 +263,6 @@ public class HlsSessionWorker : IHlsSessionWorker
bool realtime, bool realtime,
CancellationToken cancellationToken) CancellationToken cancellationToken)
{ {
using IServiceScope scope = _serviceScopeFactory.CreateScope();
try try
{ {
if (!realtime) if (!realtime)
@ -289,9 +294,7 @@ public class HlsSessionWorker : IHlsSessionWorker
} }
} }
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>(); long ptsOffset = await GetPtsOffset(_channelNumber, cancellationToken);
long ptsOffset = await GetPtsOffset(mediator, _channelNumber, cancellationToken);
// _logger.LogInformation("PTS offset: {PtsOffset}", ptsOffset); // _logger.LogInformation("PTS offset: {PtsOffset}", ptsOffset);
_logger.LogInformation("HLS session state: {State}", _state); _logger.LogInformation("HLS session state: {State}", _state);
@ -312,7 +315,7 @@ public class HlsSessionWorker : IHlsSessionWorker
// _logger.LogInformation("Request {@Request}", request); // _logger.LogInformation("Request {@Request}", request);
Either<BaseError, PlayoutItemProcessModel> result = await mediator.Send(request, cancellationToken); Either<BaseError, PlayoutItemProcessModel> result = await _mediator.Send(request, cancellationToken);
// _logger.LogInformation("Result {Result}", result.ToString()); // _logger.LogInformation("Result {Result}", result.ToString());
@ -365,7 +368,7 @@ public class HlsSessionWorker : IHlsSessionWorker
commandResult.ExitCode, commandResult.ExitCode,
commandResult.StandardError); commandResult.StandardError);
Either<BaseError, PlayoutItemProcessModel> maybeOfflineProcess = await mediator.Send( Either<BaseError, PlayoutItemProcessModel> maybeOfflineProcess = await _mediator.Send(
new GetErrorProcess( new GetErrorProcess(
_channelNumber, _channelNumber,
"segmenter", "segmenter",
@ -414,8 +417,7 @@ public class HlsSessionWorker : IHlsSessionWorker
try try
{ {
IClient client = scope.ServiceProvider.GetRequiredService<IClient>(); _client.Notify(ex);
client.Notify(ex);
} }
catch (Exception) catch (Exception)
{ {
@ -494,10 +496,7 @@ public class HlsSessionWorker : IHlsSessionWorker
} }
} }
private async Task<long> GetPtsOffset( private async Task<long> GetPtsOffset(string channelNumber, CancellationToken cancellationToken)
IMediator mediator,
string channelNumber,
CancellationToken cancellationToken)
{ {
await Slim.WaitAsync(cancellationToken); await Slim.WaitAsync(cancellationToken);
try try
@ -510,7 +509,7 @@ public class HlsSessionWorker : IHlsSessionWorker
return result; return result;
} }
Either<BaseError, PtsAndDuration> queryResult = await mediator.Send( Either<BaseError, PtsAndDuration> queryResult = await _mediator.Send(
new GetLastPtsDuration(channelNumber), new GetLastPtsDuration(channelNumber),
cancellationToken); cancellationToken);
@ -534,9 +533,7 @@ public class HlsSessionWorker : IHlsSessionWorker
private async Task<int> GetWorkAheadLimit() private async Task<int> GetWorkAheadLimit()
{ {
using IServiceScope scope = _serviceScopeFactory.CreateScope(); return await _configElementRepository.GetValue<int>(ConfigElementKey.FFmpegWorkAheadSegmenters)
IConfigElementRepository repo = scope.ServiceProvider.GetRequiredService<IConfigElementRepository>();
return await repo.GetValue<int>(ConfigElementKey.FFmpegWorkAheadSegmenters)
.Map(maybeCount => maybeCount.Match(identity, () => 1)); .Map(maybeCount => maybeCount.Match(identity, () => 1));
} }

1
ErsatzTV/Startup.cs

@ -660,7 +660,6 @@ public class Startup
services.AddScoped<ISongVideoGenerator, SongVideoGenerator>(); services.AddScoped<ISongVideoGenerator, SongVideoGenerator>();
services.AddScoped<IMusicVideoCreditsGenerator, MusicVideoCreditsGenerator>(); services.AddScoped<IMusicVideoCreditsGenerator, MusicVideoCreditsGenerator>();
services.AddScoped<HlsSessionWorker>();
services.AddScoped<IGitHubApiClient, GitHubApiClient>(); services.AddScoped<IGitHubApiClient, GitHubApiClient>();
services.AddScoped<IHtmlSanitizer, HtmlSanitizer>( services.AddScoped<IHtmlSanitizer, HtmlSanitizer>(
_ => _ =>

Loading…
Cancel
Save