Browse Source

improvements to plex connection management (#2020)

pull/2023/head
Jason Dove 2 months ago committed by GitHub
parent
commit
2ca722523b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 5
      CHANGELOG.md
  2. 73
      ErsatzTV.Application/Plex/Commands/SynchronizePlexMediaSourcesHandler.cs
  3. 7
      ErsatzTV.Application/Plex/Commands/TryCompletePlexPinFlowHandler.cs
  4. 93
      ErsatzTV.Application/Plex/PlexBaseConnectionHandler.cs
  5. 88
      ErsatzTV.Application/Plex/Queries/GetPlexConnectionParametersHandler.cs
  6. 3
      ErsatzTV.Core/Interfaces/Plex/IPlexServerApiClient.cs
  7. 3
      ErsatzTV.Infrastructure/Plex/IPlexServerApi.cs
  8. 4
      ErsatzTV.Infrastructure/Plex/PlexServerApiClient.cs
  9. 3
      ErsatzTV/Startup.cs

5
CHANGELOG.md

@ -25,6 +25,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). @@ -25,6 +25,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Changed
- Start to make UI minimally responsive (functional on smaller screens)
- Change how ETV determines which address to use for Plex connections
- The active Plex connection (address) will only be cached for 30 seconds
- When the connection is no longer cached, a ping will be sent to the last used address for Plex (the last address that had a successful ping)
- If the ping is successful, the address will be cached for another 30 seconds
- If the ping is not successful, all addresses will be checked again, and the first address to return a successful ping will be cached for 30 seconds
### Fixed
- Fix error message about synchronizing Plex collections from a Plex server that has zero collections

73
ErsatzTV.Application/Plex/Commands/SynchronizePlexMediaSourcesHandler.cs

@ -10,7 +10,7 @@ using Microsoft.Extensions.Logging; @@ -10,7 +10,7 @@ using Microsoft.Extensions.Logging;
namespace ErsatzTV.Application.Plex;
public class SynchronizePlexMediaSourcesHandler : IRequestHandler<SynchronizePlexMediaSources,
public class SynchronizePlexMediaSourcesHandler : PlexBaseConnectionHandler, IRequestHandler<SynchronizePlexMediaSources,
Either<BaseError, List<PlexMediaSource>>>
{
private const string LocalhostUri = "http://localhost:32400";
@ -31,6 +31,7 @@ public class SynchronizePlexMediaSourcesHandler : IRequestHandler<SynchronizePle @@ -31,6 +31,7 @@ public class SynchronizePlexMediaSourcesHandler : IRequestHandler<SynchronizePle
ChannelWriter<IScannerBackgroundServiceRequest> channel,
IEntityLocker entityLocker,
ILogger<SynchronizePlexMediaSourcesHandler> logger)
: base(plexServerApiClient, mediaSourceRepository, logger)
{
_mediaSourceRepository = mediaSourceRepository;
_plexTvApiClient = plexTvApiClient;
@ -102,63 +103,35 @@ public class SynchronizePlexMediaSourcesHandler : IRequestHandler<SynchronizePle @@ -102,63 +103,35 @@ public class SynchronizePlexMediaSourcesHandler : IRequestHandler<SynchronizePle
var toRemove = existing.Connections
.Filter(connection => server.Connections.All(c => c.Uri != connection.Uri)).ToList();
await _mediaSourceRepository.Update(existing, toAdd, toRemove);
await FindConnectionToActivate(existing);
Option<PlexServerAuthToken> maybeToken = await _plexSecretStore.GetServerAuthToken(server.ClientIdentifier);
if (maybeToken.IsNone)
{
_logger.LogError(
"Unable to activate Plex connection for server {Server} without auth token",
server.ServerName);
}
foreach (PlexServerAuthToken token in maybeToken)
{
await FindConnectionToActivate(existing, token);
}
}
if (maybeExisting.IsNone)
{
await _mediaSourceRepository.Add(server);
await FindConnectionToActivate(server);
}
}
private async Task FindConnectionToActivate(PlexMediaSource server)
{
var prioritized = server.Connections
.OrderByDescending(pc => pc.Uri == LocalhostUri)
.ThenByDescending(pc => pc.IsActive)
.ToList();
foreach (PlexConnection connection in server.Connections)
{
connection.IsActive = false;
}
Option<PlexServerAuthToken> maybeToken = await _plexSecretStore.GetServerAuthToken(server.ClientIdentifier);
foreach (PlexServerAuthToken token in maybeToken)
{
foreach (PlexConnection connection in prioritized)
Option<PlexServerAuthToken> maybeToken = await _plexSecretStore.GetServerAuthToken(server.ClientIdentifier);
if (maybeToken.IsNone)
{
try
{
_logger.LogDebug("Attempting to locate to Plex at {Uri}", connection.Uri);
if (await _plexServerApiClient.Ping(connection, token))
{
_logger.LogInformation("Located Plex at {Uri}", connection.Uri);
connection.IsActive = true;
break;
}
}
catch
{
// do nothing
}
_logger.LogError(
"Unable to activate Plex connection for server {Server} without auth token",
server.ServerName);
}
}
if (maybeToken.IsNone)
{
_logger.LogError(
"Unable to activate Plex connection for server {Server} without auth token",
server.ServerName);
}
if (server.Connections.All(c => !c.IsActive))
{
_logger.LogError("Unable to locate Plex");
server.Connections.Head().IsActive = true;
foreach (PlexServerAuthToken token in maybeToken)
{
await FindConnectionToActivate(server, token);
}
}
await _mediaSourceRepository.Update(server, new List<PlexConnection>(), new List<PlexConnection>());
}
}

