Browse Source

rework concurrency (#1199)

pull/1200/head
Jason Dove 2 years ago committed by GitHub
parent
commit
234e93349b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      CHANGELOG.md
  2. 2
      ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraries.cs
  3. 2
      ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraryById.cs
  4. 8
      ErsatzTV.Application/Emby/Commands/SynchronizeEmbyMediaSourcesHandler.cs
  5. 5
      ErsatzTV.Application/IScannerBackgroundServiceRequest.cs
  6. 2
      ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinAdminUserId.cs
  7. 2
      ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraries.cs
  8. 2
      ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraryById.cs
  9. 10
      ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinMediaSourcesHandler.cs
  10. 8
      ErsatzTV.Application/Libraries/Commands/CreateLocalLibraryHandler.cs
  11. 8
      ErsatzTV.Application/Libraries/Commands/UpdateLocalLibraryHandler.cs
  12. 2
      ErsatzTV.Application/MediaSources/Commands/ScanLocalLibrary.cs
  13. 2
      ErsatzTV.Application/Plex/Commands/SynchronizePlexLibraryById.cs
  14. 2
      ErsatzTV.Application/Search/Commands/RebuildSearchIndex.cs
  15. 6
      ErsatzTV/Pages/EmbyLibrariesEditor.razor
  16. 4
      ErsatzTV/Pages/EmbyMediaSources.razor
  17. 6
      ErsatzTV/Pages/JellyfinLibrariesEditor.razor
  18. 4
      ErsatzTV/Pages/JellyfinMediaSources.razor
  19. 17
      ErsatzTV/Pages/Libraries.razor
  20. 4
      ErsatzTV/Pages/LocalLibraryPathEditor.razor
  21. 5
      ErsatzTV/Pages/PlexLibrariesEditor.razor
  22. 37
      ErsatzTV/Services/EmbyService.cs
  23. 79
      ErsatzTV/Services/JellyfinService.cs
  24. 39
      ErsatzTV/Services/PlexService.cs
  25. 259
      ErsatzTV/Services/ScannerService.cs
  26. 22
      ErsatzTV/Services/SchedulerService.cs
  27. 39
      ErsatzTV/Services/WorkerService.cs
  28. 2
      ErsatzTV/Startup.cs

4
CHANGELOG.md

@ -15,6 +15,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). @@ -15,6 +15,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
- For example, the scanner will now find `movie.MKV` as well as `movie.mkv` on case-sensitive filesystems
- Include multiple `display-name` entries in generated XMLTV
- Plex should now display the channel number instead of the channel id (e.g. `1.2` instead of `1.2.etv`)
- Rework concurrency a bit
- Playouts builds are no longer blocked by library scans
- Adding Trakt lists is no longer blocked by library scans
- All library scans (local and media servers) run sequentially
## [0.7.5-beta] - 2023-03-05
### Added

2
ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraries.cs

@ -3,4 +3,4 @@ @@ -3,4 +3,4 @@
namespace ErsatzTV.Application.Emby;
public record SynchronizeEmbyLibraries(int EmbyMediaSourceId) : IRequest<Either<BaseError, Unit>>,
IEmbyBackgroundServiceRequest;
IScannerBackgroundServiceRequest;

2
ErsatzTV.Application/Emby/Commands/SynchronizeEmbyLibraryById.cs

