mirror of https://github.com/ErsatzTV/ErsatzTV.git
Browse Source
* rework hls segmenter to start more quickly * don't use realtime encoding for hls until we're at least a minute ahead * ugly but functional playlist filteringpull/408/head
31 changed files with 546 additions and 335 deletions
@ -1,9 +0,0 @@
@@ -1,9 +0,0 @@
|
||||
using ErsatzTV.Core; |
||||
using LanguageExt; |
||||
using MediatR; |
||||
using Unit = LanguageExt.Unit; |
||||
|
||||
namespace ErsatzTV.Application.Streaming.Commands |
||||
{ |
||||
public record CleanUpFFmpegSessions : IRequest<Either<BaseError, Unit>>, IFFmpegWorkerRequest; |
||||
} |
||||
@ -1,27 +0,0 @@
@@ -1,27 +0,0 @@
|
||||
using System.Threading; |
||||
using System.Threading.Channels; |
||||
using System.Threading.Tasks; |
||||
using ErsatzTV.Core; |
||||
using LanguageExt; |
||||
using MediatR; |
||||
using Unit = LanguageExt.Unit; |
||||
|
||||
namespace ErsatzTV.Application.Streaming.Commands |
||||
{ |
||||
public class CleanUpFFmpegSessionsHandler : IRequestHandler<CleanUpFFmpegSessions, Either<BaseError, Unit>> |
||||
{ |
||||
private readonly ChannelWriter<IFFmpegWorkerRequest> _channel; |
||||
|
||||
public CleanUpFFmpegSessionsHandler(ChannelWriter<IFFmpegWorkerRequest> channel) |
||||
{ |
||||
_channel = channel; |
||||
} |
||||
|
||||
public async Task<Either<BaseError, Unit>> |
||||
Handle(CleanUpFFmpegSessions request, CancellationToken cancellationToken) |
||||
{ |
||||
await _channel.WriteAsync(request, cancellationToken); |
||||
return Unit.Default; |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,207 @@
@@ -0,0 +1,207 @@
|
||||
using System; |
||||
using System.Diagnostics; |
||||
using System.IO; |
||||
using System.Threading; |
||||
using System.Threading.Tasks; |
||||
using System.Timers; |
||||
using ErsatzTV.Application.Streaming.Queries; |
||||
using ErsatzTV.Core; |
||||
using ErsatzTV.Core.FFmpeg; |
||||
using ErsatzTV.Core.Interfaces.FFmpeg; |
||||
using LanguageExt; |
||||
using MediatR; |
||||
using Microsoft.Extensions.DependencyInjection; |
||||
using Microsoft.Extensions.Logging; |
||||
using Timer = System.Timers.Timer; |
||||
|
||||
namespace ErsatzTV.Application.Streaming |
||||
{ |
||||
public class HlsSessionWorker : IHlsSessionWorker |
||||
{ |
||||
private readonly IServiceScopeFactory _serviceScopeFactory; |
||||
private readonly ILogger<HlsSessionWorker> _logger; |
||||
private DateTimeOffset _lastAccess; |
||||
private DateTimeOffset _transcodedUntil; |
||||
private readonly Timer _timer = new(TimeSpan.FromMinutes(2).TotalMilliseconds) { AutoReset = false }; |
||||
private readonly object _sync = new(); |
||||
private DateTimeOffset _playlistStart; |
||||
|
||||
public HlsSessionWorker(IServiceScopeFactory serviceScopeFactory, ILogger<HlsSessionWorker> logger) |
||||
{ |
||||
_serviceScopeFactory = serviceScopeFactory; |
||||
_logger = logger; |
||||
} |
||||
|
||||
public DateTimeOffset PlaylistStart => _playlistStart; |
||||
|
||||
public void Touch() |
||||
{ |
||||
lock (_sync) |
||||
{ |
||||
_lastAccess = DateTimeOffset.Now; |
||||
|
||||
_timer.Stop(); |
||||
_timer.Start(); |
||||
} |
||||
} |
||||
|
||||
public async Task Run(string channelNumber) |
||||
{ |
||||
var cts = new CancellationTokenSource(); |
||||
void Cancel(object o, ElapsedEventArgs e) => cts.Cancel(); |
||||
|
||||
try |
||||
{ |
||||
_timer.Elapsed += Cancel; |
||||
|
||||
CancellationToken cancellationToken = cts.Token; |
||||
|
||||
_logger.LogInformation("Starting HLS session for channel {Channel}", channelNumber); |
||||
|
||||
Touch(); |
||||
_transcodedUntil = DateTimeOffset.Now; |
||||
_playlistStart = _transcodedUntil; |
||||
|
||||
// start initial transcode WITHOUT realtime throttle
|
||||
if (!await Transcode(channelNumber, true, false, cancellationToken)) |
||||
{ |
||||
return; |
||||
} |
||||
|
||||
while (!cancellationToken.IsCancellationRequested) |
||||
{ |
||||
// TODO: configurable? 5 minutes?
|
||||
if (DateTimeOffset.Now - _lastAccess > TimeSpan.FromMinutes(2)) |
||||
{ |
||||
_logger.LogInformation("Stopping idle HLS session for channel {Channel}", channelNumber); |
||||
return; |
||||
} |
||||
|
||||
var transcodedBuffer = TimeSpan.FromSeconds( |
||||
Math.Max(0, _transcodedUntil.Subtract(DateTimeOffset.Now).TotalSeconds)); |
||||
if (transcodedBuffer <= TimeSpan.FromMinutes(1)) |
||||
{ |
||||
// only use realtime encoding when we're at least 30 seconds ahead
|
||||
bool realtime = transcodedBuffer >= TimeSpan.FromSeconds(30); |
||||
if (!await Transcode(channelNumber, false, realtime, cancellationToken)) |
||||
{ |
||||
return; |
||||
} |
||||
} |
||||
else |
||||
{ |
||||
await TrimAndDelete(channelNumber, cancellationToken); |
||||
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); |
||||
} |
||||
} |
||||
} |
||||
finally |
||||
{ |
||||
_timer.Elapsed -= Cancel; |
||||
} |
||||
} |
||||
|
||||
private async Task<bool> Transcode(string channelNumber, bool firstProcess, bool realtime, CancellationToken cancellationToken) |
||||
{ |
||||
try |
||||
{ |
||||
using IServiceScope scope = _serviceScopeFactory.CreateScope(); |
||||
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>(); |
||||
|
||||
var request = new GetPlayoutItemProcessByChannelNumber( |
||||
channelNumber, |
||||
"segmenter", |
||||
firstProcess ? DateTimeOffset.Now : _transcodedUntil.AddSeconds(1), |
||||
!firstProcess, |
||||
realtime); |
||||
|
||||
// _logger.LogInformation("Request {@Request}", request);
|
||||
|
||||
Either<BaseError, PlayoutItemProcessModel> result = await mediator.Send(request, cancellationToken); |
||||
|
||||
// _logger.LogInformation("Result {Result}", result.ToString());
|
||||
|
||||
foreach (BaseError error in result.LeftAsEnumerable()) |
||||
{ |
||||
_logger.LogWarning( |
||||
"Failed to create process for HLS session on channel {Channel}: {Error}", |
||||
channelNumber, |
||||
error.ToString()); |
||||
|
||||
return false; |
||||
} |
||||
|
||||
foreach (PlayoutItemProcessModel processModel in result.RightAsEnumerable()) |
||||
{ |
||||
await TrimAndDelete(channelNumber, cancellationToken); |
||||
|
||||
Process process = processModel.Process; |
||||
|
||||
_logger.LogDebug( |
||||
"ffmpeg hls arguments {FFmpegArguments}", |
||||
string.Join(" ", process.StartInfo.ArgumentList)); |
||||
|
||||
process.Start(); |
||||
try |
||||
{ |
||||
await process.WaitForExitAsync(cancellationToken); |
||||
process.WaitForExit(); |
||||
} |
||||
catch (TaskCanceledException) |
||||
{ |
||||
_logger.LogInformation("Terminating HLS process for channel {Channel}", channelNumber); |
||||
process.Kill(); |
||||
process.WaitForExit(); |
||||
|
||||
return false; |
||||
} |
||||
|
||||
_logger.LogInformation("HLS process has completed for channel {Channel}", channelNumber); |
||||
|
||||
_transcodedUntil = processModel.Until; |
||||
} |
||||
} |
||||
catch (Exception ex) |
||||
{ |
||||
_logger.LogError(ex, "Error transcoding channel {Channel}", channelNumber); |
||||
return false; |
||||
} |
||||
|
||||
return true; |
||||
} |
||||
|
||||
private async Task TrimAndDelete(string channelNumber, CancellationToken cancellationToken) |
||||
{ |
||||
string playlistFileName = Path.Combine( |
||||
FileSystemLayout.TranscodeFolder, |
||||
channelNumber, |
||||
"live.m3u8"); |
||||
|
||||
if (File.Exists(playlistFileName)) |
||||
{ |
||||
// trim playlist and insert discontinuity before appending with new ffmpeg process
|
||||
string[] lines = await File.ReadAllLinesAsync(playlistFileName, cancellationToken); |
||||
TrimPlaylistResult trimResult = HlsPlaylistFilter.TrimPlaylistWithDiscontinuity( |
||||
_playlistStart, |
||||
DateTimeOffset.Now.AddMinutes(-1), |
||||
lines); |
||||
await File.WriteAllTextAsync(playlistFileName, trimResult.Playlist, cancellationToken); |
||||
|
||||
// delete old segments
|
||||
foreach (string file in Directory.GetFiles( |
||||
Path.Combine(FileSystemLayout.TranscodeFolder, channelNumber), |
||||
"*.ts")) |
||||
{ |
||||
string fileName = Path.GetFileName(file); |
||||
if (fileName.StartsWith("live") && int.Parse(fileName.Replace("live", string.Empty).Split('.')[0]) < |
||||
int.Parse(trimResult.Sequence)) |
||||
{ |
||||
File.Delete(file); |
||||
} |
||||
} |
||||
|
||||
_playlistStart = trimResult.PlaylistStart; |
||||
} |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,7 @@
@@ -0,0 +1,7 @@
|
||||
using System; |
||||
using System.Diagnostics; |
||||
|
||||
namespace ErsatzTV.Application.Streaming |
||||
{ |
||||
public record PlayoutItemProcessModel(Process Process, DateTimeOffset Until); |
||||
} |
||||
@ -1,9 +0,0 @@
@@ -1,9 +0,0 @@
|
||||
namespace ErsatzTV.Core.Errors |
||||
{ |
||||
public class ChannelHasProcess : BaseError |
||||
{ |
||||
public ChannelHasProcess() : base("Channel already has ffmpeg process") |
||||
{ |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,9 @@
@@ -0,0 +1,9 @@
|
||||
namespace ErsatzTV.Core.Errors |
||||
{ |
||||
public class ChannelSessionAlreadyActive : BaseError |
||||
{ |
||||
public ChannelSessionAlreadyActive() : base("Channel already has HLS session") |
||||
{ |
||||
} |
||||
} |
||||
} |
||||
@ -1,98 +1,23 @@
@@ -1,98 +1,23 @@
|
||||
using System; |
||||
using System.Collections.Concurrent; |
||||
using System.Collections.Generic; |
||||
using System.Diagnostics; |
||||
using System.Linq; |
||||
using System.Threading; |
||||
using System.Collections.Concurrent; |
||||
using ErsatzTV.Core.Interfaces.FFmpeg; |
||||
using LanguageExt; |
||||
using Microsoft.Extensions.Logging; |
||||
|
||||
namespace ErsatzTV.Core.FFmpeg |
||||
{ |
||||
public class FFmpegSegmenterService : IFFmpegSegmenterService |
||||
{ |
||||
public static readonly TimeSpan SegmenterDelay = TimeSpan.FromSeconds(3); |
||||
|
||||
private static readonly ConcurrentDictionary<string, ProcessAndToken> Processes = new(); |
||||
|
||||
private readonly ILogger<FFmpegSegmenterService> _logger; |
||||
|
||||
public FFmpegSegmenterService(ILogger<FFmpegSegmenterService> logger) => _logger = logger; |
||||
|
||||
public bool ProcessExistsForChannel(string channelNumber) |
||||
public FFmpegSegmenterService() |
||||
{ |
||||
if (Processes.TryGetValue(channelNumber, out ProcessAndToken processAndToken)) |
||||
{ |
||||
if (!processAndToken.Process.HasExited || !Processes.TryRemove( |
||||
new KeyValuePair<string, ProcessAndToken>(channelNumber, processAndToken))) |
||||
{ |
||||
return true; |
||||
} |
||||
} |
||||
|
||||
return false; |
||||
SessionWorkers = new ConcurrentDictionary<string, IHlsSessionWorker>(); |
||||
} |
||||
|
||||
public bool TryAdd(string channelNumber, Process process) |
||||
{ |
||||
var cts = new CancellationTokenSource(); |
||||
var processAndToken = new ProcessAndToken(process, cts, DateTimeOffset.Now); |
||||
if (Processes.TryAdd(channelNumber, processAndToken)) |
||||
{ |
||||
CancellationToken token = cts.Token; |
||||
token.Register(process.Kill); |
||||
return true; |
||||
} |
||||
|
||||
return false; |
||||
} |
||||
public ConcurrentDictionary<string, IHlsSessionWorker> SessionWorkers { get; } |
||||
|
||||
public void TouchChannel(string channelNumber) |
||||
{ |
||||
if (Processes.TryGetValue(channelNumber, out ProcessAndToken processAndToken)) |
||||
{ |
||||
ProcessAndToken newValue = processAndToken with { LastAccess = DateTimeOffset.Now }; |
||||
if (!Processes.TryUpdate(channelNumber, newValue, processAndToken)) |
||||
{ |
||||
_logger.LogWarning("Failed to update last access for channel {Channel}", channelNumber); |
||||
} |
||||
} |
||||
} |
||||
|
||||
public void CleanUpSessions() |
||||
{ |
||||
foreach ((string key, (_, CancellationTokenSource cts, DateTimeOffset lastAccess)) in Processes.ToList()) |
||||
{ |
||||
// TODO: configure this time span? 5 min?
|
||||
if (DateTimeOffset.Now.Subtract(lastAccess) > TimeSpan.FromMinutes(2)) |
||||
{ |
||||
_logger.LogDebug("Cleaning up ffmpeg session for channel {Channel}", key); |
||||
|
||||
cts.Cancel(); |
||||
Processes.TryRemove(key, out _); |
||||
} |
||||
} |
||||
} |
||||
|
||||
public Unit KillAll() |
||||
{ |
||||
foreach ((string key, ProcessAndToken processAndToken) in Processes.ToList()) |
||||
if (SessionWorkers.TryGetValue(channelNumber, out IHlsSessionWorker worker)) |
||||
{ |
||||
try |
||||
{ |
||||
processAndToken.TokenSource.Cancel(); |
||||
Processes.TryRemove(key, out _); |
||||
} |
||||
catch (Exception ex) |
||||
{ |
||||
_logger.LogInformation(ex, "Error killing process"); |
||||
} |
||||
worker?.Touch(); |
||||
} |
||||
|
||||
return Unit.Default; |
||||
} |
||||
|
||||
private record ProcessAndToken(Process Process, CancellationTokenSource TokenSource, DateTimeOffset LastAccess); |
||||
} |
||||
} |
||||
|
||||
@ -0,0 +1,111 @@
@@ -0,0 +1,111 @@
|
||||
using System; |
||||
using System.Text; |
||||
|
||||
namespace ErsatzTV.Core.FFmpeg |
||||
{ |
||||
public class HlsPlaylistFilter |
||||
{ |
||||
public static TrimPlaylistResult TrimPlaylist( |
||||
DateTimeOffset playlistStart, |
||||
DateTimeOffset filterBefore, |
||||
string[] lines, |
||||
int maxSegments = 10, |
||||
bool endWithDiscontinuity = false) |
||||
{ |
||||
DateTimeOffset currentTime = playlistStart; |
||||
DateTimeOffset nextPlaylistStart = DateTimeOffset.MaxValue; |
||||
|
||||
var discontinuitySequence = 0; |
||||
var startSequence = "0"; |
||||
var output = new StringBuilder(); |
||||
var started = false; |
||||
var i = 0; |
||||
var segments = 0; |
||||
while (!lines[i].StartsWith("#EXTINF:")) |
||||
{ |
||||
if (lines[i].StartsWith("#EXT-X-DISCONTINUITY-SEQUENCE")) |
||||
{ |
||||
discontinuitySequence = int.Parse(lines[i].Split(':')[1]); |
||||
} |
||||
|
||||
i++; |
||||
} |
||||
|
||||
while (i < lines.Length) |
||||
{ |
||||
if (segments >= maxSegments) |
||||
{ |
||||
break; |
||||
} |
||||
|
||||
string line = lines[i]; |
||||
// _logger.LogInformation("Line: {Line}", line);
|
||||
if (line.StartsWith("#EXT-X-DISCONTINUITY")) |
||||
{ |
||||
if (started) |
||||
{ |
||||
output.AppendLine("#EXT-X-DISCONTINUITY"); |
||||
} |
||||
else |
||||
{ |
||||
discontinuitySequence++; |
||||
} |
||||
|
||||
i++; |
||||
continue; |
||||
} |
||||
|
||||
var duration = TimeSpan.FromSeconds(double.Parse(lines[i].TrimEnd(',').Split(':')[1])); |
||||
if (currentTime < filterBefore) |
||||
{ |
||||
currentTime += duration; |
||||
i += 3; |
||||
continue; |
||||
} |
||||
|
||||
nextPlaylistStart = currentTime < nextPlaylistStart ? currentTime : nextPlaylistStart; |
||||
|
||||
if (!started) |
||||
{ |
||||
startSequence = lines[i + 2].Replace("live", string.Empty).Split('.')[0]; |
||||
|
||||
output.AppendLine("#EXTM3U"); |
||||
output.AppendLine("#EXT-X-VERSION:3"); |
||||
output.AppendLine("#EXT-X-TARGETDURATION:4"); |
||||
output.AppendLine($"#EXT-X-MEDIA-SEQUENCE:{startSequence}"); |
||||
output.AppendLine($"#EXT-X-DISCONTINUITY-SEQUENCE:{discontinuitySequence}"); |
||||
output.AppendLine("#EXT-X-INDEPENDENT-SEGMENTS"); |
||||
output.AppendLine("#EXT-X-DISCONTINUITY"); |
||||
|
||||
started = true; |
||||
} |
||||
|
||||
output.AppendLine(lines[i]); |
||||
string offset = currentTime.ToString("zzz").Replace(":", string.Empty); |
||||
output.AppendLine($"#EXT-X-PROGRAM-DATE-TIME:{currentTime:yyyy-MM-ddTHH:mm:ss.fff}{offset}"); |
||||
output.AppendLine(lines[i + 2]); |
||||
|
||||
currentTime += duration; |
||||
segments++; |
||||
i += 3; |
||||
} |
||||
|
||||
if (endWithDiscontinuity) |
||||
{ |
||||
output.AppendLine("#EXT-X-DISCONTINUITY"); |
||||
} |
||||
|
||||
return new TrimPlaylistResult(nextPlaylistStart, startSequence, output.ToString()); |
||||
} |
||||
|
||||
public static TrimPlaylistResult TrimPlaylistWithDiscontinuity( |
||||
DateTimeOffset playlistStart, |
||||
DateTimeOffset filterBefore, |
||||
string[] lines) |
||||
{ |
||||
return TrimPlaylist(playlistStart, filterBefore, lines, int.MaxValue, true); |
||||
} |
||||
} |
||||
|
||||
public record TrimPlaylistResult(DateTimeOffset PlaylistStart, string Sequence, string Playlist); |
||||
} |
||||
@ -1,14 +1,11 @@
@@ -1,14 +1,11 @@
|
||||
using System.Diagnostics; |
||||
using LanguageExt; |
||||
using System.Collections.Concurrent; |
||||
|
||||
namespace ErsatzTV.Core.Interfaces.FFmpeg |
||||
{ |
||||
public interface IFFmpegSegmenterService |
||||
{ |
||||
bool ProcessExistsForChannel(string channelNumber); |
||||
bool TryAdd(string channelNumber, Process process); |
||||
ConcurrentDictionary<string, IHlsSessionWorker> SessionWorkers { get; } |
||||
|
||||
void TouchChannel(string channelNumber); |
||||
void CleanUpSessions(); |
||||
Unit KillAll(); |
||||
} |
||||
} |
||||
|
||||
@ -0,0 +1,10 @@
@@ -0,0 +1,10 @@
|
||||
using System; |
||||
|
||||
namespace ErsatzTV.Core.Interfaces.FFmpeg |
||||
{ |
||||
public interface IHlsSessionWorker |
||||
{ |
||||
DateTimeOffset PlaylistStart { get; } |
||||
void Touch(); |
||||
} |
||||
} |
||||
@ -1,34 +0,0 @@
@@ -1,34 +0,0 @@
|
||||
using System; |
||||
using System.Threading; |
||||
using System.Threading.Channels; |
||||
using System.Threading.Tasks; |
||||
using ErsatzTV.Application; |
||||
using ErsatzTV.Application.Streaming.Commands; |
||||
using Microsoft.Extensions.Hosting; |
||||
using Microsoft.Extensions.Logging; |
||||
|
||||
namespace ErsatzTV.Services |
||||
{ |
||||
public class FFmpegSchedulerService : BackgroundService |
||||
{ |
||||
private readonly ILogger<FFmpegSchedulerService> _logger; |
||||
private readonly ChannelWriter<IFFmpegWorkerRequest> _workerChannel; |
||||
|
||||
public FFmpegSchedulerService( |
||||
ChannelWriter<IFFmpegWorkerRequest> workerChannel, |
||||
ILogger<FFmpegSchedulerService> logger) |
||||
{ |
||||
_workerChannel = workerChannel; |
||||
_logger = logger; |
||||
} |
||||
|
||||
protected override async Task ExecuteAsync(CancellationToken cancellationToken) |
||||
{ |
||||
while (!cancellationToken.IsCancellationRequested) |
||||
{ |
||||
await _workerChannel.WriteAsync(new CleanUpFFmpegSessions(), cancellationToken); |
||||
await Task.Delay(TimeSpan.FromMinutes(1), cancellationToken); |
||||
} |
||||
} |
||||
} |
||||
} |
||||
Loading…
Reference in new issue