7
ErsatzTV.Application/Plex/Commands/TryCompletePlexPinFlowHandler.cs

@ -20,14 +20,15 @@ public class TryCompletePlexPinFlowHandler : IRequestHandler<TryCompletePlexPinF @@ -20,14 +20,15 @@ public class TryCompletePlexPinFlowHandler : IRequestHandler<TryCompletePlexPinF
public async Task<Either<BaseError, bool>>
Handle(TryCompletePlexPinFlow request, CancellationToken cancellationToken)
{
var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
CancellationToken token = cts.Token;
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(2));
using var linkedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cts.Token, cancellationToken);
CancellationToken token = linkedTokenSource.Token;
while (!token.IsCancellationRequested)
{
bool result = await _plexTvApiClient.TryCompletePinFlow(request.AuthPin);
if (result)
{
await _channel.WriteAsync(new SynchronizePlexMediaSources(), cancellationToken);
await _channel.WriteAsync(new SynchronizePlexMediaSources(), token);
return true;
}

93
ErsatzTV.Application/Plex/PlexBaseConnectionHandler.cs

@ -0,0 +1,93 @@ @@ -0,0 +1,93 @@
using System.Collections.Concurrent;
using ErsatzTV.Core.Domain;
using ErsatzTV.Core.Interfaces.Plex;
using ErsatzTV.Core.Interfaces.Repositories;
using ErsatzTV.Core.Plex;
using Microsoft.Extensions.Logging;
namespace ErsatzTV.Application.Plex;
public abstract class PlexBaseConnectionHandler(
IPlexServerApiClient plexServerApiClient,
IMediaSourceRepository mediaSourceRepository,
ILogger logger)
{
protected async Task<Option<PlexConnection>> FindConnectionToActivate(PlexMediaSource server, PlexServerAuthToken token)
{
Option<PlexConnection> result = Option<PlexConnection>.None;
foreach (PlexConnection connection in server.Connections)
{
connection.IsActive = false;
}
using var cts = new CancellationTokenSource();
ConcurrentDictionary<PlexConnection, TimeSpan> successfulTimes = new();
var tasks = server.Connections
.Map(connection => PingPlexConnection(connection, token, successfulTimes, cts.Token))
.ToList();
while (tasks.Count > 0)
{
Task completed = await Task.WhenAny(tasks);
if (completed.IsCompletedSuccessfully)
{
if (!successfulTimes.IsEmpty)
{
await cts.CancelAsync();
break;
}
}
tasks.Remove(completed);
}
Option<PlexConnection> maybeBest = successfulTimes.OrderByDescending(kv => kv.Value).Select(kvp => kvp.Key).HeadOrNone();
foreach (PlexConnection connection in maybeBest)
{
connection.IsActive = true;
}
if (server.Connections.All(c => !c.IsActive))
{
logger.LogError("Unable to locate Plex");
server.Connections.Head().IsActive = true;
}
await mediaSourceRepository.Update(server, [], []);
return result;
}
private async Task PingPlexConnection(PlexConnection connection, PlexServerAuthToken token, ConcurrentDictionary<PlexConnection, TimeSpan> successfulTimes, CancellationToken cancellationToken)
{
try
{
logger.LogDebug("Attempting to locate to Plex at {Uri}", connection.Uri);
var sw = new System.Diagnostics.Stopwatch();
sw.Start();
bool pingResult = await plexServerApiClient.Ping(connection, token, cancellationToken);
sw.Stop();
if (pingResult)
{
logger.LogInformation(
"Located Plex at {Uri} in {Milliseconds} ms",
connection.Uri,
sw.ElapsedMilliseconds);
successfulTimes.TryAdd(connection, sw.Elapsed);
}
else
{
logger.LogDebug(
"Unable to locate Plex at {Uri} after {Milliseconds} ms",
connection.Uri,
sw.ElapsedMilliseconds);
}
}
catch
{
// do nothing
}
}
}

88
ErsatzTV.Application/Plex/Queries/GetPlexConnectionParametersHandler.cs

@ -4,24 +4,32 @@ using ErsatzTV.Core.Interfaces.Plex; @@ -4,24 +4,32 @@ using ErsatzTV.Core.Interfaces.Plex;
using ErsatzTV.Core.Interfaces.Repositories;
using ErsatzTV.Core.Plex;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.Logging;
namespace ErsatzTV.Application.Plex;
public class GetPlexConnectionParametersHandler : IRequestHandler<GetPlexConnectionParameters,
public class GetPlexConnectionParametersHandler : PlexBaseConnectionHandler, IRequestHandler<GetPlexConnectionParameters,
Either<BaseError, PlexConnectionParametersViewModel>>
{
private readonly IMediaSourceRepository _mediaSourceRepository;
private readonly IMemoryCache _memoryCache;
private readonly IPlexServerApiClient _plexServerApiClient;
private readonly IPlexSecretStore _plexSecretStore;
private readonly ILogger<GetPlexConnectionParametersHandler> _logger;
public GetPlexConnectionParametersHandler(
IMemoryCache memoryCache,
IPlexServerApiClient plexServerApiClient,
IMediaSourceRepository mediaSourceRepository,
IPlexSecretStore plexSecretStore)
IPlexSecretStore plexSecretStore,
ILogger<GetPlexConnectionParametersHandler> logger)
: base(plexServerApiClient, mediaSourceRepository, logger)
{
_memoryCache = memoryCache;
_plexServerApiClient = plexServerApiClient;
_mediaSourceRepository = mediaSourceRepository;
_plexSecretStore = plexSecretStore;
_logger = logger;
}
public async Task<Either<BaseError, PlexConnectionParametersViewModel>> Handle(
@ -33,55 +41,43 @@ public class GetPlexConnectionParametersHandler : IRequestHandler<GetPlexConnect @@ -33,55 +41,43 @@ public class GetPlexConnectionParametersHandler : IRequestHandler<GetPlexConnect
return parameters;
}
Either<BaseError, PlexConnectionParametersViewModel> maybeParameters =
await Validate(request)
.MapT(
cp => new PlexConnectionParametersViewModel(
new Uri(cp.ActiveConnection.Uri),
cp.PlexServerAuthToken.AuthToken))
.Map(v => v.ToEither<PlexConnectionParametersViewModel>());
return maybeParameters.Match(
p =>
Option<PlexMediaSource> maybeMediaSource = await _mediaSourceRepository.GetPlex(request.PlexMediaSourceId);
foreach (PlexMediaSource mediaSource in maybeMediaSource)
{
Option<PlexServerAuthToken> maybeToken =
await _plexSecretStore.GetServerAuthToken(mediaSource.ClientIdentifier);
foreach (PlexServerAuthToken token in maybeToken)
{
_memoryCache.Set(request, p, TimeSpan.FromHours(1));
return maybeParameters;
},
error => error);
}
// try to keep the same connection
Option<PlexConnection> maybeActiveConnection = mediaSource.Connections.Filter(c => c.IsActive).HeadOrNone();
foreach (PlexConnection activeConnection in maybeActiveConnection)
{
if (await _plexServerApiClient.Ping(activeConnection, token, cancellationToken))
{
_logger.LogDebug("Plex connection is still active at {Uri}", activeConnection.Uri);
var p = new PlexConnectionParametersViewModel(new Uri(activeConnection.Uri), token.AuthToken);
_memoryCache.Set(request, p, TimeSpan.FromSeconds(30));
return p;
}
}
private Task<Validation<BaseError, ConnectionParameters>> Validate(GetPlexConnectionParameters request) =>
PlexMediaSourceMustExist(request)
.BindT(MediaSourceMustHaveActiveConnection)
.BindT(MediaSourceMustHaveToken);
_logger.LogInformation("Plex connection is no longer active, searching for a new connection");
private Task<Validation<BaseError, PlexMediaSource>> PlexMediaSourceMustExist(
GetPlexConnectionParameters request) =>
_mediaSourceRepository.GetPlex(request.PlexMediaSourceId)
.Map(
v => v.ToValidation<BaseError>(
$"Plex media source {request.PlexMediaSourceId} does not exist."));
// check all connections for a working one
Option<PlexConnection> maybeConnection = await FindConnectionToActivate(mediaSource, token);
foreach (PlexConnection connection in maybeConnection)
{
var p = new PlexConnectionParametersViewModel(new Uri(connection.Uri), token.AuthToken);
_memoryCache.Set(request, p, TimeSpan.FromMinutes(30));
return p;
}
private Validation<BaseError, ConnectionParameters> MediaSourceMustHaveActiveConnection(
PlexMediaSource plexMediaSource)
{
Option<PlexConnection> maybeConnection =
plexMediaSource.Connections.SingleOrDefault(c => c.IsActive);
return maybeConnection.Map(connection => new ConnectionParameters(plexMediaSource, connection))
.ToValidation<BaseError>("Plex media source requires an active connection");
}
return BaseError.New($"Plex media source {request.PlexMediaSourceId} requires an active connection");
}
private async Task<Validation<BaseError, ConnectionParameters>> MediaSourceMustHaveToken(
ConnectionParameters connectionParameters)
{
Option<PlexServerAuthToken> maybeToken = await
_plexSecretStore.GetServerAuthToken(connectionParameters.PlexMediaSource.ClientIdentifier);
return maybeToken.Map(token => connectionParameters with { PlexServerAuthToken = token })
.ToValidation<BaseError>("Plex media source requires a token");
}
return BaseError.New($"Plex media source {request.PlexMediaSourceId} requires a token");
}
private sealed record ConnectionParameters(PlexMediaSource PlexMediaSource, PlexConnection ActiveConnection)
{
public PlexServerAuthToken PlexServerAuthToken { get; set; }
return BaseError.New($"Plex media source {request.PlexMediaSourceId} does not exist.");
}
}

