From 9d7bdd0cf8d885f4dd8dda706c40dbf7ff0fa49e Mon Sep 17 00:00:00 2001 From: Daniel Grunwald Date: Fri, 9 Mar 2012 20:44:08 +0100 Subject: [PATCH] Allow creating an IObservable<> from a function that invokes a callback and signals termination using a Task. --- .../Project/Src/CSharpContextActionDoozer.cs | 7 ++- .../Src/Gui/Dialogs/AsynchronousWaitDialog.cs | 6 ++ .../Base/Project/Src/Gui/IProgressMonitor.cs | 14 +++++ .../Base/Project/Src/Gui/ProgressCollector.cs | 17 ++++-- .../Project/Src/Util/ReactiveExtensions.cs | 55 ++++++++++++++++++- 5 files changed, 89 insertions(+), 10 deletions(-) diff --git a/src/AddIns/BackendBindings/CSharpBinding/Project/Src/CSharpContextActionDoozer.cs b/src/AddIns/BackendBindings/CSharpBinding/Project/Src/CSharpContextActionDoozer.cs index f0223b680a..d6f3b8361e 100644 --- a/src/AddIns/BackendBindings/CSharpBinding/Project/Src/CSharpContextActionDoozer.cs +++ b/src/AddIns/BackendBindings/CSharpBinding/Project/Src/CSharpContextActionDoozer.cs @@ -59,9 +59,9 @@ namespace CSharpBinding return Task.FromResult(false); return Task.Run( async delegate { - var parseInfo = (await context.GetParseInformationAsync().ConfigureAwait(false)) as CSharpFullParseInformation; - if (parseInfo == null) - return false; +// var parseInfo = (await context.GetParseInformationAsync().ConfigureAwait(false)) as CSharpFullParseInformation; +// if (parseInfo == null) +// return false; lock (this) { if (!contextActionCreated) { contextActionCreated = true; @@ -71,6 +71,7 @@ namespace CSharpBinding if (contextAction == null) return false; CSharpAstResolver resolver = await context.GetAstResolverAsync().ConfigureAwait(false); + //var refactoringContext = new SDRefactoringContext(context, resolver, cancellationToken); return true; }, cancellationToken); } diff --git a/src/Main/Base/Project/Src/Gui/Dialogs/AsynchronousWaitDialog.cs b/src/Main/Base/Project/Src/Gui/Dialogs/AsynchronousWaitDialog.cs index 6404c31ce7..7b1967d2ea 100644 --- a/src/Main/Base/Project/Src/Gui/Dialogs/AsynchronousWaitDialog.cs +++ b/src/Main/Base/Project/Src/Gui/Dialogs/AsynchronousWaitDialog.cs @@ -311,6 +311,12 @@ namespace ICSharpCode.SharpDevelop.Gui return collector.ProgressMonitor.CreateSubTask(workAmount); } + /// + public IProgressMonitor CreateSubTask(double workAmount, CancellationToken cancellationToken) + { + return collector.ProgressMonitor.CreateSubTask(workAmount, cancellationToken); + } + public void Dispose() { collector.ProgressMonitor.Dispose(); diff --git a/src/Main/Base/Project/Src/Gui/IProgressMonitor.cs b/src/Main/Base/Project/Src/Gui/IProgressMonitor.cs index b1b9f11598..a6a3ff7e51 100644 --- a/src/Main/Base/Project/Src/Gui/IProgressMonitor.cs +++ b/src/Main/Base/Project/Src/Gui/IProgressMonitor.cs @@ -34,6 +34,15 @@ namespace ICSharpCode.SharpDevelop.Gui /// Multiple child progress monitors can be used at once; even concurrently on multiple threads. IProgressMonitor CreateSubTask(double workAmount); + /// + /// Creates a nested task. + /// + /// The amount of work this sub-task performs in relation to the work of this task. + /// That means, this parameter is used as a scaling factor for work performed within the subtask. + /// A new progress monitor representing the sub-task. + /// Multiple child progress monitors can be used at once; even concurrently on multiple threads. + IProgressMonitor CreateSubTask(double workAmount, CancellationToken cancellationToken); + /// /// Gets/Sets the name to show while the task is active. /// @@ -101,6 +110,11 @@ namespace ICSharpCode.SharpDevelop.Gui return new DummyProgressMonitor() { CancellationToken = this.CancellationToken }; } + public IProgressMonitor CreateSubTask(double workAmount, CancellationToken cancellationToken) + { + return new DummyProgressMonitor() { CancellationToken = cancellationToken }; + } + public void Dispose() { } diff --git a/src/Main/Base/Project/Src/Gui/ProgressCollector.cs b/src/Main/Base/Project/Src/Gui/ProgressCollector.cs index ed75d416e3..5ea0824838 100644 --- a/src/Main/Base/Project/Src/Gui/ProgressCollector.cs +++ b/src/Main/Base/Project/Src/Gui/ProgressCollector.cs @@ -15,7 +15,6 @@ namespace ICSharpCode.SharpDevelop.Gui public sealed class ProgressCollector : INotifyPropertyChanged { readonly ISynchronizeInvoke eventThread; - readonly CancellationToken cancellationToken; readonly MonitorImpl root; readonly LinkedList namedMonitors = new LinkedList(); readonly object updateLock = new object(); @@ -31,8 +30,7 @@ namespace ICSharpCode.SharpDevelop.Gui if (eventThread == null) throw new ArgumentNullException("eventThread"); this.eventThread = eventThread; - this.cancellationToken = cancellationToken; - this.root = new MonitorImpl(this, null, 1); + this.root = new MonitorImpl(this, null, 1, cancellationToken); } public event EventHandler ProgressMonitorDisposed; @@ -213,16 +211,18 @@ namespace ICSharpCode.SharpDevelop.Gui readonly ProgressCollector collector; readonly MonitorImpl parent; readonly double scaleFactor; + readonly CancellationToken cancellationToken; LinkedListNode nameEntry; double currentProgress; OperationStatus localStatus, currentStatus; int childrenWithWarnings, childrenWithErrors; - public MonitorImpl(ProgressCollector collector, MonitorImpl parent, double scaleFactor) + public MonitorImpl(ProgressCollector collector, MonitorImpl parent, double scaleFactor, CancellationToken cancellationToken) { this.collector = collector; this.parent = parent; this.scaleFactor = scaleFactor; + this.cancellationToken = cancellationToken; } public bool ShowingDialog { @@ -254,7 +254,7 @@ namespace ICSharpCode.SharpDevelop.Gui } public CancellationToken CancellationToken { - get { return collector.cancellationToken; } + get { return cancellationToken; } } public double Progress { @@ -317,7 +317,12 @@ namespace ICSharpCode.SharpDevelop.Gui public IProgressMonitor CreateSubTask(double workAmount) { - return new MonitorImpl(collector, this, workAmount); + return new MonitorImpl(collector, this, workAmount, cancellationToken); + } + + public IProgressMonitor CreateSubTask(double workAmount, CancellationToken cancellationToken) + { + return new MonitorImpl(collector, this, workAmount, cancellationToken); } public void Dispose() diff --git a/src/Main/Base/Project/Src/Util/ReactiveExtensions.cs b/src/Main/Base/Project/Src/Util/ReactiveExtensions.cs index e0ae81b2f3..345864b2d1 100644 --- a/src/Main/Base/Project/Src/Util/ReactiveExtensions.cs +++ b/src/Main/Base/Project/Src/Util/ReactiveExtensions.cs @@ -5,8 +5,8 @@ using System; using System.Collections.Generic; using System.Reflection; using System.Threading; +using System.Threading.Tasks; using System.Windows.Threading; - using ICSharpCode.SharpDevelop.Gui; namespace ICSharpCode.SharpDevelop @@ -29,6 +29,59 @@ namespace ICSharpCode.SharpDevelop ); } + public static IObservable CreateObservable(Func, Task> func) + { + return new AnonymousObservable(observer => new TaskToObserverSubscription(func, observer)); + } + + public static IObservable CreateObservable(Func, Task> func, IProgressMonitor progressMonitor) + { + return new AnonymousObservable(observer => new TaskToObserverSubscription(func, progressMonitor, observer)); + } + + sealed class TaskToObserverSubscription : IDisposable + { + readonly CancellationTokenSource cts = new CancellationTokenSource(); + readonly object syncLock = new object(); + readonly IObserver observer; + readonly IProgressMonitor childProgressMonitor; + + public TaskToObserverSubscription(Func, Task> func, IObserver observer) + { + this.observer = observer; + func(cts.Token, Callback).ContinueWith(TaskCompleted); + } + + public TaskToObserverSubscription(Func, Task> func, IProgressMonitor progressMonitor, IObserver observer) + { + this.observer = observer; + this.childProgressMonitor = progressMonitor.CreateSubTask(1, cts.Token); + func(childProgressMonitor, Callback).ContinueWith(TaskCompleted); + } + + void Callback(T item) + { + // Needs lock because callbacks may be called in parallel, but OnNext may not. + lock (syncLock) + observer.OnNext(item); + } + + void TaskCompleted(Task task) + { + if (childProgressMonitor != null) + childProgressMonitor.Dispose(); + if (task.Exception != null) + observer.OnError(task.Exception.InnerExceptions[0]); + else + observer.OnCompleted(); + } + + public void Dispose() + { + cts.Cancel(); + } + } + public static IDisposable Subscribe(this IObservable source, Action onNext, Action onError, Action onCompleted) { return source.Subscribe(new AnonymousObserver(onNext, onError, onCompleted));