using System.IO.Abstractions; using System.Threading.Channels; using Bugsnag; using ErsatzTV.Application.Channels; using ErsatzTV.Application.Graphics; using ErsatzTV.Application.Maintenance; using ErsatzTV.Core; using ErsatzTV.Core.Domain; using ErsatzTV.Core.Errors; using ErsatzTV.Core.FFmpeg; using ErsatzTV.Core.Interfaces.FFmpeg; using ErsatzTV.Core.Interfaces.Metadata; using ErsatzTV.Core.Interfaces.Repositories; using ErsatzTV.Core.Interfaces.Streaming; using ErsatzTV.FFmpeg; using ErsatzTV.FFmpeg.OutputFormat; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; namespace ErsatzTV.Application.Streaming; public class StartFFmpegSessionHandler : IRequestHandler> { private readonly IClient _client; private readonly IFileSystem _fileSystem; private readonly IConfigElementRepository _configElementRepository; private readonly IFFmpegSegmenterService _ffmpegSegmenterService; private readonly IGraphicsEngine _graphicsEngine; private readonly IHlsPlaylistFilter _hlsPlaylistFilter; private readonly IHlsInitSegmentCache _hlsInitSegmentCache; private readonly IHostApplicationLifetime _hostApplicationLifetime; private readonly ILocalFileSystem _localFileSystem; private readonly ILogger _logger; private readonly IMediator _mediator; private readonly IServiceScopeFactory _serviceScopeFactory; private readonly ILogger _sessionWorkerLogger; private readonly ChannelWriter _workerChannel; public StartFFmpegSessionHandler( IHlsPlaylistFilter hlsPlaylistFilter, IHlsInitSegmentCache hlsInitSegmentCache, IServiceScopeFactory serviceScopeFactory, IMediator mediator, IClient client, IFileSystem fileSystem, ILocalFileSystem localFileSystem, ILogger logger, ILogger sessionWorkerLogger, IFFmpegSegmenterService ffmpegSegmenterService, IConfigElementRepository configElementRepository, IGraphicsEngine graphicsEngine, IHostApplicationLifetime hostApplicationLifetime, ChannelWriter workerChannel) { _hlsPlaylistFilter = hlsPlaylistFilter; _hlsInitSegmentCache = hlsInitSegmentCache; _serviceScopeFactory = serviceScopeFactory; _mediator = mediator; _client = client; _fileSystem = fileSystem; _localFileSystem = localFileSystem; _logger = logger; _sessionWorkerLogger = sessionWorkerLogger; _ffmpegSegmenterService = ffmpegSegmenterService; _configElementRepository = configElementRepository; _graphicsEngine = graphicsEngine; _hostApplicationLifetime = hostApplicationLifetime; _workerChannel = workerChannel; } public Task> Handle(StartFFmpegSession request, CancellationToken cancellationToken) => Validate(request) .MapT(_ => StartProcess(request, cancellationToken)) // this weirdness is needed to maintain the error type (.ToEitherAsync() just gives BaseError) #pragma warning disable VSTHRD103 .Bind(v => v.ToEither().MapLeft(seq => seq.Head()).MapAsync, Unit>(identity)); #pragma warning restore VSTHRD103 private async Task StartProcess(StartFFmpegSession request, CancellationToken cancellationToken) { Option idleTimeout = await _configElementRepository .GetValue(ConfigElementKey.FFmpegSegmenterTimeout, cancellationToken) .Map(maybeTimeout => maybeTimeout.Match(i => TimeSpan.FromSeconds(i), () => TimeSpan.FromMinutes(1))); Option targetFramerate = await _mediator.Send( new GetChannelFramerate(request.ChannelNumber), cancellationToken); // disable idle timeout when configured to keep running Option channel = await _mediator.Send(new GetChannelByNumber(request.ChannelNumber), cancellationToken); if (await channel.Map(c => c.IdleBehavior is ChannelIdleBehavior.KeepRunning).IfNoneAsync(false)) { idleTimeout = Option.None; } await _mediator.Send(new RefreshGraphicsElements(), cancellationToken); HlsSessionWorker worker = GetSessionWorker(request, targetFramerate); _ffmpegSegmenterService.AddOrUpdateWorker(request.ChannelNumber, worker); // fire and forget worker _ = worker.Run(request.ChannelNumber, idleTimeout, _hostApplicationLifetime.ApplicationStopping) .ContinueWith( _ => { _ffmpegSegmenterService.RemoveWorker(request.ChannelNumber, out IHlsSessionWorker inactiveWorker); inactiveWorker?.Dispose(); _workerChannel.TryWrite(new ReleaseMemory(false)); }, TaskScheduler.Default); int initialSegmentCount = await _configElementRepository .GetValue(ConfigElementKey.FFmpegInitialSegmentCount, cancellationToken) .Map(maybeCount => maybeCount.Match(identity, () => 1)); await worker.WaitForPlaylistSegments(initialSegmentCount, cancellationToken); return Unit.Default; } private HlsSessionWorker GetSessionWorker(StartFFmpegSession request, Option targetFramerate) => request.Mode switch { _ => new HlsSessionWorker( _serviceScopeFactory, _graphicsEngine, _client, OutputFormatKind.Hls, _hlsPlaylistFilter, _hlsInitSegmentCache, _configElementRepository, _fileSystem, _localFileSystem, _sessionWorkerLogger, targetFramerate) }; private Task> Validate(StartFFmpegSession request) => SessionMustBeInactive(request) .BindT(_ => FolderMustBeEmpty(request)); private Task> SessionMustBeInactive(StartFFmpegSession request) { var result = Optional(_ffmpegSegmenterService.TryAddWorker(request.ChannelNumber, null)) .Where(success => success) .Map(_ => Unit.Default) .ToValidation(new ChannelSessionAlreadyActive()); if (result.IsFail && _ffmpegSegmenterService.TryGetWorker( request.ChannelNumber, out IHlsSessionWorker worker)) { worker?.Touch(Option.None); } return result.AsTask(); } private Task> FolderMustBeEmpty(StartFFmpegSession request) { string folder = Path.Combine(FileSystemLayout.TranscodeFolder, request.ChannelNumber); _logger.LogDebug("Preparing transcode folder {Folder}", folder); _localFileSystem.EnsureFolderExists(folder); _localFileSystem.EmptyFolder(folder); return Task.FromResult>(Unit.Default); } }