From 82de3136cda1f59b72a69cb9e0ff688a644adb75 Mon Sep 17 00:00:00 2001 From: Jason Dove <1695733+jasongdove@users.noreply.github.com> Date: Mon, 4 Sep 2023 08:30:01 -0500 Subject: [PATCH] fix hls session worker lifetime (#1414) --- .../Commands/StartFFmpegSessionHandler.cs | 42 +++++++++++--- .../Streaming/HlsSessionWorker.cs | 57 +++++++++---------- ErsatzTV/Startup.cs | 1 - 3 files changed, 61 insertions(+), 39 deletions(-) diff --git a/ErsatzTV.Application/Streaming/Commands/StartFFmpegSessionHandler.cs b/ErsatzTV.Application/Streaming/Commands/StartFFmpegSessionHandler.cs index 1778790b4..721c68bd7 100644 --- a/ErsatzTV.Application/Streaming/Commands/StartFFmpegSessionHandler.cs +++ b/ErsatzTV.Application/Streaming/Commands/StartFFmpegSessionHandler.cs @@ -1,5 +1,7 @@ using System.Diagnostics; using System.Threading.Channels; +using Bugsnag; +using ErsatzTV.Application.Channels; using ErsatzTV.Application.Maintenance; using ErsatzTV.Core; using ErsatzTV.Core.Domain; @@ -17,22 +19,34 @@ public class StartFFmpegSessionHandler : IRequestHandler _logger; - private readonly IServiceScopeFactory _serviceScopeFactory; + private readonly ILogger _sessionWorkerLogger; private readonly ChannelWriter _workerChannel; public StartFFmpegSessionHandler( + IHlsPlaylistFilter hlsPlaylistFilter, + IServiceScopeFactory serviceScopeFactory, + IMediator mediator, + IClient client, ILocalFileSystem localFileSystem, ILogger logger, - IServiceScopeFactory serviceScopeFactory, + ILogger sessionWorkerLogger, IFFmpegSegmenterService ffmpegSegmenterService, IConfigElementRepository configElementRepository, ChannelWriter workerChannel) { + _hlsPlaylistFilter = hlsPlaylistFilter; + _serviceScopeFactory = serviceScopeFactory; + _mediator = mediator; + _client = client; _localFileSystem = localFileSystem; _logger = logger; - _serviceScopeFactory = serviceScopeFactory; + _sessionWorkerLogger = sessionWorkerLogger; _ffmpegSegmenterService = ffmpegSegmenterService; _configElementRepository = configElementRepository; _workerChannel = workerChannel; @@ -52,8 +66,18 @@ public class StartFFmpegSessionHandler : IRequestHandler(ConfigElementKey.FFmpegSegmenterTimeout) .Map(maybeTimeout => maybeTimeout.Match(i => TimeSpan.FromSeconds(i), () => TimeSpan.FromMinutes(1))); - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - HlsSessionWorker worker = scope.ServiceProvider.GetRequiredService(); + Option targetFramerate = await _mediator.Send( + new GetChannelFramerate(request.ChannelNumber), + cancellationToken); + + var worker = new HlsSessionWorker( + _serviceScopeFactory, + _client, + _hlsPlaylistFilter, + _configElementRepository, + _localFileSystem, + _sessionWorkerLogger, + targetFramerate); _ffmpegSegmenterService.SessionWorkers.AddOrUpdate(request.ChannelNumber, _ => worker, (_, _) => worker); // fire and forget worker @@ -63,7 +87,9 @@ public class StartFFmpegSessionHandler : IRequestHandler(); - int initialSegmentCount = await repo.GetValue(ConfigElementKey.FFmpegInitialSegmentCount) + int initialSegmentCount = await _configElementRepository + .GetValue(ConfigElementKey.FFmpegInitialSegmentCount) .Map(maybeCount => maybeCount.Match(identity, () => 1)); await WaitForPlaylistSegments(playlistFileName, initialSegmentCount, worker, cancellationToken); diff --git a/ErsatzTV.Application/Streaming/HlsSessionWorker.cs b/ErsatzTV.Application/Streaming/HlsSessionWorker.cs index 85d1a02e1..38563e2ba 100644 --- a/ErsatzTV.Application/Streaming/HlsSessionWorker.cs +++ b/ErsatzTV.Application/Streaming/HlsSessionWorker.cs @@ -5,7 +5,6 @@ using System.Timers; using Bugsnag; using CliWrap; using CliWrap.Buffered; -using ErsatzTV.Application.Channels; using ErsatzTV.Core; using ErsatzTV.Core.Domain; using ErsatzTV.Core.FFmpeg; @@ -22,10 +21,13 @@ public class HlsSessionWorker : IHlsSessionWorker { private static readonly SemaphoreSlim Slim = new(1, 1); private static int _workAheadCount; + private readonly IMediator _mediator; + private readonly IClient _client; private readonly IHlsPlaylistFilter _hlsPlaylistFilter; + private readonly IConfigElementRepository _configElementRepository; private readonly ILocalFileSystem _localFileSystem; private readonly ILogger _logger; - private readonly IServiceScopeFactory _serviceScopeFactory; + private readonly Option _targetFramerate; private readonly object _sync = new(); private string _channelNumber; private bool _disposedValue; @@ -33,20 +35,27 @@ public class HlsSessionWorker : IHlsSessionWorker private DateTimeOffset _lastAccess; private DateTimeOffset _lastDelete = DateTimeOffset.MinValue; private HlsSessionState _state; - private Option _targetFramerate; private Timer _timer; private DateTimeOffset _transcodedUntil; + private IServiceScope _serviceScope; public HlsSessionWorker( - IHlsPlaylistFilter hlsPlaylistFilter, IServiceScopeFactory serviceScopeFactory, + IClient client, + IHlsPlaylistFilter hlsPlaylistFilter, + IConfigElementRepository configElementRepository, ILocalFileSystem localFileSystem, - ILogger logger) + ILogger logger, + Option targetFramerate) { + _serviceScope = serviceScopeFactory.CreateScope(); + _mediator = _serviceScope.ServiceProvider.GetRequiredService(); + _client = client; _hlsPlaylistFilter = hlsPlaylistFilter; - _serviceScopeFactory = serviceScopeFactory; + _configElementRepository = configElementRepository; _localFileSystem = localFileSystem; _logger = logger; + _targetFramerate = targetFramerate; } public DateTimeOffset PlaylistStart { get; private set; } @@ -133,17 +142,11 @@ public class HlsSessionWorker : IHlsSessionWorker _logger.LogInformation("Starting HLS session for channel {Channel}", channelNumber); - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - IMediator mediator = scope.ServiceProvider.GetRequiredService(); - ILocalFileSystem localFileSystem = scope.ServiceProvider.GetRequiredService(); - if (localFileSystem.ListFiles(Path.Combine(FileSystemLayout.TranscodeFolder, _channelNumber)).Any()) + if (_localFileSystem.ListFiles(Path.Combine(FileSystemLayout.TranscodeFolder, _channelNumber)).Any()) { _logger.LogError("Transcode folder is NOT empty!"); } - _targetFramerate = await mediator.Send( - new GetChannelFramerate(channelNumber), - cancellationToken); Touch(); _transcodedUntil = DateTimeOffset.Now; @@ -210,6 +213,10 @@ public class HlsSessionWorker : IHlsSessionWorker if (disposing) { _timer.Dispose(); + _timer = null; + + _serviceScope.Dispose(); + _serviceScope = null; } _disposedValue = true; @@ -256,8 +263,6 @@ public class HlsSessionWorker : IHlsSessionWorker bool realtime, CancellationToken cancellationToken) { - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - try { if (!realtime) @@ -289,9 +294,7 @@ public class HlsSessionWorker : IHlsSessionWorker } } - IMediator mediator = scope.ServiceProvider.GetRequiredService(); - - long ptsOffset = await GetPtsOffset(mediator, _channelNumber, cancellationToken); + long ptsOffset = await GetPtsOffset(_channelNumber, cancellationToken); // _logger.LogInformation("PTS offset: {PtsOffset}", ptsOffset); _logger.LogInformation("HLS session state: {State}", _state); @@ -312,7 +315,7 @@ public class HlsSessionWorker : IHlsSessionWorker // _logger.LogInformation("Request {@Request}", request); - Either result = await mediator.Send(request, cancellationToken); + Either result = await _mediator.Send(request, cancellationToken); // _logger.LogInformation("Result {Result}", result.ToString()); @@ -365,7 +368,7 @@ public class HlsSessionWorker : IHlsSessionWorker commandResult.ExitCode, commandResult.StandardError); - Either maybeOfflineProcess = await mediator.Send( + Either maybeOfflineProcess = await _mediator.Send( new GetErrorProcess( _channelNumber, "segmenter", @@ -414,8 +417,7 @@ public class HlsSessionWorker : IHlsSessionWorker try { - IClient client = scope.ServiceProvider.GetRequiredService(); - client.Notify(ex); + _client.Notify(ex); } catch (Exception) { @@ -494,10 +496,7 @@ public class HlsSessionWorker : IHlsSessionWorker } } - private async Task GetPtsOffset( - IMediator mediator, - string channelNumber, - CancellationToken cancellationToken) + private async Task GetPtsOffset(string channelNumber, CancellationToken cancellationToken) { await Slim.WaitAsync(cancellationToken); try @@ -510,7 +509,7 @@ public class HlsSessionWorker : IHlsSessionWorker return result; } - Either queryResult = await mediator.Send( + Either queryResult = await _mediator.Send( new GetLastPtsDuration(channelNumber), cancellationToken); @@ -534,9 +533,7 @@ public class HlsSessionWorker : IHlsSessionWorker private async Task GetWorkAheadLimit() { - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - IConfigElementRepository repo = scope.ServiceProvider.GetRequiredService(); - return await repo.GetValue(ConfigElementKey.FFmpegWorkAheadSegmenters) + return await _configElementRepository.GetValue(ConfigElementKey.FFmpegWorkAheadSegmenters) .Map(maybeCount => maybeCount.Match(identity, () => 1)); } diff --git a/ErsatzTV/Startup.cs b/ErsatzTV/Startup.cs index 95a835a33..7034345b9 100644 --- a/ErsatzTV/Startup.cs +++ b/ErsatzTV/Startup.cs @@ -660,7 +660,6 @@ public class Startup services.AddScoped(); services.AddScoped(); - services.AddScoped(); services.AddScoped(); services.AddScoped( _ =>