C# · 12月 27, 2021

c# – 使用Rx在异步操作上阻止(可能超时)

我正在尝试使用.NET的Reactive Extensions重写一些代码,但是我需要一些关于如何实现我的目标的指导.

我有一个类在一个低级库中封装了一些异步行为.想一想读或写网络的东西.当类启动时,它将尝试连接到环境,并且当成功时,它将通过从工作线程调用来发信号.

我想将这个异步行为变成一个同步调用,并且我已经创建了一个非常简化的例子,下面就是如何实现的:

ManualResetEvent readyEvent = new ManualResetEvent(false);public void Start(TimeSpan timeout) { // Simulate a background process ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); // Wait for startup to complete. if (!this.readyEvent.WaitOne(timeout)) throw new TimeoutException();}void AsyncStart(TimeSpan delay) { Thread.Sleep(delay); // Simulate startup delay. this.readyEvent.Set();}

在工作线程上运行AsyncStart只是模拟库的异步行为的一种方法,不是我的真实代码的一部分,低级库提供线程并在回调中调用我的代码.

请注意,如果启动在超时间隔内尚未完成,则Start方法将抛出TimeoutException异常.

我想重写这个代码来使用Rx.这是我的第一个尝试:

Subject<Unit> readySubject = new Subject<Unit>();public void Start(TimeSpan timeout) { ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); // Point A – see below this.readySubject.Timeout(timeout).First();}void AsyncStart(TimeSpan delay) { Thread.Sleep(delay); this.readySubject.OnNext(new Unit());}

这是一个体面的尝试,但不幸的是它包含一个种族条件.如果启动快速完成(例如,如果延迟为0),并且如果在A点存在额外的延迟,则OnNext将在First执行之前在readySubject上调用.实质上,IObservable我正在应用超时,首先从来没有看到该启动已经完成,并且将抛出一个TimeoutException.

似乎已经创建了Observable.Defer来处理这样的问题.这是稍微复杂的尝试使用Rx:

Subject<Unit> readySubject = new Subject<Unit>();void Start(TimeSpan timeout) { var ready = Observable.Defer(() => { ThreadPool.QueueUserWorkItem(_ => AsyncStart(TimeSpan.FromSeconds(1))); // Point B – see below return this.readySubject.AsObservable(); }); ready.Timeout(timeout).First();}void AsyncStart(TimeSpan delay) { Thread.Sleep(delay); this.readySubject.OnNext(new Unit());}

现在异步操作不会立即启动,但只有在使用IObservable时才会启动.不幸的是,仍然存在竞争条件,但这次在点B.如果异步操作在Defer lambda返回之前开始调用OnNext,它仍然丢失,并且TimeoutException将被超时抛出.

我知道我可以使用像Replay这样的操作来缓冲事件,但是我没有使用Rx的初始示例不会使用任何缓冲.有没有办法使用Rx来解决我的问题,没有竞争条件?在本例中,在IObservable已连接到本例的情况下才启动异步操作Timeout和First?

基于Paul Betts的答案是工作解决方案:

void Start(TimeSpan timeout) { var readySubject = new AsyncSubject<Unit>(); ThreadPool.QueueUserWorkItem(_ => AsyncStart(readySubject,TimeSpan.FromSeconds(1))); // Point C – see below readySubject.Timeout(timeout).First();}void AsyncStart(ISubject<Unit> readySubject,TimeSpan delay) { Thread.Sleep(delay); readySubject.OnNext(new Unit()); readySubject.OnCompleted();}

有趣的部分是当点C的延迟比AsyncStart完成所需的时间更长时. AsyncSubject保留发送的最后通知,Timeout和First将仍然按预期的方式执行.

解决方法 所以,有一件事要知道Rx我认为很多人首先(包括我自己):如果你使用任何传统的线程功能,如ResetEvents,Thread.Sleeps或任何,你是做错了(tm ) – 这就像在LINQ中将数组投射到数组,因为你知道底层类型恰好是一个数组.

要知道的关键是,异步func由返回IObservable< TResult>的函数表示. – 这是魔术酱,让你在某些事情完成时发出信号.所以这里是如何“Rx-ify”更传统的异步功能,就像您在Silverlight Web服务中看到的那样:

IObservable<byte[]> readFromNetwork(){ var ret = new AsyncSubject(); // Here’s a Traditional async function that you provide a callback to asyncReaderFunc(theFile,buffer => { ret.OnNext(buffer); ret.OnCompleted(); }); return ret;}

This is a decent attempt but unfortunately it contains a race condition.

这就是AsyncSubject所在的地方 – 这样可以确保即使asyncReaderFunc打到“订阅”,AsyncSubject仍然会“重播”发生了什么.

所以,现在我们有了我们的功能,我们可以做很多有趣的事情:

// Make it into a sync functionbyte[] results = readFromNetwork().First();// Keep reading blocks one at a time until we run outreadFromNetwork().Repeat().TakeUntil(x => x == null || x.Length == 0).Subscribe(bytes => { Console.WriteLine(“Read {0} bytes in chunk”,bytes.Length);})// Read the entire stream and get notified when the whole deal is finishedreadFromNetwork() .Repeat().TakeUntil(x => x == null || x.Length == 0) .Aggregate(new MemoryStream(),(ms,bytes) => ms.Write(bytes)) .Subscribe(ms => { Console.WriteLine(“Got {0} bytes in total”,ms.ToArray().Length); });// Or just get the entire thing as a MemoryStream and wait for itvar memoryStream = readFromNetwork() .Repeat().TakeUntil(x => x == null || x.Length == 0) .Aggregate(new MemoryStream(),bytes) => ms.Write(bytes)) .First();