mirror of https://github.com/ErsatzTV/ErsatzTV.git
Browse Source
* initial plumbing * scan for plex networks * save network contents to db as tags * eliminate network tag id churn * add network and show_network to search index * update last networks scan * show networks on tv show page * update changelogpull/2086/head
42 changed files with 24134 additions and 33 deletions
@ -0,0 +1,83 @@
@@ -0,0 +1,83 @@
|
||||
using System.Globalization; |
||||
using System.Threading.Channels; |
||||
using ErsatzTV.Application.Libraries; |
||||
using ErsatzTV.Core; |
||||
using ErsatzTV.Core.Errors; |
||||
using ErsatzTV.Core.Interfaces.Repositories; |
||||
using ErsatzTV.FFmpeg.Runtime; |
||||
using ErsatzTV.Infrastructure.Data; |
||||
using ErsatzTV.Infrastructure.Extensions; |
||||
using Microsoft.EntityFrameworkCore; |
||||
|
||||
namespace ErsatzTV.Application.Plex; |
||||
|
||||
public class CallPlexNetworkScannerHandler : CallLibraryScannerHandler<SynchronizePlexNetworks>, |
||||
IRequestHandler<SynchronizePlexNetworks, Either<BaseError, Unit>> |
||||
{ |
||||
public CallPlexNetworkScannerHandler( |
||||
IDbContextFactory<TvContext> dbContextFactory, |
||||
IConfigElementRepository configElementRepository, |
||||
ChannelWriter<ISearchIndexBackgroundServiceRequest> channel, |
||||
IMediator mediator, |
||||
IRuntimeInfo runtimeInfo) : base(dbContextFactory, configElementRepository, channel, mediator, runtimeInfo) |
||||
{ |
||||
} |
||||
|
||||
public async Task<Either<BaseError, Unit>> |
||||
Handle(SynchronizePlexNetworks request, CancellationToken cancellationToken) |
||||
{ |
||||
Validation<BaseError, string> validation = await Validate(request); |
||||
return await validation.Match( |
||||
scanner => PerformScan(scanner, request, cancellationToken), |
||||
error => |
||||
{ |
||||
foreach (ScanIsNotRequired scanIsNotRequired in error.OfType<ScanIsNotRequired>()) |
||||
{ |
||||
return Task.FromResult<Either<BaseError, Unit>>(scanIsNotRequired); |
||||
} |
||||
|
||||
return Task.FromResult<Either<BaseError, Unit>>(error.Join()); |
||||
}); |
||||
} |
||||
|
||||
protected override async Task<DateTimeOffset> GetLastScan(TvContext dbContext, SynchronizePlexNetworks request) |
||||
{ |
||||
DateTime minDateTime = await dbContext.PlexLibraries |
||||
.SelectOneAsync(l => l.Id, l => l.Id == request.PlexLibraryId) |
||||
.Match(l => l.LastNetworksScan ?? SystemTime.MinValueUtc, () => SystemTime.MaxValueUtc); |
||||
|
||||
return new DateTimeOffset(minDateTime, TimeSpan.Zero); |
||||
} |
||||
|
||||
protected override bool ScanIsRequired( |
||||
DateTimeOffset lastScan, |
||||
int libraryRefreshInterval, |
||||
SynchronizePlexNetworks request) |
||||
{ |
||||
if (lastScan == SystemTime.MaxValueUtc) |
||||
{ |
||||
return false; |
||||
} |
||||
|
||||
DateTimeOffset nextScan = lastScan + TimeSpan.FromHours(libraryRefreshInterval); |
||||
return request.ForceScan || libraryRefreshInterval > 0 && nextScan < DateTimeOffset.Now; |
||||
} |
||||
|
||||
private async Task<Either<BaseError, Unit>> PerformScan( |
||||
string scanner, |
||||
SynchronizePlexNetworks request, |
||||
CancellationToken cancellationToken) |
||||
{ |
||||
var arguments = new List<string> |
||||
{ |
||||
"scan-plex-networks", request.PlexLibraryId.ToString(CultureInfo.InvariantCulture) |
||||
}; |
||||
|
||||
if (request.ForceScan) |
||||
{ |
||||
arguments.Add("--force"); |
||||
} |
||||
|
||||
return await base.PerformScan(scanner, arguments, cancellationToken).MapT(_ => Unit.Default); |
||||
} |
||||
} |
@ -0,0 +1,6 @@
@@ -0,0 +1,6 @@
|
||||
using ErsatzTV.Core; |
||||
|
||||
namespace ErsatzTV.Application.Plex; |
||||
|
||||
public record SynchronizePlexNetworks(int PlexLibraryId, bool ForceScan) : IRequest<Either<BaseError, Unit>>, |
||||
IScannerBackgroundServiceRequest; |
@ -0,0 +1,9 @@
@@ -0,0 +1,9 @@
|
||||
namespace ErsatzTV.Core.Domain; |
||||
|
||||
public class PlexTag |
||||
{ |
||||
public int Id { get; set; } |
||||
public string Filter { get; set; } |
||||
public string Tag { get; set; } |
||||
public int TagType { get; set; } |
||||
} |
@ -0,0 +1,13 @@
@@ -0,0 +1,13 @@
|
||||
using ErsatzTV.Core.Domain; |
||||
using ErsatzTV.Core.Plex; |
||||
|
||||
namespace ErsatzTV.Core.Interfaces.Plex; |
||||
|
||||
public interface IPlexNetworkScanner |
||||
{ |
||||
Task<Either<BaseError, Unit>> ScanNetworks( |
||||
PlexLibrary library, |
||||
PlexConnection connection, |
||||
PlexServerAuthToken token, |
||||
CancellationToken cancellationToken); |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,29 @@
@@ -0,0 +1,29 @@
|
||||
using Microsoft.EntityFrameworkCore.Migrations; |
||||
|
||||
#nullable disable |
||||
|
||||
namespace ErsatzTV.Infrastructure.MySql.Migrations |
||||
{ |
||||
/// <inheritdoc />
|
||||
public partial class Add_TagExternalTypeId : Migration |
||||
{ |
||||
/// <inheritdoc />
|
||||
protected override void Up(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.AddColumn<string>( |
||||
name: "ExternalTypeId", |
||||
table: "Tag", |
||||
type: "longtext", |
||||
nullable: true) |
||||
.Annotation("MySql:CharSet", "utf8mb4"); |
||||
} |
||||
|
||||
/// <inheritdoc />
|
||||
protected override void Down(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.DropColumn( |
||||
name: "ExternalTypeId", |
||||
table: "Tag"); |
||||
} |
||||
} |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,29 @@
@@ -0,0 +1,29 @@
|
||||
using System; |
||||
using Microsoft.EntityFrameworkCore.Migrations; |
||||
|
||||
#nullable disable |
||||
|
||||
namespace ErsatzTV.Infrastructure.MySql.Migrations |
||||
{ |
||||
/// <inheritdoc />
|
||||
public partial class Add_PlexLibraryLastNetworksScan : Migration |
||||
{ |
||||
/// <inheritdoc />
|
||||
protected override void Up(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.AddColumn<DateTime>( |
||||
name: "LastNetworksScan", |
||||
table: "PlexLibrary", |
||||
type: "datetime(6)", |
||||
nullable: true); |
||||
} |
||||
|
||||
/// <inheritdoc />
|
||||
protected override void Down(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.DropColumn( |
||||
name: "LastNetworksScan", |
||||
table: "PlexLibrary"); |
||||
} |
||||
} |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,28 @@
@@ -0,0 +1,28 @@
|
||||
using Microsoft.EntityFrameworkCore.Migrations; |
||||
|
||||
#nullable disable |
||||
|
||||
namespace ErsatzTV.Infrastructure.Sqlite.Migrations |
||||
{ |
||||
/// <inheritdoc />
|
||||
public partial class Add_TagExternalTypeId : Migration |
||||
{ |
||||
/// <inheritdoc />
|
||||
protected override void Up(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.AddColumn<string>( |
||||
name: "ExternalTypeId", |
||||
table: "Tag", |
||||
type: "TEXT", |
||||
nullable: true); |
||||
} |
||||
|
||||
/// <inheritdoc />
|
||||
protected override void Down(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.DropColumn( |
||||
name: "ExternalTypeId", |
||||
table: "Tag"); |
||||
} |
||||
} |
||||
} |
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,29 @@
@@ -0,0 +1,29 @@
|
||||
using System; |
||||
using Microsoft.EntityFrameworkCore.Migrations; |
||||
|
||||
#nullable disable |
||||
|
||||
namespace ErsatzTV.Infrastructure.Sqlite.Migrations |
||||
{ |
||||
/// <inheritdoc />
|
||||
public partial class Add_PlexLibraryLastNetworksScan : Migration |
||||
{ |
||||
/// <inheritdoc />
|
||||
protected override void Up(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.AddColumn<DateTime>( |
||||
name: "LastNetworksScan", |
||||
table: "PlexLibrary", |
||||
type: "TEXT", |
||||
nullable: true); |
||||
} |
||||
|
||||
/// <inheritdoc />
|
||||
protected override void Down(MigrationBuilder migrationBuilder) |
||||
{ |
||||
migrationBuilder.DropColumn( |
||||
name: "LastNetworksScan", |
||||
table: "PlexLibrary"); |
||||
} |
||||
} |
||||
} |
@ -0,0 +1,18 @@
@@ -0,0 +1,18 @@
|
||||
using System.Xml.Serialization; |
||||
|
||||
namespace ErsatzTV.Infrastructure.Plex.Models; |
||||
|
||||
public class PlexTagMetadataResponse |
||||
{ |
||||
[XmlAttribute("id")] |
||||
public int Id { get; set; } |
||||
|
||||
[XmlAttribute("filter")] |
||||
public string Filter { get; set; } |
||||
|
||||
[XmlAttribute("tag")] |
||||
public string Tag { get; set; } |
||||
|
||||
[XmlAttribute("tagType")] |
||||
public int TagType { get; set; } |
||||
} |
@ -0,0 +1,9 @@
@@ -0,0 +1,9 @@
|
||||
using Refit; |
||||
|
||||
namespace ErsatzTV.Infrastructure.Plex; |
||||
|
||||
public class NetworkFilter |
||||
{ |
||||
[AliasAs("network")] |
||||
public string Network { get; set; } |
||||
} |
@ -0,0 +1,5 @@
@@ -0,0 +1,5 @@
|
||||
using ErsatzTV.Core; |
||||
|
||||
namespace ErsatzTV.Scanner.Application.Plex; |
||||
|
||||
public record SynchronizePlexNetworks(int PlexLibraryId, bool ForceScan) : IRequest<Either<BaseError, Unit>>; |
@ -0,0 +1,126 @@
@@ -0,0 +1,126 @@
|
||||
using ErsatzTV.Core; |
||||
using ErsatzTV.Core.Domain; |
||||
using ErsatzTV.Core.Interfaces.Plex; |
||||
using ErsatzTV.Core.Interfaces.Repositories; |
||||
using ErsatzTV.Core.Plex; |
||||
|
||||
namespace ErsatzTV.Scanner.Application.Plex; |
||||
|
||||
public class SynchronizePlexNetworksHandler : IRequestHandler<SynchronizePlexNetworks, Either<BaseError, Unit>> |
||||
{ |
||||
private readonly IConfigElementRepository _configElementRepository; |
||||
private readonly IPlexTelevisionRepository _plexTelevisionRepository; |
||||
private readonly IMediaSourceRepository _mediaSourceRepository; |
||||
private readonly IPlexSecretStore _plexSecretStore; |
||||
private readonly IPlexNetworkScanner _scanner; |
||||
|
||||
public SynchronizePlexNetworksHandler( |
||||
IMediaSourceRepository mediaSourceRepository, |
||||
IPlexSecretStore plexSecretStore, |
||||
IPlexNetworkScanner scanner, |
||||
IConfigElementRepository configElementRepository, |
||||
IPlexTelevisionRepository plexTelevisionRepository) |
||||
{ |
||||
_mediaSourceRepository = mediaSourceRepository; |
||||
_plexSecretStore = plexSecretStore; |
||||
_scanner = scanner; |
||||
_configElementRepository = configElementRepository; |
||||
_plexTelevisionRepository = plexTelevisionRepository; |
||||
} |
||||
|
||||
public async Task<Either<BaseError, Unit>> Handle( |
||||
SynchronizePlexNetworks request, |
||||
CancellationToken cancellationToken) |
||||
{ |
||||
Validation<BaseError, RequestParameters> validation = await Validate(request); |
||||
return await validation.Match( |
||||
p => SynchronizeNetworks(p, cancellationToken), |
||||
error => Task.FromResult<Either<BaseError, Unit>>(error.Join())); |
||||
} |
||||
|
||||
private async Task<Validation<BaseError, RequestParameters>> Validate(SynchronizePlexNetworks request) |
||||
{ |
||||
Task<Validation<BaseError, ConnectionParameters>> mediaSource = MediaSourceMustExist(request) |
||||
.BindT(MediaSourceMustHaveActiveConnection) |
||||
.BindT(MediaSourceMustHaveToken); |
||||
|
||||
return (await mediaSource, await PlexLibraryMustExist(request), await ValidateLibraryRefreshInterval()) |
||||
.Apply( |
||||
(connectionParameters, plexLibrary, libraryRefreshInterval) => new RequestParameters( |
||||
connectionParameters, |
||||
plexLibrary, |
||||
request.ForceScan, |
||||
libraryRefreshInterval)); |
||||
} |
||||
|
||||
private Task<Validation<BaseError, PlexLibrary>> PlexLibraryMustExist( |
||||
SynchronizePlexNetworks request) => |
||||
_mediaSourceRepository.GetPlexLibrary(request.PlexLibraryId) |
||||
.Map(v => v.ToValidation<BaseError>($"Plex library {request.PlexLibraryId} does not exist.")); |
||||
|
||||
private Task<Validation<BaseError, int>> ValidateLibraryRefreshInterval() => |
||||
_configElementRepository.GetValue<int>(ConfigElementKey.LibraryRefreshInterval) |
||||
.FilterT(lri => lri is >= 0 and < 1_000_000) |
||||
.Map(lri => lri.ToValidation<BaseError>("Library refresh interval is invalid")); |
||||
|
||||
private Task<Validation<BaseError, PlexMediaSource>> MediaSourceMustExist( |
||||
SynchronizePlexNetworks request) => |
||||
_mediaSourceRepository.GetPlexByLibraryId(request.PlexLibraryId) |
||||
.Map(o => o.ToValidation<BaseError>($"Plex media source for library {request.PlexLibraryId} does not exist.")); |
||||
|
||||
private static Validation<BaseError, ConnectionParameters> MediaSourceMustHaveActiveConnection( |
||||
PlexMediaSource plexMediaSource) |
||||
{ |
||||
Option<PlexConnection> maybeConnection = plexMediaSource.Connections.SingleOrDefault(c => c.IsActive); |
||||
return maybeConnection.Map(connection => new ConnectionParameters(plexMediaSource, connection)) |
||||
.ToValidation<BaseError>("Plex media source requires an active connection"); |
||||
} |
||||
|
||||
private async Task<Validation<BaseError, ConnectionParameters>> MediaSourceMustHaveToken( |
||||
ConnectionParameters connectionParameters) |
||||
{ |
||||
Option<PlexServerAuthToken> maybeToken = await |
||||
_plexSecretStore.GetServerAuthToken(connectionParameters.PlexMediaSource.ClientIdentifier); |
||||
return maybeToken.Map(token => connectionParameters with { PlexServerAuthToken = token }) |
||||
.ToValidation<BaseError>("Plex media source requires a token"); |
||||
} |
||||
|
||||
private async Task<Either<BaseError, Unit>> SynchronizeNetworks( |
||||
RequestParameters parameters, |
||||
CancellationToken cancellationToken) |
||||
{ |
||||
var lastScan = new DateTimeOffset( |
||||
parameters.Library.LastNetworksScan ?? SystemTime.MinValueUtc, |
||||
TimeSpan.Zero); |
||||
DateTimeOffset nextScan = lastScan + TimeSpan.FromHours(parameters.LibraryRefreshInterval); |
||||
if (parameters.ForceScan || parameters.LibraryRefreshInterval > 0 && nextScan < DateTimeOffset.Now) |
||||
{ |
||||
Either<BaseError, Unit> result = await _scanner.ScanNetworks( |
||||
parameters.Library, |
||||
parameters.ConnectionParameters.ActiveConnection, |
||||
parameters.ConnectionParameters.PlexServerAuthToken, |
||||
cancellationToken); |
||||
|
||||
if (result.IsRight) |
||||
{ |
||||
parameters.Library.LastNetworksScan = DateTime.UtcNow; |
||||
await _plexTelevisionRepository.UpdateLastNetworksScan(parameters.Library); |
||||
} |
||||
|
||||
return result; |
||||
} |
||||
|
||||
return Unit.Default; |
||||
} |
||||
|
||||
private record RequestParameters( |
||||
ConnectionParameters ConnectionParameters, |
||||
PlexLibrary Library, |
||||
bool ForceScan, |
||||
int LibraryRefreshInterval); |
||||
|
||||
private record ConnectionParameters(PlexMediaSource PlexMediaSource, PlexConnection ActiveConnection) |
||||
{ |
||||
public PlexServerAuthToken? PlexServerAuthToken { get; set; } |
||||
} |
||||
} |
@ -0,0 +1,99 @@
@@ -0,0 +1,99 @@
|
||||
using ErsatzTV.Core; |
||||
using ErsatzTV.Core.Domain; |
||||
using ErsatzTV.Core.Interfaces.Plex; |
||||
using ErsatzTV.Core.Interfaces.Repositories; |
||||
using ErsatzTV.Core.MediaSources; |
||||
using ErsatzTV.Core.Plex; |
||||
using Microsoft.Extensions.Logging; |
||||
|
||||
namespace ErsatzTV.Scanner.Core.Plex; |
||||
|
||||
public class PlexNetworkScanner( |
||||
IPlexServerApiClient plexServerApiClient, |
||||
IPlexTelevisionRepository plexTelevisionRepository, |
||||
ITelevisionRepository televisionRepository, |
||||
IMediator mediator, |
||||
ILogger<PlexNetworkScanner> logger) : IPlexNetworkScanner |
||||
{ |
||||
public async Task<Either<BaseError, Unit>> ScanNetworks( |
||||
PlexLibrary library, |
||||
PlexConnection connection, |
||||
PlexServerAuthToken token, |
||||
CancellationToken cancellationToken) |
||||
{ |
||||
// logger.LogDebug("Scanning Plex networks...");
|
||||
|
||||
await foreach ((PlexTag tag, int _) in plexServerApiClient.GetAllTags( |
||||
connection, |
||||
token, |
||||
319, |
||||
cancellationToken)) |
||||
{ |
||||
// logger.LogDebug("Found Plex network {Tag}", tag.Tag);
|
||||
|
||||
await SyncNetworkItems(library, connection, token, tag, cancellationToken); |
||||
} |
||||
|
||||
return Either<BaseError, Unit>.Right(Unit.Default); |
||||
} |
||||
|
||||
private async Task SyncNetworkItems( |
||||
PlexLibrary library, |
||||
PlexConnection connection, |
||||
PlexServerAuthToken token, |
||||
PlexTag tag, |
||||
CancellationToken cancellationToken) |
||||
{ |
||||
try |
||||
{ |
||||
// get network items from Plex
|
||||
IAsyncEnumerable<Tuple<PlexShow, int>> items = plexServerApiClient.GetTagShowContents( |
||||
library, |
||||
connection, |
||||
token, |
||||
tag); |
||||
|
||||
// sync tags (networks) on items
|
||||
var addedIds = new System.Collections.Generic.HashSet<int>(); |
||||
var keepIds = new System.Collections.Generic.HashSet<int>(); |
||||
await foreach ((PlexShow item, int _) in items) |
||||
{ |
||||
PlexShowAddTagResult result = await plexTelevisionRepository.AddTag(item, tag); |
||||
|
||||
foreach (int existing in result.Existing) |
||||
{ |
||||
keepIds.Add(existing); |
||||
} |
||||
|
||||
foreach (int added in result.Added) |
||||
{ |
||||
addedIds.Add(added); |
||||
keepIds.Add(added); |
||||
} |
||||
|
||||
cancellationToken.ThrowIfCancellationRequested(); |
||||
} |
||||
|
||||
List<int> removedIds = await plexTelevisionRepository.RemoveAllTags(library, tag, keepIds); |
||||
var changedIds = removedIds.Concat(addedIds).Distinct().ToList(); |
||||
|
||||
if (changedIds.Count > 0) |
||||
{ |
||||
logger.LogDebug("Plex network {Name} contains {Count} changed items", tag.Tag, changedIds.Count); |
||||
} |
||||
|
||||
foreach (int showId in changedIds.ToArray()) |
||||
{ |
||||
changedIds.AddRange(await televisionRepository.GetEpisodeIdsForShow(showId)); |
||||
} |
||||
|
||||
await mediator.Publish( |
||||
new ScannerProgressUpdate(0, null, null, changedIds.ToArray(), []), |
||||
CancellationToken.None); |
||||
} |
||||
catch (Exception ex) |
||||
{ |
||||
logger.LogWarning(ex, "Failed to synchronize Plex network {Name}", tag.Tag); |
||||
} |
||||
} |
||||
} |
Loading…
Reference in new issue