@ -1,18 +1,37 @@
@@ -1,18 +1,37 @@
using System.Diagnostics ;
using System.Text ;
using Bugsnag ;
using ErsatzTV.Core ;
using ErsatzTV.Core.Domain ;
using ErsatzTV.Core.FFmpeg ;
using ErsatzTV.Core.Interfaces.FFmpeg ;
using ErsatzTV.Core.Interfaces.Metadata ;
using ErsatzTV.Core.Interfaces.Repositories ;
using Microsoft.Extensions.Logging ;
using Newtonsoft.Json ;
namespace ErsatzTV.Application.Streaming ;
public class GetLastPtsDurationHandler : IRequestHandler < GetLastPtsDuration , Either < BaseError , PtsAndDuration > >
{
private readonly IClient _ client ;
private readonly ILocalFileSystem _l ocalFileSystem ;
private readonly ITempFilePool _ tempFilePool ;
private readonly IConfigElementRepository _ configElementRepository ;
private readonly ILogger < GetLastPtsDurationHandler > _l ogger ;
public GetLastPtsDurationHandler ( IConfigElementRepository configElementRepository )
public GetLastPtsDurationHandler (
IClient client ,
ILocalFileSystem localFileSystem ,
ITempFilePool tempFilePool ,
IConfigElementRepository configElementRepository ,
ILogger < GetLastPtsDurationHandler > logger )
{
_ client = client ;
_l ocalFileSystem = localFileSystem ;
_ tempFilePool = tempFilePool ;
_ configElementRepository = configElementRepository ;
_l ogger = logger ;
}
public async Task < Either < BaseError , PtsAndDuration > > Handle (
@ -21,61 +40,123 @@ public class GetLastPtsDurationHandler : IRequestHandler<GetLastPtsDuration, Eit
@@ -21,61 +40,123 @@ public class GetLastPtsDurationHandler : IRequestHandler<GetLastPtsDuration, Eit
{
Validation < BaseError , RequestParameters > validation = await Validate ( request ) ;
return await validation . Match (
Handle ,
parameters = > Handle ( parameters , cancellationToken ) ,
error = > Task . FromResult < Either < BaseError , PtsAndDuration > > ( error . Join ( ) ) ) ;
}
private async Task < Validation < BaseError , RequestParameters > > Validate ( GetLastPtsDuration request ) = >
await ValidateFFprobePath ( )
. MapT (
ffprobePath = > new RequestParameters (
request . FileName ,
ffprobePath ) ) ;
await ValidateFFprobePath ( ) . MapT ( ffprobePath = > new RequestParameters ( request . ChannelNumber , ffprobePath ) ) ;
private async Task < Either < BaseError , PtsAndDuration > > Handle ( RequestParameters parameters )
private async Task < Either < BaseError , PtsAndDuration > > Handle (
RequestParameters parameters ,
CancellationToken cancellationToken )
{
var startInfo = new ProcessStartInfo
{
FileName = parameters . FFprobePath ,
RedirectStandardOutput = true ,
RedirectStandardError = true ,
UseShellExecute = false ,
StandardOutputEncoding = Encoding . UTF8 ,
StandardErrorEncoding = Encoding . UTF8
} ;
startInfo . ArgumentList . Add ( "-v" ) ;
startInfo . ArgumentList . Add ( "0" ) ;
startInfo . ArgumentList . Add ( "-show_entries" ) ;
startInfo . ArgumentList . Add ( "packet=pts,duration" ) ;
startInfo . ArgumentList . Add ( "-of" ) ;
startInfo . ArgumentList . Add ( "compact=p=0:nk=1" ) ;
startInfo . ArgumentList . Add ( "-read_intervals" ) ;
startInfo . ArgumentList . Add ( "-999999" ) ;
startInfo . ArgumentList . Add ( parameters . FileName ) ;
var probe = new Process
Option < FileInfo > maybeLastSegment = GetLastSegment ( parameters . ChannelNumber ) ;
foreach ( FileInfo segment in maybeLastSegment )
{
StartInfo = startInfo
} ;
var startInfo = new ProcessStartInfo
{
FileName = parameters . FFprobePath ,
RedirectStandardOutput = true ,
RedirectStandardError = true ,
UseShellExecute = false ,
StandardOutputEncoding = Encoding . UTF8 ,
StandardErrorEncoding = Encoding . UTF8
} ;
startInfo . ArgumentList . Add ( "-v" ) ;
startInfo . ArgumentList . Add ( "0" ) ;
startInfo . ArgumentList . Add ( "-show_entries" ) ;
startInfo . ArgumentList . Add ( "packet=pts,duration" ) ;
startInfo . ArgumentList . Add ( "-of" ) ;
startInfo . ArgumentList . Add ( "compact=p=0:nk=1" ) ;
startInfo . ArgumentList . Add ( "-read_intervals" ) ;
startInfo . ArgumentList . Add ( "-999999" ) ;
startInfo . ArgumentList . Add ( segment . FullName ) ;
var probe = new Process
{
StartInfo = startInfo
} ;
probe . Start ( ) ;
string output = await probe . StandardOutput . ReadToEndAsync ( ) ;
await probe . WaitForExitAsync ( cancellationToken ) ;
if ( probe . ExitCode ! = 0 )
{
return BaseError . New ( $"FFprobe at {parameters.FFprobePath} exited with code {probe.ExitCode}" ) ;
}
probe . Start ( ) ;
return await probe . StandardOutput . ReadToEndAsync ( ) . MapAsync < string , Either < BaseError , PtsAndDuration > > (
async output = >
try
{
string [ ] lines = output . Split ( "\n" ) ;
IEnumerable < string > nonEmptyLines = lines . Filter ( s = > ! string . IsNullOrWhiteSpace ( s ) ) . Map ( l = > l . Trim ( ) ) ;
return PtsAndDuration . From ( nonEmptyLines . Last ( ) ) ;
}
catch ( Exception ex )
{
await probe . WaitForExitAsync ( ) ;
return probe . ExitCode = = 0
? PtsAndDuration . From ( output . Split ( "\n" ) . Filter ( s = > ! string . IsNullOrWhiteSpace ( s ) ) . Last ( ) . Trim ( ) )
: BaseError . New ( $"FFprobe at {parameters.FFprobePath} exited with code {probe.ExitCode}" ) ;
} ) ;
_ client . Notify ( ex ) ;
await SaveTroubleshootingData ( parameters . ChannelNumber , output ) ;
}
}
return BaseError . New ( $"Failed to determine last pts duration for channel {parameters.ChannelNumber}" ) ;
}
private static Option < FileInfo > GetLastSegment ( string channelNumber )
{
var directory = new DirectoryInfo ( Path . Combine ( FileSystemLayout . TranscodeFolder , channelNumber ) ) ;
return Optional ( directory . GetFiles ( "*.ts" ) . OrderByDescending ( f = > f . Name ) . FirstOrDefault ( ) ) ;
}
private Task < Validation < BaseError , string > > ValidateFFprobePath ( ) = >
_ configElementRepository . GetValue < string > ( ConfigElementKey . FFprobePath )
. FilterT ( File . Exists )
. Map (
ffprobePath = >
ffprobePath . ToValidation < BaseError > ( "FFprobe path does not exist on the file system" ) ) ;
. Map ( ffprobePath = > ffprobePath . ToValidation < BaseError > ( "FFprobe path does not exist on the file system" ) ) ;
private async Task SaveTroubleshootingData ( string channelNumber , string output )
{
try
{
var directory = new DirectoryInfo ( Path . Combine ( FileSystemLayout . TranscodeFolder , channelNumber ) ) ;
FileInfo [ ] allFiles = directory . GetFiles ( ) ;
string playlistFileName = Path . Combine ( FileSystemLayout . TranscodeFolder , channelNumber , "live.m3u8" ) ;
string playlistContents = string . Empty ;
if ( _l ocalFileSystem . FileExists ( playlistFileName ) )
{
playlistContents = await File . ReadAllTextAsync ( playlistFileName ) ;
}
var data = new TroubleshootingData ( allFiles , playlistContents , output ) ;
string serialized = data . Serialize ( ) ;
string file = _ tempFilePool . GetNextTempFile ( TempFileCategory . BadTranscodeFolder ) ;
await File . WriteAllTextAsync ( file , serialized ) ;
_l ogger . LogWarning ( "Transcode folder is in bad state; troubleshooting info saved to {File}" , file ) ;
}
catch ( Exception ex )
{
_ client . Notify ( ex ) ;
}
}
private record RequestParameters ( string ChannelNumber , string FFprobePath ) ;
private record TroubleshootingData ( IEnumerable < FileInfo > Files , string Playlist , string ProbeOutput )
{
private record FileData ( string FileName , long Bytes , DateTime LastWriteTimeUtc ) ;
private record InternalData ( List < FileData > Files , string EncodedPlaylist , string EncodedProbeOutput ) ;
private record RequestParameters ( string FileName , string FFprobePath ) ;
public string Serialize ( )
{
var data = new InternalData (
Files . Map ( f = > new FileData ( f . FullName , f . Length , f . LastWriteTimeUtc ) ) . ToList ( ) ,
Convert . ToBase64String ( Encoding . UTF8 . GetBytes ( Playlist ) ) ,
Convert . ToBase64String ( Encoding . UTF8 . GetBytes ( ProbeOutput ) ) ) ;
return JsonConvert . SerializeObject ( data ) ;
}
}
}