mirror of https://github.com/ErsatzTV/ErsatzTV.git
Browse Source
* start to rework plex collection scanning * sync plex collections to db * sync plex collection items * update changelogpull/1504/head
40 changed files with 9986 additions and 103 deletions
@ -0,0 +1,83 @@ |
|||||||
|
using System.Globalization; |
||||||
|
using System.Threading.Channels; |
||||||
|
using ErsatzTV.Application.Libraries; |
||||||
|
using ErsatzTV.Core; |
||||||
|
using ErsatzTV.Core.Errors; |
||||||
|
using ErsatzTV.Core.Interfaces.Repositories; |
||||||
|
using ErsatzTV.FFmpeg.Runtime; |
||||||
|
using ErsatzTV.Infrastructure.Data; |
||||||
|
using ErsatzTV.Infrastructure.Extensions; |
||||||
|
using Microsoft.EntityFrameworkCore; |
||||||
|
|
||||||
|
namespace ErsatzTV.Application.Plex; |
||||||
|
|
||||||
|
public class CallPlexCollectionScannerHandler : CallLibraryScannerHandler<SynchronizePlexCollections>, |
||||||
|
IRequestHandler<SynchronizePlexCollections, Either<BaseError, Unit>> |
||||||
|
{ |
||||||
|
public CallPlexCollectionScannerHandler( |
||||||
|
IDbContextFactory<TvContext> dbContextFactory, |
||||||
|
IConfigElementRepository configElementRepository, |
||||||
|
ChannelWriter<ISearchIndexBackgroundServiceRequest> channel, |
||||||
|
IMediator mediator, |
||||||
|
IRuntimeInfo runtimeInfo) : base(dbContextFactory, configElementRepository, channel, mediator, runtimeInfo) |
||||||
|
{ |
||||||
|
} |
||||||
|
|
||||||
|
public async Task<Either<BaseError, Unit>> |
||||||
|
Handle(SynchronizePlexCollections request, CancellationToken cancellationToken) |
||||||
|
{ |
||||||
|
Validation<BaseError, string> validation = await Validate(request); |
||||||
|
return await validation.Match( |
||||||
|
scanner => PerformScan(scanner, request, cancellationToken), |
||||||
|
error => |
||||||
|
{ |
||||||
|
foreach (ScanIsNotRequired scanIsNotRequired in error.OfType<ScanIsNotRequired>()) |
||||||
|
{ |
||||||
|
return Task.FromResult<Either<BaseError, Unit>>(scanIsNotRequired); |
||||||
|
} |
||||||
|
|
||||||
|
return Task.FromResult<Either<BaseError, Unit>>(error.Join()); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
protected override async Task<DateTimeOffset> GetLastScan(TvContext dbContext, SynchronizePlexCollections request) |
||||||
|
{ |
||||||
|
DateTime minDateTime = await dbContext.PlexMediaSources |
||||||
|
.SelectOneAsync(l => l.Id, l => l.Id == request.PlexMediaSourceId) |
||||||
|
.Match(l => l.LastCollectionsScan ?? SystemTime.MinValueUtc, () => SystemTime.MaxValueUtc); |
||||||
|
|
||||||
|
return new DateTimeOffset(minDateTime, TimeSpan.Zero); |
||||||
|
} |
||||||
|
|
||||||
|
protected override bool ScanIsRequired( |
||||||
|
DateTimeOffset lastScan, |
||||||
|
int libraryRefreshInterval, |
||||||
|
SynchronizePlexCollections request) |
||||||
|
{ |
||||||
|
if (lastScan == SystemTime.MaxValueUtc) |
||||||
|
{ |
||||||
|
return false; |
||||||
|
} |
||||||
|
|
||||||
|
DateTimeOffset nextScan = lastScan + TimeSpan.FromHours(libraryRefreshInterval); |
||||||
|
return request.ForceScan || libraryRefreshInterval > 0 && nextScan < DateTimeOffset.Now; |
||||||
|
} |
||||||
|
|
||||||
|
private async Task<Either<BaseError, Unit>> PerformScan( |
||||||
|
string scanner, |
||||||
|
SynchronizePlexCollections request, |
||||||
|
CancellationToken cancellationToken) |
||||||
|
{ |
||||||
|
var arguments = new List<string> |
||||||
|
{ |
||||||
|
"scan-plex-collections", request.PlexMediaSourceId.ToString(CultureInfo.InvariantCulture) |
||||||
|
}; |
||||||
|
|
||||||
|
if (request.ForceScan) |
||||||
|
{ |
||||||
|
arguments.Add("--force"); |
||||||
|
} |
||||||
|
|
||||||
|
return await base.PerformScan(scanner, arguments, cancellationToken).MapT(_ => Unit.Default); |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,6 @@ |
|||||||
|
using ErsatzTV.Core; |
||||||
|
|
||||||
|
namespace ErsatzTV.Application.Plex; |
||||||
|
|
||||||
|
public record SynchronizePlexCollections(int PlexMediaSourceId, bool ForceScan) : IRequest<Either<BaseError, Unit>>, |
||||||
|
IScannerBackgroundServiceRequest; |
||||||
@ -0,0 +1,12 @@ |
|||||||
|
using System.Diagnostics.CodeAnalysis; |
||||||
|
|
||||||
|
namespace ErsatzTV.Core.Domain; |
||||||
|
|
||||||
|
[SuppressMessage("Naming", "CA1711:Identifiers should not have incorrect suffix")] |
||||||
|
public class PlexCollection |
||||||
|
{ |
||||||
|
public int Id { get; set; } |
||||||
|
public string Key { get; set; } |
||||||
|
public string Etag { get; set; } |
||||||
|
public string Name { get; set; } |
||||||
|
} |
||||||
@ -0,0 +1,12 @@ |
|||||||
|
using ErsatzTV.Core.Domain; |
||||||
|
using ErsatzTV.Core.Plex; |
||||||
|
|
||||||
|
namespace ErsatzTV.Core.Interfaces.Plex; |
||||||
|
|
||||||
|
public interface IPlexCollectionScanner |
||||||
|
{ |
||||||
|
Task<Either<BaseError, Unit>> ScanCollections( |
||||||
|
PlexConnection connection, |
||||||
|
PlexServerAuthToken token, |
||||||
|
CancellationToken cancellationToken); |
||||||
|
} |
||||||
@ -0,0 +1,13 @@ |
|||||||
|
using ErsatzTV.Core.Domain; |
||||||
|
|
||||||
|
namespace ErsatzTV.Core.Interfaces.Repositories; |
||||||
|
|
||||||
|
public interface IPlexCollectionRepository |
||||||
|
{ |
||||||
|
Task<List<PlexCollection>> GetCollections(); |
||||||
|
Task<bool> AddCollection(PlexCollection collection); |
||||||
|
Task<bool> RemoveCollection(PlexCollection collection); |
||||||
|
Task<List<int>> RemoveAllTags(PlexCollection collection); |
||||||
|
Task<int> AddTag(MediaItem item, PlexCollection collection); |
||||||
|
Task<bool> SetEtag(PlexCollection collection); |
||||||
|
} |
||||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,52 @@ |
|||||||
|
using System; |
||||||
|
using Microsoft.EntityFrameworkCore.Metadata; |
||||||
|
using Microsoft.EntityFrameworkCore.Migrations; |
||||||
|
|
||||||
|
#nullable disable |
||||||
|
|
||||||
|
namespace ErsatzTV.Infrastructure.MySql.Migrations |
||||||
|
{ |
||||||
|
/// <inheritdoc />
|
||||||
|
public partial class Add_PlexCollection : Migration |
||||||
|
{ |
||||||
|
/// <inheritdoc />
|
||||||
|
protected override void Up(MigrationBuilder migrationBuilder) |
||||||
|
{ |
||||||
|
migrationBuilder.AddColumn<DateTime>( |
||||||
|
name: "LastCollectionsScan", |
||||||
|
table: "PlexMediaSource", |
||||||
|
type: "datetime(6)", |
||||||
|
nullable: true); |
||||||
|
|
||||||
|
migrationBuilder.CreateTable( |
||||||
|
name: "PlexCollection", |
||||||
|
columns: table => new |
||||||
|
{ |
||||||
|
Id = table.Column<int>(type: "int", nullable: false) |
||||||
|
.Annotation("MySql:ValueGenerationStrategy", MySqlValueGenerationStrategy.IdentityColumn), |
||||||
|
Key = table.Column<string>(type: "longtext", nullable: true) |
||||||
|
.Annotation("MySql:CharSet", "utf8mb4"), |
||||||
|
Etag = table.Column<string>(type: "longtext", nullable: true) |
||||||
|
.Annotation("MySql:CharSet", "utf8mb4"), |
||||||
|
Name = table.Column<string>(type: "longtext", nullable: true) |
||||||
|
.Annotation("MySql:CharSet", "utf8mb4") |
||||||
|
}, |
||||||
|
constraints: table => |
||||||
|
{ |
||||||
|
table.PrimaryKey("PK_PlexCollection", x => x.Id); |
||||||
|
}) |
||||||
|
.Annotation("MySql:CharSet", "utf8mb4"); |
||||||
|
} |
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
protected override void Down(MigrationBuilder migrationBuilder) |
||||||
|
{ |
||||||
|
migrationBuilder.DropTable( |
||||||
|
name: "PlexCollection"); |
||||||
|
|
||||||
|
migrationBuilder.DropColumn( |
||||||
|
name: "LastCollectionsScan", |
||||||
|
table: "PlexMediaSource"); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,47 @@ |
|||||||
|
using System; |
||||||
|
using Microsoft.EntityFrameworkCore.Migrations; |
||||||
|
|
||||||
|
#nullable disable |
||||||
|
|
||||||
|
namespace ErsatzTV.Infrastructure.Sqlite.Migrations |
||||||
|
{ |
||||||
|
/// <inheritdoc />
|
||||||
|
public partial class Add_PlexCollection : Migration |
||||||
|
{ |
||||||
|
/// <inheritdoc />
|
||||||
|
protected override void Up(MigrationBuilder migrationBuilder) |
||||||
|
{ |
||||||
|
migrationBuilder.AddColumn<DateTime>( |
||||||
|
name: "LastCollectionsScan", |
||||||
|
table: "PlexMediaSource", |
||||||
|
type: "TEXT", |
||||||
|
nullable: true); |
||||||
|
|
||||||
|
migrationBuilder.CreateTable( |
||||||
|
name: "PlexCollection", |
||||||
|
columns: table => new |
||||||
|
{ |
||||||
|
Id = table.Column<int>(type: "INTEGER", nullable: false) |
||||||
|
.Annotation("Sqlite:Autoincrement", true), |
||||||
|
Key = table.Column<string>(type: "TEXT", nullable: true), |
||||||
|
Etag = table.Column<string>(type: "TEXT", nullable: true), |
||||||
|
Name = table.Column<string>(type: "TEXT", nullable: true) |
||||||
|
}, |
||||||
|
constraints: table => |
||||||
|
{ |
||||||
|
table.PrimaryKey("PK_PlexCollection", x => x.Id); |
||||||
|
}); |
||||||
|
} |
||||||
|
|
||||||
|
/// <inheritdoc />
|
||||||
|
protected override void Down(MigrationBuilder migrationBuilder) |
||||||
|
{ |
||||||
|
migrationBuilder.DropTable( |
||||||
|
name: "PlexCollection"); |
||||||
|
|
||||||
|
migrationBuilder.DropColumn( |
||||||
|
name: "LastCollectionsScan", |
||||||
|
table: "PlexMediaSource"); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,10 @@ |
|||||||
|
using ErsatzTV.Core.Domain; |
||||||
|
using Microsoft.EntityFrameworkCore; |
||||||
|
using Microsoft.EntityFrameworkCore.Metadata.Builders; |
||||||
|
|
||||||
|
namespace ErsatzTV.Infrastructure.Data.Configurations; |
||||||
|
|
||||||
|
public class PlexCollectionConfiguration : IEntityTypeConfiguration<PlexCollection> |
||||||
|
{ |
||||||
|
public void Configure(EntityTypeBuilder<PlexCollection> builder) => builder.ToTable("PlexCollection"); |
||||||
|
} |
||||||
@ -0,0 +1,148 @@ |
|||||||
|
using Dapper; |
||||||
|
using ErsatzTV.Core.Domain; |
||||||
|
using ErsatzTV.Core.Interfaces.Repositories; |
||||||
|
using Microsoft.EntityFrameworkCore; |
||||||
|
|
||||||
|
namespace ErsatzTV.Infrastructure.Data.Repositories; |
||||||
|
|
||||||
|
public class PlexCollectionRepository : IPlexCollectionRepository |
||||||
|
{ |
||||||
|
private readonly IDbContextFactory<TvContext> _dbContextFactory; |
||||||
|
|
||||||
|
public PlexCollectionRepository(IDbContextFactory<TvContext> dbContextFactory) => |
||||||
|
_dbContextFactory = dbContextFactory; |
||||||
|
|
||||||
|
public async Task<List<PlexCollection>> GetCollections() |
||||||
|
{ |
||||||
|
await using TvContext dbContext = await _dbContextFactory.CreateDbContextAsync(); |
||||||
|
return await dbContext.PlexCollections.ToListAsync(); |
||||||
|
} |
||||||
|
|
||||||
|
public async Task<bool> AddCollection(PlexCollection collection) |
||||||
|
{ |
||||||
|
await using TvContext dbContext = await _dbContextFactory.CreateDbContextAsync(); |
||||||
|
await dbContext.AddAsync(collection); |
||||||
|
return await dbContext.SaveChangesAsync() > 0; |
||||||
|
} |
||||||
|
|
||||||
|
public async Task<bool> RemoveCollection(PlexCollection collection) |
||||||
|
{ |
||||||
|
await using TvContext dbContext = await _dbContextFactory.CreateDbContextAsync(); |
||||||
|
dbContext.Remove(collection); |
||||||
|
|
||||||
|
// remove all tags that reference this collection
|
||||||
|
await dbContext.Connection.ExecuteAsync( |
||||||
|
@"DELETE FROM Tag WHERE Name = @Name AND ExternalCollectionId = @Key", |
||||||
|
new { collection.Name, collection.Key }); |
||||||
|
|
||||||
|
return await dbContext.SaveChangesAsync() > 0; |
||||||
|
} |
||||||
|
|
||||||
|
public async Task<List<int>> RemoveAllTags(PlexCollection collection) |
||||||
|
{ |
||||||
|
var result = new List<int>(); |
||||||
|
|
||||||
|
await using TvContext dbContext = await _dbContextFactory.CreateDbContextAsync(); |
||||||
|
|
||||||
|
// movies
|
||||||
|
result.AddRange( |
||||||
|
await dbContext.Connection.QueryAsync<int>( |
||||||
|
@"SELECT JM.Id FROM Tag T
|
||||||
|
INNER JOIN MovieMetadata MM on T.MovieMetadataId = MM.Id |
||||||
|
INNER JOIN PlexMovie JM on JM.Id = MM.MovieId |
||||||
|
WHERE T.ExternalCollectionId = @Key",
|
||||||
|
new { collection.Key })); |
||||||
|
|
||||||
|
// shows
|
||||||
|
result.AddRange( |
||||||
|
await dbContext.Connection.QueryAsync<int>( |
||||||
|
@"SELECT JS.Id FROM Tag T
|
||||||
|
INNER JOIN ShowMetadata SM on T.ShowMetadataId = SM.Id |
||||||
|
INNER JOIN PlexShow JS on JS.Id = SM.ShowId |
||||||
|
WHERE T.ExternalCollectionId = @Key",
|
||||||
|
new { collection.Key })); |
||||||
|
|
||||||
|
// seasons
|
||||||
|
result.AddRange( |
||||||
|
await dbContext.Connection.QueryAsync<int>( |
||||||
|
@"SELECT JS.Id FROM Tag T
|
||||||
|
INNER JOIN SeasonMetadata SM on T.SeasonMetadataId = SM.Id |
||||||
|
INNER JOIN PlexSeason JS on JS.Id = SM.SeasonId |
||||||
|
WHERE T.ExternalCollectionId = @Key",
|
||||||
|
new { collection.Key })); |
||||||
|
|
||||||
|
// episodes
|
||||||
|
result.AddRange( |
||||||
|
await dbContext.Connection.QueryAsync<int>( |
||||||
|
@"SELECT JE.Id FROM Tag T
|
||||||
|
INNER JOIN EpisodeMetadata EM on T.EpisodeMetadataId = EM.Id |
||||||
|
INNER JOIN PlexEpisode JE on JE.Id = EM.EpisodeId |
||||||
|
WHERE T.ExternalCollectionId = @Key",
|
||||||
|
new { collection.Key })); |
||||||
|
|
||||||
|
// delete all tags
|
||||||
|
await dbContext.Connection.ExecuteAsync( |
||||||
|
@"DELETE FROM Tag WHERE Name = @Name AND ExternalCollectionId = @Key", |
||||||
|
new { collection.Name, collection.Key }); |
||||||
|
|
||||||
|
return result; |
||||||
|
} |
||||||
|
|
||||||
|
public async Task<int> AddTag(MediaItem item, PlexCollection collection) |
||||||
|
{ |
||||||
|
await using TvContext dbContext = await _dbContextFactory.CreateDbContextAsync(); |
||||||
|
switch (item) |
||||||
|
{ |
||||||
|
case PlexMovie movie: |
||||||
|
int movieId = await dbContext.Connection.ExecuteScalarAsync<int>( |
||||||
|
@"SELECT Id FROM PlexMovie WHERE `Key` = @Key", |
||||||
|
new { movie.Key }); |
||||||
|
await dbContext.Connection.ExecuteAsync( |
||||||
|
@"INSERT INTO Tag (Name, ExternalCollectionId, MovieMetadataId)
|
||||||
|
SELECT @Name, @Key, Id FROM |
||||||
|
(SELECT Id FROM MovieMetadata WHERE MovieId = @MovieId) AS A",
|
||||||
|
new { collection.Name, collection.Key, MovieId = movieId }); |
||||||
|
return movieId; |
||||||
|
case PlexShow show: |
||||||
|
int showId = await dbContext.Connection.ExecuteScalarAsync<int>( |
||||||
|
@"SELECT Id FROM PlexShow WHERE `Key` = @Key", |
||||||
|
new { show.Key }); |
||||||
|
await dbContext.Connection.ExecuteAsync( |
||||||
|
@"INSERT INTO Tag (Name, ExternalCollectionId, ShowMetadataId)
|
||||||
|
SELECT @Name, @Key, Id FROM |
||||||
|
(SELECT Id FROM ShowMetadata WHERE ShowId = @ShowId) AS A",
|
||||||
|
new { collection.Name, collection.Key, ShowId = showId }); |
||||||
|
return showId; |
||||||
|
case PlexSeason season: |
||||||
|
int seasonId = await dbContext.Connection.ExecuteScalarAsync<int>( |
||||||
|
@"SELECT Id FROM PlexSeason WHERE `Key` = @Key", |
||||||
|
new { season.Key }); |
||||||
|
await dbContext.Connection.ExecuteAsync( |
||||||
|
@"INSERT INTO Tag (Name, ExternalCollectionId, SeasonMetadataId)
|
||||||
|
SELECT @Name, @Key, Id FROM |
||||||
|
(SELECT Id FROM SeasonMetadata WHERE SeasonId = @SeasonId) AS A",
|
||||||
|
new { collection.Name, collection.Key, SeasonId = seasonId }); |
||||||
|
return seasonId; |
||||||
|
case PlexEpisode episode: |
||||||
|
int episodeId = await dbContext.Connection.ExecuteScalarAsync<int>( |
||||||
|
@"SELECT Id FROM PlexEpisode WHERE `Key` = @Key", |
||||||
|
new { episode.Key }); |
||||||
|
await dbContext.Connection.ExecuteAsync( |
||||||
|
@"INSERT INTO Tag (Name, ExternalCollectionId, EpisodeMetadataId)
|
||||||
|
SELECT @Name, @Key, Id FROM |
||||||
|
(SELECT Id FROM EpisodeMetadata WHERE EpisodeId = @EpisodeId) AS A",
|
||||||
|
new { collection.Name, collection.Key, EpisodeId = episodeId }); |
||||||
|
return episodeId; |
||||||
|
default: |
||||||
|
return 0; |
||||||
|
} |
||||||
|
} |
||||||
|
|
||||||
|
public async Task<bool> SetEtag(PlexCollection collection) |
||||||
|
{ |
||||||
|
await using TvContext dbContext = await _dbContextFactory.CreateDbContextAsync(); |
||||||
|
return await dbContext.Connection.ExecuteAsync( |
||||||
|
@"UPDATE PlexCollection SET Etag = @Etag WHERE `Key` = @Key", |
||||||
|
new { collection.Etag, collection.Key }) > 0; |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,24 @@ |
|||||||
|
using System.Xml.Serialization; |
||||||
|
|
||||||
|
namespace ErsatzTV.Infrastructure.Plex.Models; |
||||||
|
|
||||||
|
public class PlexCollectionItemMetadataResponse |
||||||
|
{ |
||||||
|
[XmlAttribute("key")] |
||||||
|
public string Key { get; set; } |
||||||
|
|
||||||
|
[XmlAttribute("ratingKey")] |
||||||
|
public string RatingKey { get; set; } |
||||||
|
|
||||||
|
[XmlAttribute("title")] |
||||||
|
public string Title { get; set; } |
||||||
|
|
||||||
|
[XmlAttribute("addedAt")] |
||||||
|
public long AddedAt { get; set; } |
||||||
|
|
||||||
|
[XmlAttribute("updatedAt")] |
||||||
|
public long UpdatedAt { get; set; } |
||||||
|
|
||||||
|
[XmlAttribute("type")] |
||||||
|
public string Type { get; set; } |
||||||
|
} |
||||||
@ -0,0 +1,5 @@ |
|||||||
|
using ErsatzTV.Core; |
||||||
|
|
||||||
|
namespace ErsatzTV.Scanner.Application.Plex; |
||||||
|
|
||||||
|
public record SynchronizePlexCollections(int PlexMediaSourceId, bool ForceScan) : IRequest<Either<BaseError, Unit>>; |
||||||
@ -0,0 +1,117 @@ |
|||||||
|
using ErsatzTV.Core; |
||||||
|
using ErsatzTV.Core.Domain; |
||||||
|
using ErsatzTV.Core.Interfaces.Plex; |
||||||
|
using ErsatzTV.Core.Interfaces.Repositories; |
||||||
|
using ErsatzTV.Core.Plex; |
||||||
|
|
||||||
|
namespace ErsatzTV.Scanner.Application.Plex; |
||||||
|
|
||||||
|
public class SynchronizePlexCollectionsHandler : IRequestHandler<SynchronizePlexCollections, Either<BaseError, Unit>> |
||||||
|
{ |
||||||
|
private readonly IConfigElementRepository _configElementRepository; |
||||||
|
private readonly IPlexSecretStore _plexSecretStore; |
||||||
|
private readonly IMediaSourceRepository _mediaSourceRepository; |
||||||
|
private readonly IPlexCollectionScanner _scanner; |
||||||
|
|
||||||
|
public SynchronizePlexCollectionsHandler( |
||||||
|
IMediaSourceRepository mediaSourceRepository, |
||||||
|
IPlexSecretStore plexSecretStore, |
||||||
|
IPlexCollectionScanner scanner, |
||||||
|
IConfigElementRepository configElementRepository) |
||||||
|
{ |
||||||
|
_mediaSourceRepository = mediaSourceRepository; |
||||||
|
_plexSecretStore = plexSecretStore; |
||||||
|
_scanner = scanner; |
||||||
|
_configElementRepository = configElementRepository; |
||||||
|
} |
||||||
|
|
||||||
|
public async Task<Either<BaseError, Unit>> Handle( |
||||||
|
SynchronizePlexCollections request, |
||||||
|
CancellationToken cancellationToken) |
||||||
|
{ |
||||||
|
Validation<BaseError, RequestParameters> validation = await Validate(request); |
||||||
|
return await validation.Match( |
||||||
|
p => SynchronizeCollections(p, cancellationToken), |
||||||
|
error => Task.FromResult<Either<BaseError, Unit>>(error.Join())); |
||||||
|
} |
||||||
|
|
||||||
|
private async Task<Validation<BaseError, RequestParameters>> Validate(SynchronizePlexCollections request) |
||||||
|
{ |
||||||
|
Task<Validation<BaseError, ConnectionParameters>> mediaSource = MediaSourceMustExist(request) |
||||||
|
.BindT(MediaSourceMustHaveActiveConnection) |
||||||
|
.BindT(MediaSourceMustHaveToken); |
||||||
|
|
||||||
|
return (await mediaSource, await ValidateLibraryRefreshInterval()) |
||||||
|
.Apply( |
||||||
|
(connectionParameters, libraryRefreshInterval) => new RequestParameters( |
||||||
|
connectionParameters, |
||||||
|
connectionParameters.PlexMediaSource, |
||||||
|
request.ForceScan, |
||||||
|
libraryRefreshInterval)); |
||||||
|
} |
||||||
|
|
||||||
|
private Task<Validation<BaseError, int>> ValidateLibraryRefreshInterval() => |
||||||
|
_configElementRepository.GetValue<int>(ConfigElementKey.LibraryRefreshInterval) |
||||||
|
.FilterT(lri => lri is >= 0 and < 1_000_000) |
||||||
|
.Map(lri => lri.ToValidation<BaseError>("Library refresh interval is invalid")); |
||||||
|
|
||||||
|
private Task<Validation<BaseError, PlexMediaSource>> MediaSourceMustExist( |
||||||
|
SynchronizePlexCollections request) => |
||||||
|
_mediaSourceRepository.GetPlex(request.PlexMediaSourceId) |
||||||
|
.Map(o => o.ToValidation<BaseError>("Plex media source does not exist.")); |
||||||
|
|
||||||
|
private static Validation<BaseError, ConnectionParameters> MediaSourceMustHaveActiveConnection( |
||||||
|
PlexMediaSource plexMediaSource) |
||||||
|
{ |
||||||
|
Option<PlexConnection> maybeConnection = plexMediaSource.Connections.SingleOrDefault(c => c.IsActive); |
||||||
|
return maybeConnection.Map(connection => new ConnectionParameters(plexMediaSource, connection)) |
||||||
|
.ToValidation<BaseError>("Plex media source requires an active connection"); |
||||||
|
} |
||||||
|
|
||||||
|
private async Task<Validation<BaseError, ConnectionParameters>> MediaSourceMustHaveToken( |
||||||
|
ConnectionParameters connectionParameters) |
||||||
|
{ |
||||||
|
Option<PlexServerAuthToken> maybeToken = await |
||||||
|
_plexSecretStore.GetServerAuthToken(connectionParameters.PlexMediaSource.ClientIdentifier); |
||||||
|
return maybeToken.Map(token => connectionParameters with { PlexServerAuthToken = token }) |
||||||
|
.ToValidation<BaseError>("Plex media source requires a token"); |
||||||
|
} |
||||||
|
|
||||||
|
private async Task<Either<BaseError, Unit>> SynchronizeCollections( |
||||||
|
RequestParameters parameters, |
||||||
|
CancellationToken cancellationToken) |
||||||
|
{ |
||||||
|
var lastScan = new DateTimeOffset( |
||||||
|
parameters.MediaSource.LastCollectionsScan ?? SystemTime.MinValueUtc, |
||||||
|
TimeSpan.Zero); |
||||||
|
DateTimeOffset nextScan = lastScan + TimeSpan.FromHours(parameters.LibraryRefreshInterval); |
||||||
|
if (parameters.ForceScan || parameters.LibraryRefreshInterval > 0 && nextScan < DateTimeOffset.Now) |
||||||
|
{ |
||||||
|
Either<BaseError, Unit> result = await _scanner.ScanCollections( |
||||||
|
parameters.ConnectionParameters.ActiveConnection, |
||||||
|
parameters.ConnectionParameters.PlexServerAuthToken, |
||||||
|
cancellationToken); |
||||||
|
|
||||||
|
if (result.IsRight) |
||||||
|
{ |
||||||
|
parameters.MediaSource.LastCollectionsScan = DateTime.UtcNow; |
||||||
|
await _mediaSourceRepository.UpdateLastCollectionScan(parameters.MediaSource); |
||||||
|
} |
||||||
|
|
||||||
|
return result; |
||||||
|
} |
||||||
|
|
||||||
|
return Unit.Default; |
||||||
|
} |
||||||
|
|
||||||
|
private record RequestParameters( |
||||||
|
ConnectionParameters ConnectionParameters, |
||||||
|
PlexMediaSource MediaSource, |
||||||
|
bool ForceScan, |
||||||
|
int LibraryRefreshInterval); |
||||||
|
|
||||||
|
private record ConnectionParameters(PlexMediaSource PlexMediaSource, PlexConnection ActiveConnection) |
||||||
|
{ |
||||||
|
public PlexServerAuthToken? PlexServerAuthToken { get; set; } |
||||||
|
} |
||||||
|
} |
||||||
@ -0,0 +1,125 @@ |
|||||||
|
using ErsatzTV.Core; |
||||||
|
using ErsatzTV.Core.Domain; |
||||||
|
using ErsatzTV.Core.Interfaces.Plex; |
||||||
|
using ErsatzTV.Core.Interfaces.Repositories; |
||||||
|
using ErsatzTV.Core.MediaSources; |
||||||
|
using ErsatzTV.Core.Plex; |
||||||
|
using Microsoft.Extensions.Logging; |
||||||
|
|
||||||
|
namespace ErsatzTV.Scanner.Core.Plex; |
||||||
|
|
||||||
|
public class PlexCollectionScanner : IPlexCollectionScanner |
||||||
|
{ |
||||||
|
private readonly IPlexServerApiClient _plexServerApiClient; |
||||||
|
private readonly IPlexCollectionRepository _plexCollectionRepository; |
||||||
|
private readonly ILogger<PlexCollectionScanner> _logger; |
||||||
|
private readonly IMediator _mediator; |
||||||
|
|
||||||
|
public PlexCollectionScanner( |
||||||
|
IMediator mediator, |
||||||
|
IPlexCollectionRepository plexCollectionRepository, |
||||||
|
IPlexServerApiClient plexServerApiClient, |
||||||
|
ILogger<PlexCollectionScanner> logger) |
||||||
|
{ |
||||||
|
_mediator = mediator; |
||||||
|
_plexCollectionRepository = plexCollectionRepository; |
||||||
|
_plexServerApiClient = plexServerApiClient; |
||||||
|
_logger = logger; |
||||||
|
} |
||||||
|
|
||||||
|
public async Task<Either<BaseError, Unit>> ScanCollections( |
||||||
|
PlexConnection connection, |
||||||
|
PlexServerAuthToken token, |
||||||
|
CancellationToken cancellationToken) |
||||||
|
{ |
||||||
|
try |
||||||
|
{ |
||||||
|
var incomingKeys = new List<string>(); |
||||||
|
|
||||||
|
// get all collections from db (key, etag)
|
||||||
|
List<PlexCollection> existingCollections = await _plexCollectionRepository.GetCollections(); |
||||||
|
|
||||||
|
await foreach (PlexCollection collection in _plexServerApiClient.GetAllCollections( |
||||||
|
connection, |
||||||
|
token, |
||||||
|
cancellationToken)) |
||||||
|
{ |
||||||
|
incomingKeys.Add(collection.Key); |
||||||
|
|
||||||
|
Option<PlexCollection> maybeExisting = existingCollections.Find(c => c.Key == collection.Key); |
||||||
|
|
||||||
|
// skip if unchanged (etag)
|
||||||
|
if (await maybeExisting.Map(e => e.Etag ?? string.Empty).IfNoneAsync(string.Empty) == |
||||||
|
collection.Etag) |
||||||
|
{ |
||||||
|
_logger.LogDebug("Plex collection {Name} is unchanged", collection.Name); |
||||||
|
continue; |
||||||
|
} |
||||||
|
|
||||||
|
// add if new
|
||||||
|
if (maybeExisting.IsNone) |
||||||
|
{ |
||||||
|
_logger.LogDebug("Plex collection {Name} is new", collection.Name); |
||||||
|
await _plexCollectionRepository.AddCollection(collection); |
||||||
|
} |
||||||
|
|
||||||
|
await SyncCollectionItems(connection, token, collection, cancellationToken); |
||||||
|
|
||||||
|
// save collection etag
|
||||||
|
await _plexCollectionRepository.SetEtag(collection); |
||||||
|
} |
||||||
|
|
||||||
|
// remove missing collections (and remove any lingering tags from those collections)
|
||||||
|
foreach (PlexCollection collection in existingCollections.Filter(e => !incomingKeys.Contains(e.Key))) |
||||||
|
{ |
||||||
|
await _plexCollectionRepository.RemoveCollection(collection); |
||||||
|
} |
||||||
|
} |
||||||
|
catch (Exception ex) |
||||||
|
{ |
||||||
|
_logger.LogWarning(ex, "Failed to get collections from Plex"); |
||||||
|
return BaseError.New(ex.Message); |
||||||
|
} |
||||||
|
|
||||||
|
return Unit.Default; |
||||||
|
} |
||||||
|
|
||||||
|
private async Task SyncCollectionItems( |
||||||
|
PlexConnection connection, |
||||||
|
PlexServerAuthToken token, |
||||||
|
PlexCollection collection, |
||||||
|
CancellationToken cancellationToken) |
||||||
|
{ |
||||||
|
try |
||||||
|
{ |
||||||
|
// get collection items from Plex
|
||||||
|
IAsyncEnumerable<MediaItem> items = _plexServerApiClient.GetCollectionItems( |
||||||
|
connection, |
||||||
|
token, |
||||||
|
collection.Key, |
||||||
|
cancellationToken); |
||||||
|
|
||||||
|
List<int> removedIds = await _plexCollectionRepository.RemoveAllTags(collection); |
||||||
|
|
||||||
|
// sync tags on items
|
||||||
|
var addedIds = new List<int>(); |
||||||
|
await foreach (MediaItem item in items) |
||||||
|
{ |
||||||
|
addedIds.Add(await _plexCollectionRepository.AddTag(item, collection)); |
||||||
|
cancellationToken.ThrowIfCancellationRequested(); |
||||||
|
} |
||||||
|
|
||||||
|
_logger.LogDebug("Plex collection {Name} contains {Count} items", collection.Name, addedIds.Count); |
||||||
|
|
||||||
|
int[] changedIds = removedIds.Concat(addedIds).Distinct().ToArray(); |
||||||
|
|
||||||
|
await _mediator.Publish( |
||||||
|
new ScannerProgressUpdate(0, null, null, changedIds.ToArray(), Array.Empty<int>()), |
||||||
|
CancellationToken.None); |
||||||
|
} |
||||||
|
catch (Exception ex) |
||||||
|
{ |
||||||
|
_logger.LogWarning(ex, "Failed to synchronize Plex collection {Name}", collection.Name); |
||||||
|
} |
||||||
|
} |
||||||
|
} |
||||||
Loading…
Reference in new issue