mirror of https://github.com/ErsatzTV/ErsatzTV.git
Browse Source
* hls segmenter wip * log message * close unused transcode sessions after 2 minutes * use frame rate for 2s keyframes in hls segmenter * add frame rate to media version * fix segmenter framerate calculation * automatically restart hls segmenter with next scheduled item * cleanup * update changelog * decrease segmenter start delaypull/401/head
40 changed files with 3936 additions and 33 deletions
@ -0,0 +1,6 @@
@@ -0,0 +1,6 @@
|
||||
namespace ErsatzTV.Application |
||||
{ |
||||
public interface IFFmpegWorkerRequest |
||||
{ |
||||
} |
||||
} |
||||
@ -0,0 +1,9 @@
@@ -0,0 +1,9 @@
|
||||
using ErsatzTV.Core; |
||||
using LanguageExt; |
||||
using MediatR; |
||||
using Unit = LanguageExt.Unit; |
||||
|
||||
namespace ErsatzTV.Application.Streaming.Commands |
||||
{ |
||||
public record CleanUpFFmpegSessions : IRequest<Either<BaseError, Unit>>, IFFmpegWorkerRequest; |
||||
} |
||||
@ -0,0 +1,27 @@
@@ -0,0 +1,27 @@
|
||||
using System.Threading; |
||||
using System.Threading.Channels; |
||||
using System.Threading.Tasks; |
||||
using ErsatzTV.Core; |
||||
using LanguageExt; |
||||
using MediatR; |
||||
using Unit = LanguageExt.Unit; |
||||
|
||||
namespace ErsatzTV.Application.Streaming.Commands |
||||
{ |
||||
public class CleanUpFFmpegSessionsHandler : IRequestHandler<CleanUpFFmpegSessions, Either<BaseError, Unit>> |
||||
{ |
||||
private readonly ChannelWriter<IFFmpegWorkerRequest> _channel; |
||||
|
||||
public CleanUpFFmpegSessionsHandler(ChannelWriter<IFFmpegWorkerRequest> channel) |
||||
{ |
||||
_channel = channel; |
||||
} |
||||
|
||||
public async Task<Either<BaseError, Unit>> |
||||
Handle(CleanUpFFmpegSessions request, CancellationToken cancellationToken) |
||||
{ |
||||
await _channel.WriteAsync(request, cancellationToken); |
||||
return Unit.Default; |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,9 @@
@@ -0,0 +1,9 @@
|
||||
using ErsatzTV.Core; |
||||
using LanguageExt; |
||||
|
||||
namespace ErsatzTV.Application.Streaming.Commands |
||||
{ |
||||
public record StartFFmpegSession(string ChannelNumber, bool StartAtZero) : |
||||
MediatR.IRequest<Either<BaseError, Unit>>, |
||||
IFFmpegWorkerRequest; |
||||
} |
||||
@ -0,0 +1,67 @@
@@ -0,0 +1,67 @@
|
||||
using System; |
||||
using System.IO; |
||||
using System.Threading; |
||||
using System.Threading.Channels; |
||||
using System.Threading.Tasks; |
||||
using ErsatzTV.Core; |
||||
using ErsatzTV.Core.Errors; |
||||
using ErsatzTV.Core.Interfaces.FFmpeg; |
||||
using ErsatzTV.Core.Interfaces.Metadata; |
||||
using LanguageExt; |
||||
using static LanguageExt.Prelude; |
||||
|
||||
namespace ErsatzTV.Application.Streaming.Commands |
||||
{ |
||||
public class StartFFmpegSessionHandler : MediatR.IRequestHandler<StartFFmpegSession, Either<BaseError, Unit>> |
||||
{ |
||||
private readonly ChannelWriter<IFFmpegWorkerRequest> _channel; |
||||
private readonly IFFmpegSegmenterService _ffmpegSegmenterService; |
||||
private readonly ILocalFileSystem _localFileSystem; |
||||
|
||||
public StartFFmpegSessionHandler( |
||||
IFFmpegSegmenterService ffmpegSegmenterService, |
||||
ILocalFileSystem localFileSystem, |
||||
ChannelWriter<IFFmpegWorkerRequest> channel) |
||||
{ |
||||
_ffmpegSegmenterService = ffmpegSegmenterService; |
||||
_localFileSystem = localFileSystem; |
||||
_channel = channel; |
||||
} |
||||
|
||||
public Task<Either<BaseError, Unit>> Handle(StartFFmpegSession request, CancellationToken cancellationToken) => |
||||
Validate(request) |
||||
.MapT(_ => StartProcess(request)) |
||||
// this weirdness is needed to maintain the error type (.ToEitherAsync() just gives BaseError)
|
||||
.Bind(v => v.ToEither().MapLeft(seq => seq.Head()).MapAsync<BaseError, Task<Unit>, Unit>(identity)); |
||||
|
||||
private async Task<Unit> StartProcess(StartFFmpegSession request) |
||||
{ |
||||
await _channel.WriteAsync(request); |
||||
|
||||
// TODO: find some other way to let ffmpeg get ahead
|
||||
await Task.Delay(TimeSpan.FromSeconds(5)); |
||||
|
||||
return Unit.Default; |
||||
} |
||||
|
||||
private Task<Validation<BaseError, Unit>> Validate(StartFFmpegSession request) => |
||||
ProcessMustNotExist(request) |
||||
.BindT(_ => FolderMustBeEmpty(request)); |
||||
|
||||
private Task<Validation<BaseError, Unit>> ProcessMustNotExist(StartFFmpegSession request) => |
||||
Optional(_ffmpegSegmenterService.ProcessExistsForChannel(request.ChannelNumber)) |
||||
.Filter(containsKey => containsKey == false) |
||||
.Map(_ => Unit.Default) |
||||
.ToValidation<BaseError>(new ChannelHasProcess()) |
||||
.AsTask(); |
||||
|
||||
private Task<Validation<BaseError, Unit>> FolderMustBeEmpty(StartFFmpegSession request) |
||||
{ |
||||
string folder = Path.Combine(FileSystemLayout.TranscodeFolder, request.ChannelNumber); |
||||
_localFileSystem.EnsureFolderExists(folder); |
||||
_localFileSystem.EmptyFolder(folder); |
||||
|
||||
return Task.FromResult<Validation<BaseError, Unit>>(Unit.Default); |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,9 @@
@@ -0,0 +1,9 @@
|
||||
using ErsatzTV.Core; |
||||
using LanguageExt; |
||||
using MediatR; |
||||
using Unit = LanguageExt.Unit; |
||||
|
||||
namespace ErsatzTV.Application.Streaming.Commands |
||||
{ |
||||
public record TouchFFmpegSession(string Path) : IRequest<Either<BaseError, Unit>>, IFFmpegWorkerRequest; |
||||
} |
||||
@ -0,0 +1,9 @@
@@ -0,0 +1,9 @@
|
||||
namespace ErsatzTV.Core.Errors |
||||
{ |
||||
public class ChannelHasProcess : BaseError |
||||
{ |
||||
public ChannelHasProcess() : base("Channel already has ffmpeg process") |
||||
{ |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,96 @@
@@ -0,0 +1,96 @@
|
||||
using System; |
||||
using System.Collections.Concurrent; |
||||
using System.Collections.Generic; |
||||
using System.Diagnostics; |
||||
using System.Linq; |
||||
using System.Threading; |
||||
using ErsatzTV.Core.Interfaces.FFmpeg; |
||||
using LanguageExt; |
||||
using Microsoft.Extensions.Logging; |
||||
|
||||
namespace ErsatzTV.Core.FFmpeg |
||||
{ |
||||
public class FFmpegSegmenterService : IFFmpegSegmenterService |
||||
{ |
||||
private static readonly ConcurrentDictionary<string, ProcessAndToken> Processes = new(); |
||||
|
||||
private readonly ILogger<FFmpegSegmenterService> _logger; |
||||
|
||||
public FFmpegSegmenterService(ILogger<FFmpegSegmenterService> logger) => _logger = logger; |
||||
|
||||
public bool ProcessExistsForChannel(string channelNumber) |
||||
{ |
||||
if (Processes.TryGetValue(channelNumber, out ProcessAndToken processAndToken)) |
||||
{ |
||||
if (!processAndToken.Process.HasExited || !Processes.TryRemove( |
||||
new KeyValuePair<string, ProcessAndToken>(channelNumber, processAndToken))) |
||||
{ |
||||
return true; |
||||
} |
||||
} |
||||
|
||||
return false; |
||||
} |
||||
|
||||
public bool TryAdd(string channelNumber, Process process) |
||||
{ |
||||
var cts = new CancellationTokenSource(); |
||||
var processAndToken = new ProcessAndToken(process, cts, DateTimeOffset.Now); |
||||
if (Processes.TryAdd(channelNumber, processAndToken)) |
||||
{ |
||||
CancellationToken token = cts.Token; |
||||
token.Register(process.Kill); |
||||
return true; |
||||
} |
||||
|
||||
return false; |
||||
} |
||||
|
||||
public void TouchChannel(string channelNumber) |
||||
{ |
||||
if (Processes.TryGetValue(channelNumber, out ProcessAndToken processAndToken)) |
||||
{ |
||||
ProcessAndToken newValue = processAndToken with { LastAccess = DateTimeOffset.Now }; |
||||
if (!Processes.TryUpdate(channelNumber, newValue, processAndToken)) |
||||
{ |
||||
_logger.LogWarning("Failed to update last access for channel {Channel}", channelNumber); |
||||
} |
||||
} |
||||
} |
||||
|
||||
public void CleanUpSessions() |
||||
{ |
||||
foreach ((string key, (_, CancellationTokenSource cts, DateTimeOffset lastAccess)) in Processes.ToList()) |
||||
{ |
||||
// TODO: configure this time span? 5 min?
|
||||
if (DateTimeOffset.Now.Subtract(lastAccess) > TimeSpan.FromMinutes(2)) |
||||
{ |
||||
_logger.LogDebug("Cleaning up ffmpeg session for channel {Channel}", key); |
||||
|
||||
cts.Cancel(); |
||||
Processes.TryRemove(key, out _); |
||||
} |
||||
} |
||||
} |
||||
|
||||
public Unit KillAll() |
||||
{ |
||||
foreach ((string key, ProcessAndToken processAndToken) in Processes.ToList()) |
||||
{ |
||||
try |
||||
{ |
||||
processAndToken.TokenSource.Cancel(); |
||||
Processes.TryRemove(key, out _); |
||||
} |
||||
catch (Exception ex) |
||||
{ |
||||
_logger.LogInformation(ex, "Error killing process"); |
||||
} |
||||
} |
||||
|
||||
return Unit.Default; |
||||
} |
||||
|
||||
private record ProcessAndToken(Process Process, CancellationTokenSource TokenSource, DateTimeOffset LastAccess); |
||||
} |
||||
} |
||||
@ -0,0 +1,14 @@
@@ -0,0 +1,14 @@
|
||||
using System.Diagnostics; |
||||
using LanguageExt; |
||||
|
||||
namespace ErsatzTV.Core.Interfaces.FFmpeg |
||||
{ |
||||
public interface IFFmpegSegmenterService |
||||
{ |
||||
bool ProcessExistsForChannel(string channelNumber); |
||||
bool TryAdd(string channelNumber, Process process); |
||||
void TouchChannel(string channelNumber); |
||||
void CleanUpSessions(); |
||||
Unit KillAll(); |
||||
} |
||||
} |
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,36 @@
@@ -0,0 +1,36 @@
|
||||
using Microsoft.EntityFrameworkCore.Migrations; |
||||
|
||||
namespace ErsatzTV.Infrastructure.Migrations |
||||
{ |
||||
public partial class Add_MediaVersionRFrameRate : Migration |
||||
{ |
||||
protected override void Up(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.AddColumn<string>( |
||||
name: "RFrameRate", |
||||
table: "MediaVersion", |
||||
type: "TEXT", |
||||
nullable: true); |
||||
|
||||
migrationBuilder.Sql("UPDATE MediaVersion SET DateUpdated = '0001-01-01 00:00:00'"); |
||||
migrationBuilder.Sql("UPDATE LibraryFolder SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE EmbyMovie SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE EmbyShow SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE EmbySeason SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE EmbyEpisode SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE JellyfinMovie SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE JellyfinShow SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE JellyfinSeason SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE JellyfinEpisode SET Etag = NULL"); |
||||
migrationBuilder.Sql("UPDATE LibraryPath SET LastScan = '0001-01-01 00:00:00'"); |
||||
migrationBuilder.Sql("UPDATE Library SET LastScan = '0001-01-01 00:00:00'"); |
||||
} |
||||
|
||||
protected override void Down(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.DropColumn( |
||||
name: "RFrameRate", |
||||
table: "MediaVersion"); |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,34 @@
@@ -0,0 +1,34 @@
|
||||
using System; |
||||
using System.Threading; |
||||
using System.Threading.Channels; |
||||
using System.Threading.Tasks; |
||||
using ErsatzTV.Application; |
||||
using ErsatzTV.Application.Streaming.Commands; |
||||
using Microsoft.Extensions.Hosting; |
||||
using Microsoft.Extensions.Logging; |
||||
|
||||
namespace ErsatzTV.Services |
||||
{ |
||||
public class FFmpegSchedulerService : BackgroundService |
||||
{ |
||||
private readonly ILogger<FFmpegSchedulerService> _logger; |
||||
private readonly ChannelWriter<IFFmpegWorkerRequest> _workerChannel; |
||||
|
||||
public FFmpegSchedulerService( |
||||
ChannelWriter<IFFmpegWorkerRequest> workerChannel, |
||||
ILogger<FFmpegSchedulerService> logger) |
||||
{ |
||||
_workerChannel = workerChannel; |
||||
_logger = logger; |
||||
} |
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken) |
||||
{ |
||||
while (!cancellationToken.IsCancellationRequested) |
||||
{ |
||||
await _workerChannel.WriteAsync(new CleanUpFFmpegSessions(), cancellationToken); |
||||
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,121 @@
@@ -0,0 +1,121 @@
|
||||
using System; |
||||
using System.Diagnostics; |
||||
using System.IO; |
||||
using System.Threading; |
||||
using System.Threading.Channels; |
||||
using System.Threading.Tasks; |
||||
using ErsatzTV.Application; |
||||
using ErsatzTV.Application.Streaming.Commands; |
||||
using ErsatzTV.Application.Streaming.Queries; |
||||
using ErsatzTV.Core; |
||||
using ErsatzTV.Core.Interfaces.FFmpeg; |
||||
using LanguageExt; |
||||
using MediatR; |
||||
using Microsoft.Extensions.DependencyInjection; |
||||
using Microsoft.Extensions.Hosting; |
||||
using Microsoft.Extensions.Logging; |
||||
using static LanguageExt.Prelude; |
||||
|
||||
namespace ErsatzTV.Services |
||||
{ |
||||
public class FFmpegWorkerService : BackgroundService |
||||
{ |
||||
private readonly ChannelReader<IFFmpegWorkerRequest> _channel; |
||||
private readonly ChannelWriter<IFFmpegWorkerRequest> _channelWriter; |
||||
private readonly ILogger<FFmpegWorkerService> _logger; |
||||
private readonly IFFmpegSegmenterService _ffmpegSegmenterService; |
||||
private readonly IServiceScopeFactory _serviceScopeFactory; |
||||
|
||||
public FFmpegWorkerService( |
||||
ChannelReader<IFFmpegWorkerRequest> channel, |
||||
ChannelWriter<IFFmpegWorkerRequest> channelWriter, |
||||
IServiceScopeFactory serviceScopeFactory, |
||||
ILogger<FFmpegWorkerService> logger, |
||||
IFFmpegSegmenterService ffmpegSegmenterService) |
||||
{ |
||||
_channel = channel; |
||||
_channelWriter = channelWriter; |
||||
_serviceScopeFactory = serviceScopeFactory; |
||||
_logger = logger; |
||||
_ffmpegSegmenterService = ffmpegSegmenterService; |
||||
} |
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken) |
||||
{ |
||||
_logger.LogInformation("FFmpeg worker service started"); |
||||
|
||||
await foreach (IFFmpegWorkerRequest request in _channel.ReadAllAsync(cancellationToken)) |
||||
{ |
||||
try |
||||
{ |
||||
using IServiceScope scope = _serviceScopeFactory.CreateScope(); |
||||
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>(); |
||||
|
||||
switch (request) |
||||
{ |
||||
case TouchFFmpegSession touchFFmpegSession: |
||||
foreach (DirectoryInfo parent in Optional(Directory.GetParent(touchFFmpegSession.Path))) |
||||
{ |
||||
_ffmpegSegmenterService.TouchChannel(parent.Name); |
||||
} |
||||
break; |
||||
case CleanUpFFmpegSessions: |
||||
_ffmpegSegmenterService.CleanUpSessions(); |
||||
break; |
||||
case StartFFmpegSession startFFmpegSession: |
||||
_logger.LogInformation( |
||||
"Starting ffmpeg session for channel {Channel}", |
||||
startFFmpegSession.ChannelNumber); |
||||
|
||||
if (!_ffmpegSegmenterService.ProcessExistsForChannel(startFFmpegSession.ChannelNumber)) |
||||
{ |
||||
var req = new GetPlayoutItemProcessByChannelNumber( |
||||
startFFmpegSession.ChannelNumber, |
||||
"segmenter", |
||||
startFFmpegSession.StartAtZero); |
||||
Either<BaseError, Process> maybeProcess = await mediator.Send(req, cancellationToken); |
||||
maybeProcess.Match( |
||||
process => |
||||
{ |
||||
if (_ffmpegSegmenterService.TryAdd(startFFmpegSession.ChannelNumber, process)) |
||||
{ |
||||
_logger.LogDebug( |
||||
"ffmpeg hls arguments {FFmpegArguments}", |
||||
string.Join(" ", process.StartInfo.ArgumentList)); |
||||
|
||||
process.Start(); |
||||
process.EnableRaisingEvents = true; |
||||
process.Exited += (_, _) => |
||||
{ |
||||
if (process.ExitCode == 0) |
||||
{ |
||||
_channelWriter.TryWrite( |
||||
new StartFFmpegSession(startFFmpegSession.ChannelNumber, true)); |
||||
} |
||||
else |
||||
{ |
||||
_logger.LogDebug( |
||||
"hls segmenter for channel {Channel} exited with code {ExitCode}", |
||||
startFFmpegSession.ChannelNumber, |
||||
process.ExitCode); |
||||
} |
||||
}; |
||||
} |
||||
}, |
||||
_ => { }); |
||||
} |
||||
|
||||
break; |
||||
} |
||||
} |
||||
catch (Exception ex) |
||||
{ |
||||
_logger.LogWarning(ex, "Failed to handle ffmpeg worker request"); |
||||
} |
||||
} |
||||
|
||||
// kill any running processes after cancellation
|
||||
_ffmpegSegmenterService.KillAll(); |
||||
} |
||||
} |
||||
} |
||||
Loading…
Reference in new issue