@ -2,7 +2,7 @@ @@ -2,7 +2,7 @@
namespace ErsatzTV.Application.Emby;
public interface ISynchronizeEmbyLibraryById : IRequest<Either<BaseError, string>>, IEmbyBackgroundServiceRequest
public interface ISynchronizeEmbyLibraryById : IRequest<Either<BaseError, string>>, IScannerBackgroundServiceRequest
{
int EmbyLibraryId { get; }
bool ForceScan { get; }

8
ErsatzTV.Application/Emby/Commands/SynchronizeEmbyMediaSourcesHandler.cs

@ -8,15 +8,15 @@ namespace ErsatzTV.Application.Emby; @@ -8,15 +8,15 @@ namespace ErsatzTV.Application.Emby;
public class SynchronizeEmbyMediaSourcesHandler : IRequestHandler<SynchronizeEmbyMediaSources,
Either<BaseError, List<EmbyMediaSource>>>
{
private readonly ChannelWriter<IEmbyBackgroundServiceRequest> _channel;
private readonly ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel;
private readonly IMediaSourceRepository _mediaSourceRepository;
public SynchronizeEmbyMediaSourcesHandler(
IMediaSourceRepository mediaSourceRepository,
ChannelWriter<IEmbyBackgroundServiceRequest> channel)
ChannelWriter<IScannerBackgroundServiceRequest> scannerWorkerChannel)
{
_mediaSourceRepository = mediaSourceRepository;
_channel = channel;
_scannerWorkerChannel = scannerWorkerChannel;
}
public async Task<Either<BaseError, List<EmbyMediaSource>>> Handle(
@ -27,7 +27,7 @@ public class SynchronizeEmbyMediaSourcesHandler : IRequestHandler<SynchronizeEmb @@ -27,7 +27,7 @@ public class SynchronizeEmbyMediaSourcesHandler : IRequestHandler<SynchronizeEmb
foreach (EmbyMediaSource mediaSource in mediaSources)
{
// await _channel.WriteAsync(new SynchronizeEmbyAdminUserId(mediaSource.Id), cancellationToken);
await _channel.WriteAsync(new SynchronizeEmbyLibraries(mediaSource.Id), cancellationToken);
await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(mediaSource.Id), cancellationToken);
}
return mediaSources;

5
ErsatzTV.Application/IScannerBackgroundServiceRequest.cs

@ -0,0 +1,5 @@ @@ -0,0 +1,5 @@
namespace ErsatzTV.Application;
public interface IScannerBackgroundServiceRequest
{
}

2
ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinAdminUserId.cs

@ -3,4 +3,4 @@ @@ -3,4 +3,4 @@
namespace ErsatzTV.Application.Jellyfin;
public record SynchronizeJellyfinAdminUserId(int JellyfinMediaSourceId) : IRequest<Either<BaseError, Unit>>,
IJellyfinBackgroundServiceRequest;
IScannerBackgroundServiceRequest;

2
ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraries.cs

@ -3,4 +3,4 @@ @@ -3,4 +3,4 @@
namespace ErsatzTV.Application.Jellyfin;
public record SynchronizeJellyfinLibraries(int JellyfinMediaSourceId) : IRequest<Either<BaseError, Unit>>,
IJellyfinBackgroundServiceRequest;
IScannerBackgroundServiceRequest;

2
ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinLibraryById.cs

@ -3,7 +3,7 @@ @@ -3,7 +3,7 @@
namespace ErsatzTV.Application.Jellyfin;
public interface ISynchronizeJellyfinLibraryById : IRequest<Either<BaseError, string>>,
IJellyfinBackgroundServiceRequest
IScannerBackgroundServiceRequest
{
int JellyfinLibraryId { get; }
bool ForceScan { get; }

10
ErsatzTV.Application/Jellyfin/Commands/SynchronizeJellyfinMediaSourcesHandler.cs

@ -8,15 +8,15 @@ namespace ErsatzTV.Application.Jellyfin; @@ -8,15 +8,15 @@ namespace ErsatzTV.Application.Jellyfin;
public class SynchronizeJellyfinMediaSourcesHandler : IRequestHandler<SynchronizeJellyfinMediaSources,
Either<BaseError, List<JellyfinMediaSource>>>
{
private readonly ChannelWriter<IJellyfinBackgroundServiceRequest> _channel;
private readonly ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel;
private readonly IMediaSourceRepository _mediaSourceRepository;
public SynchronizeJellyfinMediaSourcesHandler(
IMediaSourceRepository mediaSourceRepository,
ChannelWriter<IJellyfinBackgroundServiceRequest> channel)
ChannelWriter<IScannerBackgroundServiceRequest> scannerWorkerChannel)
{
_mediaSourceRepository = mediaSourceRepository;
_channel = channel;
_scannerWorkerChannel = scannerWorkerChannel;
}
public async Task<Either<BaseError, List<JellyfinMediaSource>>> Handle(
@ -26,8 +26,8 @@ public class SynchronizeJellyfinMediaSourcesHandler : IRequestHandler<Synchroniz @@ -26,8 +26,8 @@ public class SynchronizeJellyfinMediaSourcesHandler : IRequestHandler<Synchroniz
List<JellyfinMediaSource> mediaSources = await _mediaSourceRepository.GetAllJellyfin();
foreach (JellyfinMediaSource mediaSource in mediaSources)
{
await _channel.WriteAsync(new SynchronizeJellyfinAdminUserId(mediaSource.Id), cancellationToken);
await _channel.WriteAsync(new SynchronizeJellyfinLibraries(mediaSource.Id), cancellationToken);
await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinAdminUserId(mediaSource.Id), cancellationToken);
await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(mediaSource.Id), cancellationToken);
}
return mediaSources;

8
ErsatzTV.Application/Libraries/Commands/CreateLocalLibraryHandler.cs

@ -14,14 +14,14 @@ public class CreateLocalLibraryHandler : LocalLibraryHandlerBase, @@ -14,14 +14,14 @@ public class CreateLocalLibraryHandler : LocalLibraryHandlerBase,
{
private readonly IDbContextFactory<TvContext> _dbContextFactory;
private readonly IEntityLocker _entityLocker;
private readonly ChannelWriter<IBackgroundServiceRequest> _workerChannel;
private readonly ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel;
public CreateLocalLibraryHandler(
ChannelWriter<IBackgroundServiceRequest> workerChannel,
ChannelWriter<IScannerBackgroundServiceRequest> scannerWorkerChannel,
IEntityLocker entityLocker,
IDbContextFactory<TvContext> dbContextFactory)
{
_workerChannel = workerChannel;
_scannerWorkerChannel = scannerWorkerChannel;
_entityLocker = entityLocker;
_dbContextFactory = dbContextFactory;
}
@ -44,7 +44,7 @@ public class CreateLocalLibraryHandler : LocalLibraryHandlerBase, @@ -44,7 +44,7 @@ public class CreateLocalLibraryHandler : LocalLibraryHandlerBase,
if (_entityLocker.LockLibrary(localLibrary.Id))
{
await _workerChannel.WriteAsync(new ForceScanLocalLibrary(localLibrary.Id));
await _scannerWorkerChannel.WriteAsync(new ForceScanLocalLibrary(localLibrary.Id));
}
return ProjectToViewModel(localLibrary);

8
ErsatzTV.Application/Libraries/Commands/UpdateLocalLibraryHandler.cs

@ -17,15 +17,15 @@ public class UpdateLocalLibraryHandler : LocalLibraryHandlerBase, @@ -17,15 +17,15 @@ public class UpdateLocalLibraryHandler : LocalLibraryHandlerBase,
private readonly IDbContextFactory<TvContext> _dbContextFactory;
private readonly IEntityLocker _entityLocker;
private readonly ISearchIndex _searchIndex;
private readonly ChannelWriter<IBackgroundServiceRequest> _workerChannel;
private readonly ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel;
public UpdateLocalLibraryHandler(
ChannelWriter<IBackgroundServiceRequest> workerChannel,
ChannelWriter<IScannerBackgroundServiceRequest> scannerWorkerChannel,
IEntityLocker entityLocker,
ISearchIndex searchIndex,
IDbContextFactory<TvContext> dbContextFactory)
{
_workerChannel = workerChannel;
_scannerWorkerChannel = scannerWorkerChannel;
_entityLocker = entityLocker;
_searchIndex = searchIndex;
_dbContextFactory = dbContextFactory;
@ -70,7 +70,7 @@ public class UpdateLocalLibraryHandler : LocalLibraryHandlerBase, @@ -70,7 +70,7 @@ public class UpdateLocalLibraryHandler : LocalLibraryHandlerBase,
if ((toAdd.Count > 0 || toRemove.Count > 0) && _entityLocker.LockLibrary(existing.Id))
{
await _workerChannel.WriteAsync(new ForceScanLocalLibrary(existing.Id));
await _scannerWorkerChannel.WriteAsync(new ForceScanLocalLibrary(existing.Id));
}
return ProjectToViewModel(existing);

2
ErsatzTV.Application/MediaSources/Commands/ScanLocalLibrary.cs

@ -2,7 +2,7 @@ @@ -2,7 +2,7 @@
namespace ErsatzTV.Application.MediaSources;
public interface IScanLocalLibrary : IRequest<Either<BaseError, string>>, IBackgroundServiceRequest
public interface IScanLocalLibrary : IRequest<Either<BaseError, string>>, IScannerBackgroundServiceRequest
{
int LibraryId { get; }
bool ForceScan { get; }

2
ErsatzTV.Application/Plex/Commands/SynchronizePlexLibraryById.cs

@ -2,7 +2,7 @@ @@ -2,7 +2,7 @@
namespace ErsatzTV.Application.Plex;
public interface ISynchronizePlexLibraryById : IRequest<Either<BaseError, string>>, IPlexBackgroundServiceRequest
public interface ISynchronizePlexLibraryById : IRequest<Either<BaseError, string>>, IScannerBackgroundServiceRequest
{
int PlexLibraryId { get; }
bool ForceScan { get; }

2
ErsatzTV.Application/Search/Commands/RebuildSearchIndex.cs

@ -1,3 +1,3 @@ @@ -1,3 +1,3 @@
namespace ErsatzTV.Application.Search;
public record RebuildSearchIndex : IRequest, IBackgroundServiceRequest;
public record RebuildSearchIndex : IRequest, IScannerBackgroundServiceRequest;

6
ErsatzTV/Pages/EmbyLibrariesEditor.razor

@ -3,7 +3,7 @@ @@ -3,7 +3,7 @@
@using ErsatzTV.Application.MediaSources
@implements IDisposable
@inject IMediator _mediator
@inject ChannelWriter<IEmbyBackgroundServiceRequest> _channel
@inject ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel
<RemoteMediaSourceLibrariesEditor
Id="@Id"
@ -47,8 +47,8 @@ @@ -47,8 +47,8 @@
private async Task<Unit> SynchronizeLibraryByIdIfNeeded(RemoteMediaSourceLibrariesEditor.SynchronizeParameters parameters)
{
await _channel.WriteAsync(new SynchronizeEmbyLibraries(parameters.MediaSourceId), _cts.Token);
await _channel.WriteAsync(new SynchronizeEmbyLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(parameters.MediaSourceId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token);
return Unit.Default;
}

4
ErsatzTV/Pages/EmbyMediaSources.razor

@ -4,7 +4,7 @@ @@ -4,7 +4,7 @@
@using ErsatzTV.Core.Emby
@implements IDisposable
@inject IEmbySecretStore _embySecretStore
@inject ChannelWriter<IEmbyBackgroundServiceRequest> _channel
@inject ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel
<RemoteMediaSources
TViewModel="EmbyMediaSourceViewModel"
@ -26,6 +26,6 @@ @@ -26,6 +26,6 @@
}
private async Task RefreshLibraries(int mediaSourceId) =>
await _channel.WriteAsync(new SynchronizeEmbyLibraries(mediaSourceId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(mediaSourceId), _cts.Token);
}

6
ErsatzTV/Pages/JellyfinLibrariesEditor.razor

@ -3,7 +3,7 @@ @@ -3,7 +3,7 @@
@using ErsatzTV.Application.MediaSources
@implements IDisposable
@inject IMediator _mediator
@inject ChannelWriter<IJellyfinBackgroundServiceRequest> _channel
@inject ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel
<RemoteMediaSourceLibrariesEditor
Id="@Id"
@ -47,8 +47,8 @@ @@ -47,8 +47,8 @@
private async Task<Unit> SynchronizeLibraryByIdIfNeeded(RemoteMediaSourceLibrariesEditor.SynchronizeParameters parameters)
{
await _channel.WriteAsync(new SynchronizeJellyfinLibraries(parameters.MediaSourceId), _cts.Token);
await _channel.WriteAsync(new SynchronizeJellyfinLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(parameters.MediaSourceId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token);
return Unit.Default;
}

4
ErsatzTV/Pages/JellyfinMediaSources.razor

@ -4,7 +4,7 @@ @@ -4,7 +4,7 @@
@using ErsatzTV.Core.Jellyfin
@implements IDisposable
@inject IJellyfinSecretStore _jellyfinSecretStore
@inject ChannelWriter<IJellyfinBackgroundServiceRequest> _channel
@inject ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel
<RemoteMediaSources
TViewModel="JellyfinMediaSourceViewModel"
@ -26,6 +26,6 @@ @@ -26,6 +26,6 @@
}
private async Task RefreshLibraries(int mediaSourceId) =>
await _channel.WriteAsync(new SynchronizeJellyfinLibraries(mediaSourceId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(mediaSourceId), _cts.Token);
}

17
ErsatzTV/Pages/Libraries.razor

@ -10,10 +10,7 @@ @@ -10,10 +10,7 @@
@implements IDisposable
@inject IMediator _mediator
@inject IEntityLocker _locker
@inject ChannelWriter<IBackgroundServiceRequest> _workerChannel
@inject ChannelWriter<IPlexBackgroundServiceRequest> _plexWorkerChannel
@inject ChannelWriter<IJellyfinBackgroundServiceRequest> _jellyfinWorkerChannel
@inject ChannelWriter<IEmbyBackgroundServiceRequest> _embyWorkerChannel
@inject ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel;
@inject ICourier _courier
<MudContainer MaxWidth="MaxWidth.ExtraLarge" Class="pt-8">
@ -113,18 +110,18 @@ @@ -113,18 +110,18 @@
switch (library)
{
case LocalLibraryViewModel:
await _workerChannel.WriteAsync(new ForceScanLocalLibrary(library.Id), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new ForceScanLocalLibrary(library.Id), _cts.Token);
break;
case PlexLibraryViewModel:
await _plexWorkerChannel.WriteAsync(new ForceSynchronizePlexLibraryById(library.Id, deepScan), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new ForceSynchronizePlexLibraryById(library.Id, deepScan), _cts.Token);
break;
case JellyfinLibraryViewModel:
await _jellyfinWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(library.MediaSourceId), _cts.Token);
await _jellyfinWorkerChannel.WriteAsync(new ForceSynchronizeJellyfinLibraryById(library.Id, deepScan), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizeJellyfinLibraries(library.MediaSourceId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new ForceSynchronizeJellyfinLibraryById(library.Id, deepScan), _cts.Token);
break;
case EmbyLibraryViewModel:
await _embyWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(library.MediaSourceId), _cts.Token);
await _embyWorkerChannel.WriteAsync(new ForceSynchronizeEmbyLibraryById(library.Id, deepScan), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizeEmbyLibraries(library.MediaSourceId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new ForceSynchronizeEmbyLibraryById(library.Id, deepScan), _cts.Token);
break;
}

4
ErsatzTV/Pages/LocalLibraryPathEditor.razor

@ -7,7 +7,7 @@ @@ -7,7 +7,7 @@
@inject ISnackbar _snackbar
@inject IMediator _mediator
@inject IEntityLocker _locker
@inject ChannelWriter<IBackgroundServiceRequest> _channel
@inject ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel;
<MudContainer MaxWidth="MaxWidth.ExtraLarge" Class="pt-8">
<MudText Typo="Typo.h4" Class="mb-4">@_library.Name - Add Local Library Path</MudText>
@ -80,7 +80,7 @@ @@ -80,7 +80,7 @@
{
if (_locker.LockLibrary(_library.Id))
{
await _channel.WriteAsync(new ScanLocalLibraryIfNeeded(_library.Id), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new ScanLocalLibraryIfNeeded(_library.Id), _cts.Token);
_navigationManager.NavigateTo("media/libraries");
}
});

5
ErsatzTV/Pages/PlexLibrariesEditor.razor

@ -3,7 +3,7 @@ @@ -3,7 +3,7 @@
@using ErsatzTV.Application.MediaSources
@implements IDisposable
@inject IMediator _mediator
@inject ChannelWriter<IPlexBackgroundServiceRequest> _channel
@inject ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel
<RemoteMediaSourceLibrariesEditor
Id="@Id"
@ -47,8 +47,7 @@ @@ -47,8 +47,7 @@
private async Task<Unit> SynchronizeLibraryByIdIfNeeded(RemoteMediaSourceLibrariesEditor.SynchronizeParameters parameters)
{
await _channel.WriteAsync(new SynchronizePlexLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token);
await _scannerWorkerChannel.WriteAsync(new SynchronizePlexLibraryByIdIfNeeded(parameters.LibraryId), _cts.Token);
return Unit.Default;
}
}

37
ErsatzTV/Services/EmbyService.cs

@ -4,8 +4,6 @@ using ErsatzTV.Application; @@ -4,8 +4,6 @@ using ErsatzTV.Application;
using ErsatzTV.Application.Emby;
using ErsatzTV.Core;
using ErsatzTV.Core.Domain;
using ErsatzTV.Core.Errors;
using ErsatzTV.Core.Interfaces.Locking;
using MediatR;
namespace ErsatzTV.Services;
@ -55,9 +53,6 @@ public class EmbyService : BackgroundService @@ -55,9 +53,6 @@ public class EmbyService : BackgroundService
case SynchronizeEmbyLibraries synchronizeEmbyLibraries:
requestTask = SynchronizeLibraries(synchronizeEmbyLibraries, cancellationToken);
break;
case ISynchronizeEmbyLibraryById synchronizeEmbyLibraryById:
requestTask = SynchronizeEmbyLibrary(synchronizeEmbyLibraryById, cancellationToken);
break;
default:
throw new NotSupportedException($"Unsupported request type: {request.GetType().Name}");
}
@ -126,36 +121,4 @@ public class EmbyService : BackgroundService @@ -126,36 +121,4 @@ public class EmbyService : BackgroundService
request.EmbyMediaSourceId,
error.Value));
}
private async Task SynchronizeEmbyLibrary(ISynchronizeEmbyLibraryById request, CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService<IEntityLocker>();
Either<BaseError, string> result = await mediator.Send(request, cancellationToken);
result.BiIter(
name => _logger.LogDebug("Done synchronizing emby library {Name}", name),
error =>
{
if (error is ScanIsNotRequired)
{
_logger.LogDebug(
"Scan is not required for emby library {LibraryId} at this time",
request.EmbyLibraryId);
}
else
{
_logger.LogWarning(
"Unable to synchronize emby library {LibraryId}: {Error}",
request.EmbyLibraryId,
error.Value);
}
});
if (entityLocker.IsLibraryLocked(request.EmbyLibraryId))
{
entityLocker.UnlockLibrary(request.EmbyLibraryId);
}
}
}