3
ErsatzTV.Core/Interfaces/Plex/IPlexServerApiClient.cs

@ -7,7 +7,8 @@ public interface IPlexServerApiClient @@ -7,7 +7,8 @@ public interface IPlexServerApiClient
{
Task<bool> Ping(
PlexConnection connection,
PlexServerAuthToken token);
PlexServerAuthToken token,
CancellationToken cancellationToken);
Task<Either<BaseError, List<PlexLibrary>>> GetLibraries(
PlexConnection connection,

3
ErsatzTV.Infrastructure/Plex/IPlexServerApi.cs

@ -9,7 +9,8 @@ public interface IPlexServerApi @@ -9,7 +9,8 @@ public interface IPlexServerApi
[Headers("Accept: text/xml")]
public Task<PlexXmlMediaContainerPingResponse> Ping(
[Query] [AliasAs("X-Plex-Token")]
string token);
string token,
CancellationToken cancellationToken);
[Get("/library/sections")]
[Headers("Accept: application/json")]

4
ErsatzTV.Infrastructure/Plex/PlexServerApiClient.cs

@ -23,12 +23,12 @@ public class PlexServerApiClient : IPlexServerApiClient @@ -23,12 +23,12 @@ public class PlexServerApiClient : IPlexServerApiClient
_logger = logger;
}
public async Task<bool> Ping(PlexConnection connection, PlexServerAuthToken token)
public async Task<bool> Ping(PlexConnection connection, PlexServerAuthToken token, CancellationToken cancellationToken)
{
try
{
IPlexServerApi service = XmlServiceFor(connection.Uri, TimeSpan.FromSeconds(5));
PlexXmlMediaContainerPingResponse pingResult = await service.Ping(token.AuthToken);
PlexXmlMediaContainerPingResponse pingResult = await service.Ping(token.AuthToken, cancellationToken);
return token.ClientIdentifier == pingResult.MachineIdentifier;
}
catch (Exception)

3
ErsatzTV/Startup.cs

@ -559,7 +559,8 @@ public class Startup @@ -559,7 +559,8 @@ public class Startup
app.Use(
async (context, next) =>
{
if (!context.Request.Path.StartsWithSegments("/iptv") &&
if (!context.Request.Host.Value.StartsWith("localhost", StringComparison.OrdinalIgnoreCase) &&
!context.Request.Path.StartsWithSegments("/iptv") &&
context.Connection.LocalPort != Settings.UiPort)
{
context.Response.StatusCode = 404;

Loading…
Cancel
Save