From 234e93349b71363457f079114506d74bd7b63341 Mon Sep 17 00:00:00 2001 From: Jason Dove Date: Wed, 8 Mar 2023 21:23:18 -0600 Subject: [PATCH] rework concurrency (#1199) --- CHANGELOG.md | 4 + .../Emby/Commands/SynchronizeEmbyLibraries.cs | 2 +- .../Commands/SynchronizeEmbyLibraryById.cs | 2 +- .../SynchronizeEmbyMediaSourcesHandler.cs | 8 +- .../IScannerBackgroundServiceRequest.cs | 5 + .../SynchronizeJellyfinAdminUserId.cs | 2 +- .../Commands/SynchronizeJellyfinLibraries.cs | 2 +- .../SynchronizeJellyfinLibraryById.cs | 2 +- .../SynchronizeJellyfinMediaSourcesHandler.cs | 10 +- .../Commands/CreateLocalLibraryHandler.cs | 8 +- .../Commands/UpdateLocalLibraryHandler.cs | 8 +- .../MediaSources/Commands/ScanLocalLibrary.cs | 2 +- .../Commands/SynchronizePlexLibraryById.cs | 2 +- .../Search/Commands/RebuildSearchIndex.cs | 2 +- ErsatzTV/Pages/EmbyLibrariesEditor.razor | 6 +- ErsatzTV/Pages/EmbyMediaSources.razor | 4 +- ErsatzTV/Pages/JellyfinLibrariesEditor.razor | 6 +- ErsatzTV/Pages/JellyfinMediaSources.razor | 4 +- ErsatzTV/Pages/Libraries.razor | 17 +- ErsatzTV/Pages/LocalLibraryPathEditor.razor | 4 +- ErsatzTV/Pages/PlexLibrariesEditor.razor | 5 +- ErsatzTV/Services/EmbyService.cs | 37 --- ErsatzTV/Services/JellyfinService.cs | 79 ------ ErsatzTV/Services/PlexService.cs | 39 --- ErsatzTV/Services/ScannerService.cs | 259 ++++++++++++++++++ ErsatzTV/Services/SchedulerService.cs | 22 +- ErsatzTV/Services/WorkerService.cs | 39 --- ErsatzTV/Startup.cs | 2 + 28 files changed, 323 insertions(+), 259 deletions(-) create mode 100644 ErsatzTV.Application/IScannerBackgroundServiceRequest.cs create mode 100644 ErsatzTV/Services/ScannerService.cs diff --git a/CHANGELOG.md b/CHANGELOG.md index 9f51496d..e7109a9e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,6 +15,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). - For example, the scanner will now find `movie.MKV` as well as `movie.mkv` on case-sensitive filesystems - Include multiple `display-name` entries in generated XMLTV - Plex should now display the channel number instead of the channel id (e.g. `1.2` instead of `1.2.etv`) +- Rework concurrency a bit + - Playouts builds are no longer blocked by library scans + - Adding Trakt lists is no longer blocked by library scans + - All library scans (local and media servers) run sequentially ## [0.7.5-beta] - 2023-03-05 ### Added diff --git a/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraries.cs b/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraries.cs index 8c92328f..4275e922 100644 --- a/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraries.cs +++ b/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraries.cs @@ -3,4 +3,4 @@ namespace ErsatzTV.Application.Emby; public record SynchronizeEmbyLibraries(int EmbyMediaSourceId) : IRequest>, - IEmbyBackgroundServiceRequest; + IScannerBackgroundServiceRequest; diff --git a/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraryById.cs b/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraryById.cs index 913738ab..232af64d 100644 --- a/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraryById.cs +++ b/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraryById.cs @@ -2,7 +2,7 @@ namespace ErsatzTV.Application.Emby; -public interface ISynchronizeEmbyLibraryById : IRequest>, IEmbyBackgroundServiceRequest +public interface ISynchronizeEmbyLibraryById : IRequest>, IScannerBackgroundServiceRequest { int EmbyLibraryId { get; } bool ForceScan { get; } diff --git a/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyMediaSourcesHandler.cs b/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyMediaSourcesHandler.cs index ad1deac0..9793ecf6 100644 --- a/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyMediaSourcesHandler.cs +++ b/ErsatzTV.Application/Emby/Commands/SynchronizeEmbyMediaSourcesHandler.cs @@ -8,15 +8,15 @@ namespace ErsatzTV.Application.Emby; public class SynchronizeEmbyMediaSourcesHandler : IRequestHandler>> { - private readonly ChannelWriter _channel; + private readonly ChannelWriter _scannerWorkerChannel; private readonly IMediaSourceRepository _mediaSourceRepository; public SynchronizeEmbyMediaSourcesHandler( IMediaSourceRepository mediaSourceRepository, - ChannelWriter channel) + ChannelWriter scannerWorkerChannel) { _mediaSourceRepository = mediaSourceRepository; - _channel = channel; + _scannerWorkerChannel = scannerWorkerChannel; } public async Task>> Handle( @@ -27,7 +27,7 @@ public class SynchronizeEmbyMediaSourcesHandler : IRequestHandler>, - IJellyfinBackgroundServiceRequest; + IScannerBackgroundServiceRequest; diff --git a/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraries.cs b/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraries.cs index 01459ed6..4bfc8106 100644 --- a/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraries.cs +++ b/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraries.cs @@ -3,4 +3,4 @@ namespace ErsatzTV.Application.Jellyfin; public record SynchronizeJellyfinLibraries(int JellyfinMediaSourceId) : IRequest>, - IJellyfinBackgroundServiceRequest; + IScannerBackgroundServiceRequest; diff --git a/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraryById.cs b/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraryById.cs index 188585f5..a7f80886 100644 --- a/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraryById.cs +++ b/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraryById.cs @@ -3,7 +3,7 @@ namespace ErsatzTV.Application.Jellyfin; public interface ISynchronizeJellyfinLibraryById : IRequest>, - IJellyfinBackgroundServiceRequest + IScannerBackgroundServiceRequest { int JellyfinLibraryId { get; } bool ForceScan { get; } diff --git a/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinMediaSourcesHandler.cs b/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinMediaSourcesHandler.cs index 4c1845bf..67eec5d4 100644 --- a/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinMediaSourcesHandler.cs +++ b/ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinMediaSourcesHandler.cs @@ -8,15 +8,15 @@ namespace ErsatzTV.Application.Jellyfin; public class SynchronizeJellyfinMediaSourcesHandler : IRequestHandler>> { - private readonly ChannelWriter _channel; + private readonly ChannelWriter _scannerWorkerChannel; private readonly IMediaSourceRepository _mediaSourceRepository; public SynchronizeJellyfinMediaSourcesHandler( IMediaSourceRepository mediaSourceRepository, - ChannelWriter channel) + ChannelWriter scannerWorkerChannel) { _mediaSourceRepository = mediaSourceRepository; - _channel = channel; + _scannerWorkerChannel = scannerWorkerChannel; } public async Task>> Handle( @@ -26,8 +26,8 @@ public class SynchronizeJellyfinMediaSourcesHandler : IRequestHandler mediaSources = await _mediaSourceRepository.GetAllJellyfin(); foreach (JellyfinMediaSource mediaSource in mediaSources) { - await _channel.WriteAsync(new SynchronizeJellyfinAdminUserId(mediaSource.Id), cancellationToken); - await _channel.WriteAsync(new SynchronizeJellyfinLibraries(mediaSource.Id), cancellationToken); + await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinAdminUserId(mediaSource.Id), cancellationToken); + await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(mediaSource.Id), cancellationToken); } return mediaSources; diff --git a/ErsatzTV.Application/Libraries/Commands/CreateLocalLibraryHandler.cs b/ErsatzTV.Application/Libraries/Commands/CreateLocalLibraryHandler.cs index d5481a87..fbaca344 100644 --- a/ErsatzTV.Application/Libraries/Commands/CreateLocalLibraryHandler.cs +++ b/ErsatzTV.Application/Libraries/Commands/CreateLocalLibraryHandler.cs @@ -14,14 +14,14 @@ public class CreateLocalLibraryHandler : LocalLibraryHandlerBase, { private readonly IDbContextFactory _dbContextFactory; private readonly IEntityLocker _entityLocker; - private readonly ChannelWriter _workerChannel; + private readonly ChannelWriter _scannerWorkerChannel; public CreateLocalLibraryHandler( - ChannelWriter workerChannel, + ChannelWriter scannerWorkerChannel, IEntityLocker entityLocker, IDbContextFactory dbContextFactory) { - _workerChannel = workerChannel; + _scannerWorkerChannel = scannerWorkerChannel; _entityLocker = entityLocker; _dbContextFactory = dbContextFactory; } @@ -44,7 +44,7 @@ public class CreateLocalLibraryHandler : LocalLibraryHandlerBase, if (_entityLocker.LockLibrary(localLibrary.Id)) { - await _workerChannel.WriteAsync(new ForceScanLocalLibrary(localLibrary.Id)); + await _scannerWorkerChannel.WriteAsync(new ForceScanLocalLibrary(localLibrary.Id)); } return ProjectToViewModel(localLibrary); diff --git a/ErsatzTV.Application/Libraries/Commands/UpdateLocalLibraryHandler.cs b/ErsatzTV.Application/Libraries/Commands/UpdateLocalLibraryHandler.cs index 30d524e6..abffb513 100644 --- a/ErsatzTV.Application/Libraries/Commands/UpdateLocalLibraryHandler.cs +++ b/ErsatzTV.Application/Libraries/Commands/UpdateLocalLibraryHandler.cs @@ -17,15 +17,15 @@ public class UpdateLocalLibraryHandler : LocalLibraryHandlerBase, private readonly IDbContextFactory _dbContextFactory; private readonly IEntityLocker _entityLocker; private readonly ISearchIndex _searchIndex; - private readonly ChannelWriter _workerChannel; + private readonly ChannelWriter _scannerWorkerChannel; public UpdateLocalLibraryHandler( - ChannelWriter workerChannel, + ChannelWriter scannerWorkerChannel, IEntityLocker entityLocker, ISearchIndex searchIndex, IDbContextFactory dbContextFactory) { - _workerChannel = workerChannel; + _scannerWorkerChannel = scannerWorkerChannel; _entityLocker = entityLocker; _searchIndex = searchIndex; _dbContextFactory = dbContextFactory; @@ -70,7 +70,7 @@ public class UpdateLocalLibraryHandler : LocalLibraryHandlerBase, if ((toAdd.Count > 0 || toRemove.Count > 0) && _entityLocker.LockLibrary(existing.Id)) { - await _workerChannel.WriteAsync(new ForceScanLocalLibrary(existing.Id)); + await _scannerWorkerChannel.WriteAsync(new ForceScanLocalLibrary(existing.Id)); } return ProjectToViewModel(existing); diff --git a/ErsatzTV.Application/MediaSources/Commands/ScanLocalLibrary.cs b/ErsatzTV.Application/MediaSources/Commands/ScanLocalLibrary.cs index 260da1ce..55d44753 100644 --- a/ErsatzTV.Application/MediaSources/Commands/ScanLocalLibrary.cs +++ b/ErsatzTV.Application/MediaSources/Commands/ScanLocalLibrary.cs @@ -2,7 +2,7 @@ namespace ErsatzTV.Application.MediaSources; -public interface IScanLocalLibrary : IRequest>, IBackgroundServiceRequest +public interface IScanLocalLibrary : IRequest>, IScannerBackgroundServiceRequest { int LibraryId { get; } bool ForceScan { get; } diff --git a/ErsatzTV.Application/Plex/Commands/SynchronizePlexLibraryById.cs b/ErsatzTV.Application/Plex/Commands/SynchronizePlexLibraryById.cs index f59034e2..c9b704f3 100644 --- a/ErsatzTV.Application/Plex/Commands/SynchronizePlexLibraryById.cs +++ b/ErsatzTV.Application/Plex/Commands/SynchronizePlexLibraryById.cs @@ -2,7 +2,7 @@ namespace ErsatzTV.Application.Plex; -public interface ISynchronizePlexLibraryById : IRequest>, IPlexBackgroundServiceRequest +public interface ISynchronizePlexLibraryById : IRequest>, IScannerBackgroundServiceRequest { int PlexLibraryId { get; } bool ForceScan { get; } diff --git a/ErsatzTV.Application/Search/Commands/RebuildSearchIndex.cs b/ErsatzTV.Application/Search/Commands/RebuildSearchIndex.cs index dc709272..2285e207 100644 --- a/ErsatzTV.Application/Search/Commands/RebuildSearchIndex.cs +++ b/ErsatzTV.Application/Search/Commands/RebuildSearchIndex.cs @@ -1,3 +1,3 @@ namespace ErsatzTV.Application.Search; -public record RebuildSearchIndex : IRequest, IBackgroundServiceRequest; +public record RebuildSearchIndex : IRequest, IScannerBackgroundServiceRequest; diff --git a/ErsatzTV/Pages/EmbyLibrariesEditor.razor b/ErsatzTV/Pages/EmbyLibrariesEditor.razor index bd8c0419..37806a49 100644 --- a/ErsatzTV/Pages/EmbyLibrariesEditor.razor +++ b/ErsatzTV/Pages/EmbyLibrariesEditor.razor @@ -3,7 +3,7 @@ @using ErsatzTV.Application.MediaSources @implements IDisposable @inject IMediator _mediator -@inject ChannelWriter _channel +@inject ChannelWriter _scannerWorkerChannel SynchronizeLibraryByIdIfNeeded(RemoteMediaSourceLibrariesEditor.SynchronizeParameters parameters) { - await _channel.WriteAsync(new SynchronizeEmbyLibraries(parameters.MediaSourceId), _cts.Token); - await _channel.WriteAsync(new SynchronizeEmbyLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(parameters.MediaSourceId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token); return Unit.Default; } diff --git a/ErsatzTV/Pages/EmbyMediaSources.razor b/ErsatzTV/Pages/EmbyMediaSources.razor index 4f9c4590..50dddfcb 100644 --- a/ErsatzTV/Pages/EmbyMediaSources.razor +++ b/ErsatzTV/Pages/EmbyMediaSources.razor @@ -4,7 +4,7 @@ @using ErsatzTV.Core.Emby @implements IDisposable @inject IEmbySecretStore _embySecretStore -@inject ChannelWriter _channel +@inject ChannelWriter _scannerWorkerChannel - await _channel.WriteAsync(new SynchronizeEmbyLibraries(mediaSourceId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(mediaSourceId), _cts.Token); } \ No newline at end of file diff --git a/ErsatzTV/Pages/JellyfinLibrariesEditor.razor b/ErsatzTV/Pages/JellyfinLibrariesEditor.razor index bd4ce234..6da9f657 100644 --- a/ErsatzTV/Pages/JellyfinLibrariesEditor.razor +++ b/ErsatzTV/Pages/JellyfinLibrariesEditor.razor @@ -3,7 +3,7 @@ @using ErsatzTV.Application.MediaSources @implements IDisposable @inject IMediator _mediator -@inject ChannelWriter _channel +@inject ChannelWriter _scannerWorkerChannel SynchronizeLibraryByIdIfNeeded(RemoteMediaSourceLibrariesEditor.SynchronizeParameters parameters) { - await _channel.WriteAsync(new SynchronizeJellyfinLibraries(parameters.MediaSourceId), _cts.Token); - await _channel.WriteAsync(new SynchronizeJellyfinLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(parameters.MediaSourceId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token); return Unit.Default; } diff --git a/ErsatzTV/Pages/JellyfinMediaSources.razor b/ErsatzTV/Pages/JellyfinMediaSources.razor index 36d71079..11b63c86 100644 --- a/ErsatzTV/Pages/JellyfinMediaSources.razor +++ b/ErsatzTV/Pages/JellyfinMediaSources.razor @@ -4,7 +4,7 @@ @using ErsatzTV.Core.Jellyfin @implements IDisposable @inject IJellyfinSecretStore _jellyfinSecretStore -@inject ChannelWriter _channel +@inject ChannelWriter _scannerWorkerChannel - await _channel.WriteAsync(new SynchronizeJellyfinLibraries(mediaSourceId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(mediaSourceId), _cts.Token); } \ No newline at end of file diff --git a/ErsatzTV/Pages/Libraries.razor b/ErsatzTV/Pages/Libraries.razor index c7add581..3c9c1ab1 100644 --- a/ErsatzTV/Pages/Libraries.razor +++ b/ErsatzTV/Pages/Libraries.razor @@ -10,10 +10,7 @@ @implements IDisposable @inject IMediator _mediator @inject IEntityLocker _locker -@inject ChannelWriter _workerChannel -@inject ChannelWriter _plexWorkerChannel -@inject ChannelWriter _jellyfinWorkerChannel -@inject ChannelWriter _embyWorkerChannel +@inject ChannelWriter _scannerWorkerChannel; @inject ICourier _courier @@ -113,18 +110,18 @@ switch (library) { case LocalLibraryViewModel: - await _workerChannel.WriteAsync(new ForceScanLocalLibrary(library.Id), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new ForceScanLocalLibrary(library.Id), _cts.Token); break; case PlexLibraryViewModel: - await _plexWorkerChannel.WriteAsync(new ForceSynchronizePlexLibraryById(library.Id, deepScan), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new ForceSynchronizePlexLibraryById(library.Id, deepScan), _cts.Token); break; case JellyfinLibraryViewModel: - await _jellyfinWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(library.MediaSourceId), _cts.Token); - await _jellyfinWorkerChannel.WriteAsync(new ForceSynchronizeJellyfinLibraryById(library.Id, deepScan), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(library.MediaSourceId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new ForceSynchronizeJellyfinLibraryById(library.Id, deepScan), _cts.Token); break; case EmbyLibraryViewModel: - await _embyWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(library.MediaSourceId), _cts.Token); - await _embyWorkerChannel.WriteAsync(new ForceSynchronizeEmbyLibraryById(library.Id, deepScan), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(library.MediaSourceId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new ForceSynchronizeEmbyLibraryById(library.Id, deepScan), _cts.Token); break; } diff --git a/ErsatzTV/Pages/LocalLibraryPathEditor.razor b/ErsatzTV/Pages/LocalLibraryPathEditor.razor index fe12a5cd..da56b883 100644 --- a/ErsatzTV/Pages/LocalLibraryPathEditor.razor +++ b/ErsatzTV/Pages/LocalLibraryPathEditor.razor @@ -7,7 +7,7 @@ @inject ISnackbar _snackbar @inject IMediator _mediator @inject IEntityLocker _locker -@inject ChannelWriter _channel +@inject ChannelWriter _scannerWorkerChannel; @_library.Name - Add Local Library Path @@ -80,7 +80,7 @@ { if (_locker.LockLibrary(_library.Id)) { - await _channel.WriteAsync(new ScanLocalLibraryIfNeeded(_library.Id), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new ScanLocalLibraryIfNeeded(_library.Id), _cts.Token); _navigationManager.NavigateTo("media/libraries"); } }); diff --git a/ErsatzTV/Pages/PlexLibrariesEditor.razor b/ErsatzTV/Pages/PlexLibrariesEditor.razor index 6275de47..494e2850 100644 --- a/ErsatzTV/Pages/PlexLibrariesEditor.razor +++ b/ErsatzTV/Pages/PlexLibrariesEditor.razor @@ -3,7 +3,7 @@ @using ErsatzTV.Application.MediaSources @implements IDisposable @inject IMediator _mediator -@inject ChannelWriter _channel +@inject ChannelWriter _scannerWorkerChannel SynchronizeLibraryByIdIfNeeded(RemoteMediaSourceLibrariesEditor.SynchronizeParameters parameters) { - await _channel.WriteAsync(new SynchronizePlexLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token); + await _scannerWorkerChannel.WriteAsync(new SynchronizePlexLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token); return Unit.Default; } - } \ No newline at end of file diff --git a/ErsatzTV/Services/EmbyService.cs b/ErsatzTV/Services/EmbyService.cs index 7d0dc10a..a32b4b88 100644 --- a/ErsatzTV/Services/EmbyService.cs +++ b/ErsatzTV/Services/EmbyService.cs @@ -4,8 +4,6 @@ using ErsatzTV.Application; using ErsatzTV.Application.Emby; using ErsatzTV.Core; using ErsatzTV.Core.Domain; -using ErsatzTV.Core.Errors; -using ErsatzTV.Core.Interfaces.Locking; using MediatR; namespace ErsatzTV.Services; @@ -55,9 +53,6 @@ public class EmbyService : BackgroundService case SynchronizeEmbyLibraries synchronizeEmbyLibraries: requestTask = SynchronizeLibraries(synchronizeEmbyLibraries, cancellationToken); break; - case ISynchronizeEmbyLibraryById synchronizeEmbyLibraryById: - requestTask = SynchronizeEmbyLibrary(synchronizeEmbyLibraryById, cancellationToken); - break; default: throw new NotSupportedException($"Unsupported request type: {request.GetType().Name}"); } @@ -126,36 +121,4 @@ public class EmbyService : BackgroundService request.EmbyMediaSourceId, error.Value)); } - - private async Task SynchronizeEmbyLibrary(ISynchronizeEmbyLibraryById request, CancellationToken cancellationToken) - { - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - IMediator mediator = scope.ServiceProvider.GetRequiredService(); - IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService(); - - Either result = await mediator.Send(request, cancellationToken); - result.BiIter( - name => _logger.LogDebug("Done synchronizing emby library {Name}", name), - error => - { - if (error is ScanIsNotRequired) - { - _logger.LogDebug( - "Scan is not required for emby library {LibraryId} at this time", - request.EmbyLibraryId); - } - else - { - _logger.LogWarning( - "Unable to synchronize emby library {LibraryId}: {Error}", - request.EmbyLibraryId, - error.Value); - } - }); - - if (entityLocker.IsLibraryLocked(request.EmbyLibraryId)) - { - entityLocker.UnlockLibrary(request.EmbyLibraryId); - } - } } diff --git a/ErsatzTV/Services/JellyfinService.cs b/ErsatzTV/Services/JellyfinService.cs index 988213b7..310f0157 100644 --- a/ErsatzTV/Services/JellyfinService.cs +++ b/ErsatzTV/Services/JellyfinService.cs @@ -4,8 +4,6 @@ using ErsatzTV.Application; using ErsatzTV.Application.Jellyfin; using ErsatzTV.Core; using ErsatzTV.Core.Domain; -using ErsatzTV.Core.Errors; -using ErsatzTV.Core.Interfaces.Locking; using MediatR; namespace ErsatzTV.Services; @@ -52,15 +50,6 @@ public class JellyfinService : BackgroundService case SynchronizeJellyfinMediaSources synchronizeJellyfinMediaSources: requestTask = SynchronizeSources(synchronizeJellyfinMediaSources, cancellationToken); break; - case SynchronizeJellyfinAdminUserId synchronizeJellyfinAdminUserId: - requestTask = SynchronizeAdminUserId(synchronizeJellyfinAdminUserId, cancellationToken); - break; - case SynchronizeJellyfinLibraries synchronizeJellyfinLibraries: - requestTask = SynchronizeLibraries(synchronizeJellyfinLibraries, cancellationToken); - break; - case ISynchronizeJellyfinLibraryById synchronizeJellyfinLibraryById: - requestTask = SynchronizeJellyfinLibrary(synchronizeJellyfinLibraryById, cancellationToken); - break; default: throw new NotSupportedException($"Unsupported request type: {request.GetType().Name}"); } @@ -115,72 +104,4 @@ public class JellyfinService : BackgroundService error.Value); }); } - - private async Task SynchronizeLibraries(SynchronizeJellyfinLibraries request, CancellationToken cancellationToken) - { - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - IMediator mediator = scope.ServiceProvider.GetRequiredService(); - - Either result = await mediator.Send(request, cancellationToken); - result.BiIter( - _ => _logger.LogInformation( - "Successfully synchronized Jellyfin libraries for source {MediaSourceId}", - request.JellyfinMediaSourceId), - error => _logger.LogWarning( - "Unable to synchronize Jellyfin libraries for source {MediaSourceId}: {Error}", - request.JellyfinMediaSourceId, - error.Value)); - } - - private async Task SynchronizeAdminUserId( - SynchronizeJellyfinAdminUserId request, - CancellationToken cancellationToken) - { - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - IMediator mediator = scope.ServiceProvider.GetRequiredService(); - - Either result = await mediator.Send(request, cancellationToken); - result.BiIter( - _ => _logger.LogInformation( - "Successfully synchronized Jellyfin admin user id for source {MediaSourceId}", - request.JellyfinMediaSourceId), - error => _logger.LogWarning( - "Unable to synchronize Jellyfin admin user id for source {MediaSourceId}: {Error}", - request.JellyfinMediaSourceId, - error.Value)); - } - - private async Task SynchronizeJellyfinLibrary( - ISynchronizeJellyfinLibraryById request, - CancellationToken cancellationToken) - { - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - IMediator mediator = scope.ServiceProvider.GetRequiredService(); - IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService(); - - Either result = await mediator.Send(request, cancellationToken); - result.BiIter( - name => _logger.LogDebug("Done synchronizing jellyfin library {Name}", name), - error => - { - if (error is ScanIsNotRequired) - { - _logger.LogDebug( - "Scan is not required for jellyfin library {LibraryId} at this time", - request.JellyfinLibraryId); - } - else - { - _logger.LogWarning( - "Unable to synchronize jellyfin library {LibraryId}: {Error}", - request.JellyfinLibraryId, - error.Value); - } - }); - - if (entityLocker.IsLibraryLocked(request.JellyfinLibraryId)) - { - entityLocker.UnlockLibrary(request.JellyfinLibraryId); - } - } } diff --git a/ErsatzTV/Services/PlexService.cs b/ErsatzTV/Services/PlexService.cs index 6c596228..0e00adc0 100644 --- a/ErsatzTV/Services/PlexService.cs +++ b/ErsatzTV/Services/PlexService.cs @@ -4,8 +4,6 @@ using ErsatzTV.Application; using ErsatzTV.Application.Plex; using ErsatzTV.Core; using ErsatzTV.Core.Domain; -using ErsatzTV.Core.Errors; -using ErsatzTV.Core.Interfaces.Locking; using MediatR; namespace ErsatzTV.Services; @@ -58,9 +56,6 @@ public class PlexService : BackgroundService case SynchronizePlexLibraries synchronizePlexLibrariesRequest: requestTask = SynchronizeLibraries(synchronizePlexLibrariesRequest, cancellationToken); break; - case ISynchronizePlexLibraryById synchronizePlexLibraryById: - requestTask = SynchronizePlexLibrary(synchronizePlexLibraryById, cancellationToken); - break; default: throw new NotSupportedException($"Unsupported request type: {request.GetType().Name}"); } @@ -148,38 +143,4 @@ public class PlexService : BackgroundService request.PlexMediaSourceId, error.Value)); } - - private async Task SynchronizePlexLibrary( - ISynchronizePlexLibraryById request, - CancellationToken cancellationToken) - { - using IServiceScope scope = _serviceScopeFactory.CreateScope(); - IMediator mediator = scope.ServiceProvider.GetRequiredService(); - IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService(); - - Either result = await mediator.Send(request, cancellationToken); - result.BiIter( - name => _logger.LogDebug("Done synchronizing plex library {Name}", name), - error => - { - if (error is ScanIsNotRequired) - { - _logger.LogDebug( - "Scan is not required for plex library {LibraryId} at this time", - request.PlexLibraryId); - } - else - { - _logger.LogWarning( - "Unable to synchronize plex library {LibraryId}: {Error}", - request.PlexLibraryId, - error.Value); - } - }); - - if (entityLocker.IsLibraryLocked(request.PlexLibraryId)) - { - entityLocker.UnlockLibrary(request.PlexLibraryId); - } - } } diff --git a/ErsatzTV/Services/ScannerService.cs b/ErsatzTV/Services/ScannerService.cs new file mode 100644 index 00000000..0042f4d5 --- /dev/null +++ b/ErsatzTV/Services/ScannerService.cs @@ -0,0 +1,259 @@ +using System.Threading.Channels; +using Bugsnag; +using ErsatzTV.Application; +using ErsatzTV.Application.Emby; +using ErsatzTV.Application.Jellyfin; +using ErsatzTV.Application.MediaSources; +using ErsatzTV.Application.Plex; +using ErsatzTV.Core; +using ErsatzTV.Core.Errors; +using ErsatzTV.Core.Interfaces.Locking; +using MediatR; + +namespace ErsatzTV.Services; + +public class ScannerService : BackgroundService +{ + private readonly ChannelReader _channel; + private readonly IServiceScopeFactory _serviceScopeFactory; + private readonly ILogger _logger; + + public ScannerService( + ChannelReader channel, + IServiceScopeFactory serviceScopeFactory, + ILogger logger) + { + _channel = channel; + _serviceScopeFactory = serviceScopeFactory; + _logger = logger; + } + + protected override async Task ExecuteAsync(CancellationToken cancellationToken) + { + try + { + _logger.LogInformation("Scanner service started"); + + await foreach (IScannerBackgroundServiceRequest request in _channel.ReadAllAsync(cancellationToken)) + { + try + { + Task requestTask; + switch (request) + { + case ISynchronizePlexLibraryById synchronizePlexLibraryById: + requestTask = SynchronizePlexLibrary(synchronizePlexLibraryById, cancellationToken); + break; + case SynchronizeJellyfinAdminUserId synchronizeJellyfinAdminUserId: + requestTask = SynchronizeAdminUserId(synchronizeJellyfinAdminUserId, cancellationToken); + break; + case SynchronizeJellyfinLibraries synchronizeJellyfinLibraries: + requestTask = SynchronizeLibraries(synchronizeJellyfinLibraries, cancellationToken); + break; + case ISynchronizeJellyfinLibraryById synchronizeJellyfinLibraryById: + requestTask = SynchronizeJellyfinLibrary(synchronizeJellyfinLibraryById, cancellationToken); + break; + case ISynchronizeEmbyLibraryById synchronizeEmbyLibraryById: + requestTask = SynchronizeEmbyLibrary(synchronizeEmbyLibraryById, cancellationToken); + break; + case IScanLocalLibrary scanLocalLibrary: + requestTask = SynchronizeLocalLibrary(scanLocalLibrary, cancellationToken); + break; + default: + throw new NotSupportedException($"Unsupported request type: {request.GetType().Name}"); + } + + await requestTask; + } + catch (Exception ex) + { + _logger.LogWarning(ex, "Failed to process scanner background service request"); + + try + { + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + IClient client = scope.ServiceProvider.GetRequiredService(); + client.Notify(ex); + } + catch (Exception) + { + // do nothing + } + } + } + } + catch (Exception ex) when (ex is TaskCanceledException or OperationCanceledException) + { + _logger.LogInformation("Plex service shutting down"); + } + } + + private async Task SynchronizeLocalLibrary(IScanLocalLibrary request, CancellationToken cancellationToken) + { + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + IMediator mediator = scope.ServiceProvider.GetRequiredService(); + IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService(); + + Either scanResult = await mediator.Send(request, cancellationToken); + scanResult.BiIter( + name => _logger.LogDebug( + "Done scanning local library {Library}", + name), + error => + { + if (error is ScanIsNotRequired) + { + _logger.LogDebug( + "Scan is not required for local library {LibraryId} at this time", + request.LibraryId); + } + else + { + _logger.LogWarning( + "Unable to scan local library {LibraryId}: {Error}", + request.LibraryId, + error.Value); + } + }); + + if (entityLocker.IsLibraryLocked(request.LibraryId)) + { + entityLocker.UnlockLibrary(request.LibraryId); + } + } + + private async Task SynchronizePlexLibrary( + ISynchronizePlexLibraryById request, + CancellationToken cancellationToken) + { + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + IMediator mediator = scope.ServiceProvider.GetRequiredService(); + IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService(); + + Either result = await mediator.Send(request, cancellationToken); + result.BiIter( + name => _logger.LogDebug("Done synchronizing plex library {Name}", name), + error => + { + if (error is ScanIsNotRequired) + { + _logger.LogDebug( + "Scan is not required for plex library {LibraryId} at this time", + request.PlexLibraryId); + } + else + { + _logger.LogWarning( + "Unable to synchronize plex library {LibraryId}: {Error}", + request.PlexLibraryId, + error.Value); + } + }); + + if (entityLocker.IsLibraryLocked(request.PlexLibraryId)) + { + entityLocker.UnlockLibrary(request.PlexLibraryId); + } + } + + private async Task SynchronizeAdminUserId( + SynchronizeJellyfinAdminUserId request, + CancellationToken cancellationToken) + { + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + IMediator mediator = scope.ServiceProvider.GetRequiredService(); + + Either result = await mediator.Send(request, cancellationToken); + result.BiIter( + _ => _logger.LogInformation( + "Successfully synchronized Jellyfin admin user id for source {MediaSourceId}", + request.JellyfinMediaSourceId), + error => _logger.LogWarning( + "Unable to synchronize Jellyfin admin user id for source {MediaSourceId}: {Error}", + request.JellyfinMediaSourceId, + error.Value)); + } + + private async Task SynchronizeLibraries(SynchronizeJellyfinLibraries request, CancellationToken cancellationToken) + { + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + IMediator mediator = scope.ServiceProvider.GetRequiredService(); + + Either result = await mediator.Send(request, cancellationToken); + result.BiIter( + _ => _logger.LogInformation( + "Successfully synchronized Jellyfin libraries for source {MediaSourceId}", + request.JellyfinMediaSourceId), + error => _logger.LogWarning( + "Unable to synchronize Jellyfin libraries for source {MediaSourceId}: {Error}", + request.JellyfinMediaSourceId, + error.Value)); + } + + private async Task SynchronizeJellyfinLibrary( + ISynchronizeJellyfinLibraryById request, + CancellationToken cancellationToken) + { + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + IMediator mediator = scope.ServiceProvider.GetRequiredService(); + IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService(); + + Either result = await mediator.Send(request, cancellationToken); + result.BiIter( + name => _logger.LogDebug("Done synchronizing jellyfin library {Name}", name), + error => + { + if (error is ScanIsNotRequired) + { + _logger.LogDebug( + "Scan is not required for jellyfin library {LibraryId} at this time", + request.JellyfinLibraryId); + } + else + { + _logger.LogWarning( + "Unable to synchronize jellyfin library {LibraryId}: {Error}", + request.JellyfinLibraryId, + error.Value); + } + }); + + if (entityLocker.IsLibraryLocked(request.JellyfinLibraryId)) + { + entityLocker.UnlockLibrary(request.JellyfinLibraryId); + } + } + + private async Task SynchronizeEmbyLibrary(ISynchronizeEmbyLibraryById request, CancellationToken cancellationToken) + { + using IServiceScope scope = _serviceScopeFactory.CreateScope(); + IMediator mediator = scope.ServiceProvider.GetRequiredService(); + IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService(); + + Either result = await mediator.Send(request, cancellationToken); + result.BiIter( + name => _logger.LogDebug("Done synchronizing emby library {Name}", name), + error => + { + if (error is ScanIsNotRequired) + { + _logger.LogDebug( + "Scan is not required for emby library {LibraryId} at this time", + request.EmbyLibraryId); + } + else + { + _logger.LogWarning( + "Unable to synchronize emby library {LibraryId}: {Error}", + request.EmbyLibraryId, + error.Value); + } + }); + + if (entityLocker.IsLibraryLocked(request.EmbyLibraryId)) + { + entityLocker.UnlockLibrary(request.EmbyLibraryId); + } + } +} + diff --git a/ErsatzTV/Services/SchedulerService.cs b/ErsatzTV/Services/SchedulerService.cs index 70876d99..98c0e89f 100644 --- a/ErsatzTV/Services/SchedulerService.cs +++ b/ErsatzTV/Services/SchedulerService.cs @@ -18,28 +18,22 @@ namespace ErsatzTV.Services; public class SchedulerService : BackgroundService { - private readonly ChannelWriter _embyWorkerChannel; + private readonly ChannelWriter _scannerWorkerChannel; private readonly IEntityLocker _entityLocker; - private readonly ChannelWriter _jellyfinWorkerChannel; private readonly ILogger _logger; - private readonly ChannelWriter _plexWorkerChannel; private readonly IServiceScopeFactory _serviceScopeFactory; private readonly ChannelWriter _workerChannel; public SchedulerService( IServiceScopeFactory serviceScopeFactory, ChannelWriter workerChannel, - ChannelWriter plexWorkerChannel, - ChannelWriter jellyfinWorkerChannel, - ChannelWriter embyWorkerChannel, + ChannelWriter scannerWorkerChannel, IEntityLocker entityLocker, ILogger logger) { _serviceScopeFactory = serviceScopeFactory; _workerChannel = workerChannel; - _plexWorkerChannel = plexWorkerChannel; - _jellyfinWorkerChannel = jellyfinWorkerChannel; - _embyWorkerChannel = embyWorkerChannel; + _scannerWorkerChannel = scannerWorkerChannel; _entityLocker = entityLocker; _logger = logger; } @@ -200,9 +194,7 @@ public class SchedulerService : BackgroundService { if (_entityLocker.LockLibrary(libraryId)) { - await _workerChannel.WriteAsync( - new ScanLocalLibraryIfNeeded(libraryId), - cancellationToken); + await _scannerWorkerChannel.WriteAsync(new ScanLocalLibraryIfNeeded(libraryId), cancellationToken); } } } @@ -216,7 +208,7 @@ public class SchedulerService : BackgroundService { if (_entityLocker.LockLibrary(library.Id)) { - await _plexWorkerChannel.WriteAsync( + await _scannerWorkerChannel.WriteAsync( new SynchronizePlexLibraryByIdIfNeeded(library.Id), cancellationToken); } @@ -232,7 +224,7 @@ public class SchedulerService : BackgroundService { if (_entityLocker.LockLibrary(library.Id)) { - await _jellyfinWorkerChannel.WriteAsync( + await _scannerWorkerChannel.WriteAsync( new SynchronizeJellyfinLibraryByIdIfNeeded(library.Id), cancellationToken); } @@ -248,7 +240,7 @@ public class SchedulerService : BackgroundService { if (_entityLocker.LockLibrary(library.Id)) { - await _embyWorkerChannel.WriteAsync( + await _scannerWorkerChannel.WriteAsync( new SynchronizeEmbyLibraryByIdIfNeeded(library.Id), cancellationToken); } diff --git a/ErsatzTV/Services/WorkerService.cs b/ErsatzTV/Services/WorkerService.cs index 2ee91680..b72509b6 100644 --- a/ErsatzTV/Services/WorkerService.cs +++ b/ErsatzTV/Services/WorkerService.cs @@ -3,13 +3,9 @@ using Bugsnag; using ErsatzTV.Application; using ErsatzTV.Application.Maintenance; using ErsatzTV.Application.MediaCollections; -using ErsatzTV.Application.MediaSources; using ErsatzTV.Application.Playouts; -using ErsatzTV.Application.Search; using ErsatzTV.Application.Subtitles; using ErsatzTV.Core; -using ErsatzTV.Core.Errors; -using ErsatzTV.Core.Interfaces.Locking; using MediatR; namespace ErsatzTV.Services; @@ -48,7 +44,6 @@ public class WorkerService : BackgroundService try { IMediator mediator = scope.ServiceProvider.GetRequiredService(); - IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService(); switch (request) { @@ -63,40 +58,6 @@ public class WorkerService : BackgroundService buildPlayout.PlayoutId, error.Value)); break; - case IScanLocalLibrary scanLocalLibrary: - Either scanResult = await mediator.Send( - scanLocalLibrary, - cancellationToken); - - scanResult.BiIter( - name => _logger.LogDebug( - "Done scanning local library {Library}", - name), - error => - { - if (error is ScanIsNotRequired) - { - _logger.LogDebug( - "Scan is not required for local library {LibraryId} at this time", - scanLocalLibrary.LibraryId); - } - else - { - _logger.LogWarning( - "Unable to scan local library {LibraryId}: {Error}", - scanLocalLibrary.LibraryId, - error.Value); - } - }); - - if (entityLocker.IsLibraryLocked(scanLocalLibrary.LibraryId)) - { - entityLocker.UnlockLibrary(scanLocalLibrary.LibraryId); - } - break; - case RebuildSearchIndex rebuildSearchIndex: - await mediator.Send(rebuildSearchIndex, cancellationToken); - break; case DeleteOrphanedArtwork deleteOrphanedArtwork: _logger.LogInformation("Deleting orphaned artwork from the database"); await mediator.Send(deleteOrphanedArtwork, cancellationToken); diff --git a/ErsatzTV/Startup.cs b/ErsatzTV/Startup.cs index 3e737421..b9c7021f 100644 --- a/ErsatzTV/Startup.cs +++ b/ErsatzTV/Startup.cs @@ -459,6 +459,7 @@ public class Startup AddChannel(services); AddChannel(services); AddChannel(services); + AddChannel(services); services.AddScoped(); services.AddScoped(); @@ -552,6 +553,7 @@ public class Startup services.AddHostedService(); services.AddHostedService(); services.AddHostedService(); + services.AddHostedService(); #endif services.AddHostedService(); services.AddHostedService();