79
ErsatzTV/Services/JellyfinService.cs

@ -4,8 +4,6 @@ using ErsatzTV.Application; @@ -4,8 +4,6 @@ using ErsatzTV.Application;
using ErsatzTV.Application.Jellyfin;
using ErsatzTV.Core;
using ErsatzTV.Core.Domain;
using ErsatzTV.Core.Errors;
using ErsatzTV.Core.Interfaces.Locking;
using MediatR;
namespace ErsatzTV.Services;
@ -52,15 +50,6 @@ public class JellyfinService : BackgroundService @@ -52,15 +50,6 @@ public class JellyfinService : BackgroundService
case SynchronizeJellyfinMediaSources synchronizeJellyfinMediaSources:
requestTask = SynchronizeSources(synchronizeJellyfinMediaSources, cancellationToken);
break;
case SynchronizeJellyfinAdminUserId synchronizeJellyfinAdminUserId:
requestTask = SynchronizeAdminUserId(synchronizeJellyfinAdminUserId, cancellationToken);
break;
case SynchronizeJellyfinLibraries synchronizeJellyfinLibraries:
requestTask = SynchronizeLibraries(synchronizeJellyfinLibraries, cancellationToken);
break;
case ISynchronizeJellyfinLibraryById synchronizeJellyfinLibraryById:
requestTask = SynchronizeJellyfinLibrary(synchronizeJellyfinLibraryById, cancellationToken);
break;
default:
throw new NotSupportedException($"Unsupported request type: {request.GetType().Name}");
}
@ -115,72 +104,4 @@ public class JellyfinService : BackgroundService @@ -115,72 +104,4 @@ public class JellyfinService : BackgroundService
error.Value);
});
}
private async Task SynchronizeLibraries(SynchronizeJellyfinLibraries request, CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
Either<BaseError, Unit> result = await mediator.Send(request, cancellationToken);
result.BiIter(
_ => _logger.LogInformation(
"Successfully synchronized Jellyfin libraries for source {MediaSourceId}",
request.JellyfinMediaSourceId),
error => _logger.LogWarning(
"Unable to synchronize Jellyfin libraries for source {MediaSourceId}: {Error}",
request.JellyfinMediaSourceId,
error.Value));
}
private async Task SynchronizeAdminUserId(
SynchronizeJellyfinAdminUserId request,
CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
Either<BaseError, Unit> result = await mediator.Send(request, cancellationToken);
result.BiIter(
_ => _logger.LogInformation(
"Successfully synchronized Jellyfin admin user id for source {MediaSourceId}",
request.JellyfinMediaSourceId),
error => _logger.LogWarning(
"Unable to synchronize Jellyfin admin user id for source {MediaSourceId}: {Error}",
request.JellyfinMediaSourceId,
error.Value));
}
private async Task SynchronizeJellyfinLibrary(
ISynchronizeJellyfinLibraryById request,
CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService<IEntityLocker>();
Either<BaseError, string> result = await mediator.Send(request, cancellationToken);
result.BiIter(
name => _logger.LogDebug("Done synchronizing jellyfin library {Name}", name),
error =>
{
if (error is ScanIsNotRequired)
{
_logger.LogDebug(
"Scan is not required for jellyfin library {LibraryId} at this time",
request.JellyfinLibraryId);
}
else
{
_logger.LogWarning(
"Unable to synchronize jellyfin library {LibraryId}: {Error}",
request.JellyfinLibraryId,
error.Value);
}
});
if (entityLocker.IsLibraryLocked(request.JellyfinLibraryId))
{
entityLocker.UnlockLibrary(request.JellyfinLibraryId);
}
}
}

