Browse Source

introduce ReactiveExtensions to make working with IObservable<T> easier

pull/23/head
Siegfried Pammer 14 years ago
parent
commit
90f6482267
  1. 46
      src/AddIns/Misc/SearchAndReplace/Project/Engine/SearchManager.cs
  2. 13
      src/AddIns/Misc/SearchAndReplace/Project/Gui/DefaultSearchResult.cs
  3. 1
      src/Main/Base/Project/ICSharpCode.SharpDevelop.csproj
  4. 83
      src/Main/Base/Project/Src/Util/ReactiveExtensions.cs

46
src/AddIns/Misc/SearchAndReplace/Project/Engine/SearchManager.cs

@ -134,7 +134,7 @@ namespace SearchAndReplace @@ -134,7 +134,7 @@ namespace SearchAndReplace
delegate {
var list = fileList.ToList();
this.count = list.Count;
Parallel.ForEach(fileList, new ParallelOptions { CancellationToken = monitor.CancellationToken, MaxDegreeOfParallelism = Environment.ProcessorCount }, fileName => SearchFile(fileName, strategy, monitor.CancellationToken));
Parallel.ForEach(list, new ParallelOptions { CancellationToken = monitor.CancellationToken, MaxDegreeOfParallelism = Environment.ProcessorCount }, fileName => SearchFile(fileName, strategy, monitor.CancellationToken));
});
task.ContinueWith(t => { if (t.Exception != null) observer.OnError(t.Exception); else observer.OnCompleted(); this.Dispose(); });
task.Start();
@ -363,8 +363,13 @@ namespace SearchAndReplace @@ -363,8 +363,13 @@ namespace SearchAndReplace
public static void MarkAll(IObservable<SearchResultMatch> results)
{
var marker = new SearchResultMarker();
marker.Registration = results.Subscribe(marker);
int count = 0;
results.ObserveOnUIThread()
.Subscribe(
match => { count++; MarkResult(match, false); },
error => MessageService.ShowException(error),
() => ShowMarkDoneMessage(count)
);
}
static void MarkResult(SearchResultMatch result, bool switchToOpenedView = true)
@ -416,41 +421,6 @@ namespace SearchAndReplace @@ -416,41 +421,6 @@ namespace SearchAndReplace
MessageService.ShowMessage("${res:Dialog.NewProject.SearchReplace.SearchStringNotFound}", "${res:Dialog.NewProject.SearchReplace.SearchStringNotFound.Title}");
}
class SearchResultMarker : IObserver<SearchResultMatch>
{
public IDisposable Registration { get; set; }
int count;
void IObserver<SearchResultMatch>.OnNext(SearchResultMatch value)
{
Interlocked.Increment(ref count);
WorkbenchSingleton.SafeThreadAsyncCall((Action)delegate { MarkResult(value, false); });
}
void IObserver<SearchResultMatch>.OnError(Exception error)
{
MessageService.ShowException(error);
OnCompleted(false);
}
void OnCompleted(bool success)
{
WorkbenchSingleton.SafeThreadCall(
(Action)delegate {
if (Registration != null)
Registration.Dispose();
if (success)
ShowMarkDoneMessage(count);
});
}
void IObserver<SearchResultMatch>.OnCompleted()
{
OnCompleted(true);
}
}
public static void SelectResult(SearchResultMatch result)
{
if (result == null) {

13
src/AddIns/Misc/SearchAndReplace/Project/Gui/DefaultSearchResult.cs

@ -123,7 +123,7 @@ namespace SearchAndReplace @@ -123,7 +123,7 @@ namespace SearchAndReplace
public ISearchResult CreateSearchResult(string title, IObservable<SearchResultMatch> matches)
{
var osr = new ObserverSearchResult(title);
osr.Registration = matches.Subscribe(osr);
osr.Registration = matches.ObserveOnUIThread().Subscribe(osr);
return osr;
}
}
@ -223,7 +223,7 @@ namespace SearchAndReplace @@ -223,7 +223,7 @@ namespace SearchAndReplace
void IObserver<SearchResultMatch>.OnNext(SearchResultMatch value)
{
WorkbenchSingleton.SafeThreadCall((Action)delegate { rootNode.Add(value); });
rootNode.Add(value);
}
void IObserver<SearchResultMatch>.OnError(Exception error)
@ -234,12 +234,9 @@ namespace SearchAndReplace @@ -234,12 +234,9 @@ namespace SearchAndReplace
void OnCompleted()
{
WorkbenchSingleton.SafeThreadCall(
(Action)delegate {
stopButton.Visibility = Visibility.Collapsed;
if (Registration != null)
Registration.Dispose();
});
stopButton.Visibility = Visibility.Collapsed;
if (Registration != null)
Registration.Dispose();
}
void IObserver<SearchResultMatch>.OnCompleted()

1
src/Main/Base/Project/ICSharpCode.SharpDevelop.csproj

@ -740,6 +740,7 @@ @@ -740,6 +740,7 @@
<Compile Include="Src\Services\ProjectService\ProjectLoader.cs" />
<Compile Include="Src\Services\WebProjectService\WebProjectService.cs" />
<Compile Include="Src\Util\FakeXmlViewContent.cs" />
<Compile Include="Src\Util\ReactiveExtensions.cs" />
<Compile Include="Src\Util\TreeNode.cs" />
<Compile Include="Src\Util\NativeMethods.cs" />
<Compile Include="Src\Util\ProcessRunnerException.cs" />

83
src/Main/Base/Project/Src/Util/ReactiveExtensions.cs

@ -0,0 +1,83 @@ @@ -0,0 +1,83 @@
// Copyright (c) AlphaSierraPapa for the SharpDevelop Team (for details please see \doc\copyright.txt)
// This code is distributed under the GNU LGPL (for details please see \doc\license.txt)
using System;
using System.Windows.Threading;
using ICSharpCode.SharpDevelop.Gui;
namespace ICSharpCode.SharpDevelop
{
/// <summary>
/// Provides extension methods for IObservable&lt;T&gt;.
/// </summary>
public static class ReactiveExtensions
{
public static IObservable<T> ObserveOnUIThread<T>(this IObservable<T> source)
{
return new AnonymousObservable<T>(
observer => source.Subscribe(
new AnonymousObserver<T>(
value => WorkbenchSingleton.SafeThreadAsyncCall(delegate { observer.OnNext(value); }),
exception => WorkbenchSingleton.SafeThreadAsyncCall(delegate { observer.OnError(exception); }),
() => WorkbenchSingleton.SafeThreadAsyncCall(delegate { observer.OnCompleted(); })
)
)
);
}
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
return source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));
}
class AnonymousObservable<T> : IObservable<T>
{
Func<IObserver<T>, IDisposable> subscribe;
public AnonymousObservable(Func<IObserver<T>, IDisposable> subscribe)
{
this.subscribe = subscribe;
}
public IDisposable Subscribe(IObserver<T> observer)
{
return subscribe(observer);
}
}
class AnonymousObserver<T> : IObserver<T>
{
Action<T> onNext;
Action<Exception> onError;
Action onCompleted;
public AnonymousObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
if (onNext == null)
throw new ArgumentNullException("onNext");
if (onError == null)
throw new ArgumentNullException("onError");
if (onCompleted == null)
throw new ArgumentNullException("onCompleted");
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
}
public void OnNext(T value)
{
onNext(value);
}
public void OnError(Exception error)
{
onError(error);
}
public void OnCompleted()
{
onCompleted();
}
}
}
}
Loading…
Cancel
Save