39
ErsatzTV/Services/PlexService.cs

@ -4,8 +4,6 @@ using ErsatzTV.Application; @@ -4,8 +4,6 @@ using ErsatzTV.Application;
using ErsatzTV.Application.Plex;
using ErsatzTV.Core;
using ErsatzTV.Core.Domain;
using ErsatzTV.Core.Errors;
using ErsatzTV.Core.Interfaces.Locking;
using MediatR;
namespace ErsatzTV.Services;
@ -58,9 +56,6 @@ public class PlexService : BackgroundService @@ -58,9 +56,6 @@ public class PlexService : BackgroundService
case SynchronizePlexLibraries synchronizePlexLibrariesRequest:
requestTask = SynchronizeLibraries(synchronizePlexLibrariesRequest, cancellationToken);
break;
case ISynchronizePlexLibraryById synchronizePlexLibraryById:
requestTask = SynchronizePlexLibrary(synchronizePlexLibraryById, cancellationToken);
break;
default:
throw new NotSupportedException($"Unsupported request type: {request.GetType().Name}");
}
@ -148,38 +143,4 @@ public class PlexService : BackgroundService @@ -148,38 +143,4 @@ public class PlexService : BackgroundService
request.PlexMediaSourceId,
error.Value));
}
private async Task SynchronizePlexLibrary(
ISynchronizePlexLibraryById request,
CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService<IEntityLocker>();
Either<BaseError, string> result = await mediator.Send(request, cancellationToken);
result.BiIter(
name => _logger.LogDebug("Done synchronizing plex library {Name}", name),
error =>
{
if (error is ScanIsNotRequired)
{
_logger.LogDebug(
"Scan is not required for plex library {LibraryId} at this time",
request.PlexLibraryId);
}
else
{
_logger.LogWarning(
"Unable to synchronize plex library {LibraryId}: {Error}",
request.PlexLibraryId,
error.Value);
}
});
if (entityLocker.IsLibraryLocked(request.PlexLibraryId))
{
entityLocker.UnlockLibrary(request.PlexLibraryId);
}
}
}

259
ErsatzTV/Services/ScannerService.cs

@ -0,0 +1,259 @@ @@ -0,0 +1,259 @@
using System.Threading.Channels;
using Bugsnag;
using ErsatzTV.Application;
using ErsatzTV.Application.Emby;
using ErsatzTV.Application.Jellyfin;
using ErsatzTV.Application.MediaSources;
using ErsatzTV.Application.Plex;
using ErsatzTV.Core;
using ErsatzTV.Core.Errors;
using ErsatzTV.Core.Interfaces.Locking;
using MediatR;
namespace ErsatzTV.Services;
public class ScannerService : BackgroundService
{
private readonly ChannelReader<IScannerBackgroundServiceRequest> _channel;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ILogger<ScannerService> _logger;
public ScannerService(
ChannelReader<IScannerBackgroundServiceRequest> channel,
IServiceScopeFactory serviceScopeFactory,
ILogger<ScannerService> logger)
{
_channel = channel;
_serviceScopeFactory = serviceScopeFactory;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken cancellationToken)
{
try
{
_logger.LogInformation("Scanner service started");
await foreach (IScannerBackgroundServiceRequest request in _channel.ReadAllAsync(cancellationToken))
{
try
{
Task requestTask;
switch (request)
{
case ISynchronizePlexLibraryById synchronizePlexLibraryById:
requestTask = SynchronizePlexLibrary(synchronizePlexLibraryById, cancellationToken);
break;
case SynchronizeJellyfinAdminUserId synchronizeJellyfinAdminUserId:
requestTask = SynchronizeAdminUserId(synchronizeJellyfinAdminUserId, cancellationToken);
break;
case SynchronizeJellyfinLibraries synchronizeJellyfinLibraries:
requestTask = SynchronizeLibraries(synchronizeJellyfinLibraries, cancellationToken);
break;
case ISynchronizeJellyfinLibraryById synchronizeJellyfinLibraryById:
requestTask = SynchronizeJellyfinLibrary(synchronizeJellyfinLibraryById, cancellationToken);
break;
case ISynchronizeEmbyLibraryById synchronizeEmbyLibraryById:
requestTask = SynchronizeEmbyLibrary(synchronizeEmbyLibraryById, cancellationToken);
break;
case IScanLocalLibrary scanLocalLibrary:
requestTask = SynchronizeLocalLibrary(scanLocalLibrary, cancellationToken);
break;
default:
throw new NotSupportedException($"Unsupported request type: {request.GetType().Name}");
}
await requestTask;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "Failed to process scanner background service request");
try
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IClient client = scope.ServiceProvider.GetRequiredService<IClient>();
client.Notify(ex);
}
catch (Exception)
{
// do nothing
}
}
}
}
catch (Exception ex) when (ex is TaskCanceledException or OperationCanceledException)
{
_logger.LogInformation("Plex service shutting down");
}
}
private async Task SynchronizeLocalLibrary(IScanLocalLibrary request, CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService<IEntityLocker>();
Either<BaseError, string> scanResult = await mediator.Send(request, cancellationToken);
scanResult.BiIter(
name => _logger.LogDebug(
"Done scanning local library {Library}",
name),
error =>
{
if (error is ScanIsNotRequired)
{
_logger.LogDebug(
"Scan is not required for local library {LibraryId} at this time",
request.LibraryId);
}
else
{
_logger.LogWarning(
"Unable to scan local library {LibraryId}: {Error}",
request.LibraryId,
error.Value);
}
});
if (entityLocker.IsLibraryLocked(request.LibraryId))
{
entityLocker.UnlockLibrary(request.LibraryId);
}
}
private async Task SynchronizePlexLibrary(
ISynchronizePlexLibraryById request,
CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService<IEntityLocker>();
Either<BaseError, string> result = await mediator.Send(request, cancellationToken);
result.BiIter(
name => _logger.LogDebug("Done synchronizing plex library {Name}", name),
error =>
{
if (error is ScanIsNotRequired)
{
_logger.LogDebug(
"Scan is not required for plex library {LibraryId} at this time",
request.PlexLibraryId);
}
else
{
_logger.LogWarning(
"Unable to synchronize plex library {LibraryId}: {Error}",
request.PlexLibraryId,
error.Value);
}
});
if (entityLocker.IsLibraryLocked(request.PlexLibraryId))
{
entityLocker.UnlockLibrary(request.PlexLibraryId);
}
}
private async Task SynchronizeAdminUserId(
SynchronizeJellyfinAdminUserId request,
CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
Either<BaseError, Unit> result = await mediator.Send(request, cancellationToken);
result.BiIter(
_ => _logger.LogInformation(
"Successfully synchronized Jellyfin admin user id for source {MediaSourceId}",
request.JellyfinMediaSourceId),
error => _logger.LogWarning(
"Unable to synchronize Jellyfin admin user id for source {MediaSourceId}: {Error}",
request.JellyfinMediaSourceId,
error.Value));
}
private async Task SynchronizeLibraries(SynchronizeJellyfinLibraries request, CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
Either<BaseError, Unit> result = await mediator.Send(request, cancellationToken);
result.BiIter(
_ => _logger.LogInformation(
"Successfully synchronized Jellyfin libraries for source {MediaSourceId}",
request.JellyfinMediaSourceId),
error => _logger.LogWarning(
"Unable to synchronize Jellyfin libraries for source {MediaSourceId}: {Error}",
request.JellyfinMediaSourceId,
error.Value));
}
private async Task SynchronizeJellyfinLibrary(
ISynchronizeJellyfinLibraryById request,
CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService<IEntityLocker>();
Either<BaseError, string> result = await mediator.Send(request, cancellationToken);
result.BiIter(
name => _logger.LogDebug("Done synchronizing jellyfin library {Name}", name),
error =>
{
if (error is ScanIsNotRequired)
{
_logger.LogDebug(
"Scan is not required for jellyfin library {LibraryId} at this time",
request.JellyfinLibraryId);
}
else
{
_logger.LogWarning(
"Unable to synchronize jellyfin library {LibraryId}: {Error}",
request.JellyfinLibraryId,
error.Value);
}
});
if (entityLocker.IsLibraryLocked(request.JellyfinLibraryId))
{
entityLocker.UnlockLibrary(request.JellyfinLibraryId);
}
}
private async Task SynchronizeEmbyLibrary(ISynchronizeEmbyLibraryById request, CancellationToken cancellationToken)
{
using IServiceScope scope = _serviceScopeFactory.CreateScope();
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService<IEntityLocker>();
Either<BaseError, string> result = await mediator.Send(request, cancellationToken);
result.BiIter(
name => _logger.LogDebug("Done synchronizing emby library {Name}", name),
error =>
{
if (error is ScanIsNotRequired)
{
_logger.LogDebug(
"Scan is not required for emby library {LibraryId} at this time",
request.EmbyLibraryId);
}
else
{
_logger.LogWarning(
"Unable to synchronize emby library {LibraryId}: {Error}",
request.EmbyLibraryId,
error.Value);
}
});
if (entityLocker.IsLibraryLocked(request.EmbyLibraryId))
{
entityLocker.UnlockLibrary(request.EmbyLibraryId);
}
}
}

22
ErsatzTV/Services/SchedulerService.cs

@ -18,28 +18,22 @@ namespace ErsatzTV.Services; @@ -18,28 +18,22 @@ namespace ErsatzTV.Services;
public class SchedulerService : BackgroundService
{
private readonly ChannelWriter<IEmbyBackgroundServiceRequest> _embyWorkerChannel;
private readonly ChannelWriter<IScannerBackgroundServiceRequest> _scannerWorkerChannel;
private readonly IEntityLocker _entityLocker;
private readonly ChannelWriter<IJellyfinBackgroundServiceRequest> _jellyfinWorkerChannel;
private readonly ILogger<SchedulerService> _logger;
private readonly ChannelWriter<IPlexBackgroundServiceRequest> _plexWorkerChannel;
private readonly IServiceScopeFactory _serviceScopeFactory;
private readonly ChannelWriter<IBackgroundServiceRequest> _workerChannel;
public SchedulerService(
IServiceScopeFactory serviceScopeFactory,
ChannelWriter<IBackgroundServiceRequest> workerChannel,
ChannelWriter<IPlexBackgroundServiceRequest> plexWorkerChannel,
ChannelWriter<IJellyfinBackgroundServiceRequest> jellyfinWorkerChannel,
ChannelWriter<IEmbyBackgroundServiceRequest> embyWorkerChannel,
ChannelWriter<IScannerBackgroundServiceRequest> scannerWorkerChannel,
IEntityLocker entityLocker,
ILogger<SchedulerService> logger)
{
_serviceScopeFactory = serviceScopeFactory;
_workerChannel = workerChannel;
_plexWorkerChannel = plexWorkerChannel;
_jellyfinWorkerChannel = jellyfinWorkerChannel;
_embyWorkerChannel = embyWorkerChannel;
_scannerWorkerChannel = scannerWorkerChannel;
_entityLocker = entityLocker;
_logger = logger;
}
@ -200,9 +194,7 @@ public class SchedulerService : BackgroundService @@ -200,9 +194,7 @@ public class SchedulerService : BackgroundService
{
if (_entityLocker.LockLibrary(libraryId))
{
await _workerChannel.WriteAsync(
new ScanLocalLibraryIfNeeded(libraryId),
cancellationToken);
await _scannerWorkerChannel.WriteAsync(new ScanLocalLibraryIfNeeded(libraryId), cancellationToken);
}
}
}
@ -216,7 +208,7 @@ public class SchedulerService : BackgroundService @@ -216,7 +208,7 @@ public class SchedulerService : BackgroundService
{
if (_entityLocker.LockLibrary(library.Id))
{
await _plexWorkerChannel.WriteAsync(
await _scannerWorkerChannel.WriteAsync(
new SynchronizePlexLibraryByIdIfNeeded(library.Id),
cancellationToken);
}
@ -232,7 +224,7 @@ public class SchedulerService : BackgroundService @@ -232,7 +224,7 @@ public class SchedulerService : BackgroundService
{
if (_entityLocker.LockLibrary(library.Id))
{
await _jellyfinWorkerChannel.WriteAsync(
await _scannerWorkerChannel.WriteAsync(
new SynchronizeJellyfinLibraryByIdIfNeeded(library.Id),
cancellationToken);
}
@ -248,7 +240,7 @@ public class SchedulerService : BackgroundService @@ -248,7 +240,7 @@ public class SchedulerService : BackgroundService
{
if (_entityLocker.LockLibrary(library.Id))
{
await _embyWorkerChannel.WriteAsync(
await _scannerWorkerChannel.WriteAsync(
new SynchronizeEmbyLibraryByIdIfNeeded(library.Id),
cancellationToken);
}

39
ErsatzTV/Services/WorkerService.cs

@ -3,13 +3,9 @@ using Bugsnag; @@ -3,13 +3,9 @@ using Bugsnag;
using ErsatzTV.Application;
using ErsatzTV.Application.Maintenance;
using ErsatzTV.Application.MediaCollections;
using ErsatzTV.Application.MediaSources;
using ErsatzTV.Application.Playouts;
using ErsatzTV.Application.Search;
using ErsatzTV.Application.Subtitles;
using ErsatzTV.Core;
using ErsatzTV.Core.Errors;
using ErsatzTV.Core.Interfaces.Locking;
using MediatR;
namespace ErsatzTV.Services;
@ -48,7 +44,6 @@ public class WorkerService : BackgroundService @@ -48,7 +44,6 @@ public class WorkerService : BackgroundService
try
{
IMediator mediator = scope.ServiceProvider.GetRequiredService<IMediator>();
IEntityLocker entityLocker = scope.ServiceProvider.GetRequiredService<IEntityLocker>();
switch (request)
{
@ -63,40 +58,6 @@ public class WorkerService : BackgroundService @@ -63,40 +58,6 @@ public class WorkerService : BackgroundService
buildPlayout.PlayoutId,
error.Value));
break;
case IScanLocalLibrary scanLocalLibrary:
Either<BaseError, string> scanResult = await mediator.Send(
scanLocalLibrary,
cancellationToken);
scanResult.BiIter(
name => _logger.LogDebug(
"Done scanning local library {Library}",
name),
error =>
{
if (error is ScanIsNotRequired)
{
_logger.LogDebug(
"Scan is not required for local library {LibraryId} at this time",
scanLocalLibrary.LibraryId);
}
else
{
_logger.LogWarning(
"Unable to scan local library {LibraryId}: {Error}",
scanLocalLibrary.LibraryId,
error.Value);
}
});
if (entityLocker.IsLibraryLocked(scanLocalLibrary.LibraryId))
{
entityLocker.UnlockLibrary(scanLocalLibrary.LibraryId);
}
break;
case RebuildSearchIndex rebuildSearchIndex:
await mediator.Send(rebuildSearchIndex, cancellationToken);
break;
case DeleteOrphanedArtwork deleteOrphanedArtwork:
_logger.LogInformation("Deleting orphaned artwork from the database");
await mediator.Send(deleteOrphanedArtwork, cancellationToken);

2
ErsatzTV/Startup.cs

@ -459,6 +459,7 @@ public class Startup @@ -459,6 +459,7 @@ public class Startup
AddChannel<IEmbyBackgroundServiceRequest>(services);
AddChannel<IFFmpegWorkerRequest>(services);
AddChannel<ISearchIndexBackgroundServiceRequest>(services);
AddChannel<IScannerBackgroundServiceRequest>(services);
services.AddScoped<IFFmpegVersionHealthCheck, FFmpegVersionHealthCheck>();
services.AddScoped<IFFmpegReportsHealthCheck, FFmpegReportsHealthCheck>();
@ -552,6 +553,7 @@ public class Startup @@ -552,6 +553,7 @@ public class Startup
services.AddHostedService<EmbyService>();
services.AddHostedService<JellyfinService>();
services.AddHostedService<PlexService>();
services.AddHostedService<ScannerService>();
#endif
services.AddHostedService<FFmpegLocatorService>();
services.AddHostedService<WorkerService>();

Loading…
Cancel
Save