C# · 12月 27, 2021

c# – 如何在没有引入竞争条件的情况下等待RX主题的回复?

我有一个服务允许来电者发送命令和异步接收响应.在一个实际的应用中,这些动作相当断开(一些动作会发送一个命令,响应将被独立处理).

但是,在我的测试中,我需要发送一个命令,然后在继续测试之前等待(第一个)响应.

响应使用RX发布,我的第一次尝试代码是这样的:

service.SendCommand(“BLAH”);await service.Responses.FirstAsync();

这个问题是,FirstAsync只有在响应到来之后才能工作,等待已经被打了.如果服务流程非常快,那么测试就会等待着.

我的下一次尝试解决这个问题是在发送命令之前调用FirstAsync(),以便即使在等待之前到达,也会产生结果:

var firstResponse = service.Responses.FirstAsync();service.SendCommand(“BLAH”);await firstResponse;

然而,这仍然以同样的方式失败.似乎只有当等待命中(GetAwaiter被调用)它才开始收听;所以存在完全相同的竞争条件.

如果我将主题改为具有缓冲区(或计时器)的ReplaySubject,那么我可以“解决”这个;然而在我的生产课上这样做是没有意义的;它只会用于测试.

在RX中能做到这一点的“正确”方法是什么?如何设置一个将不会引入竞争条件的方式在流上接收第一个事件的东西?

这是一个小测试,以“单线程”的方式说明问题.这个测试会不定期地停止:

[Fact]public async Task Mytest(){ var x = new Subject<bool>(); // Subscribe to the first bool (but don’t await it yet) var firstBool = x.FirstAsync(); // Send the first bool x.OnNext(true); // Await the task that receives the first bool var b = await firstBool; // <– hangs here; presumably because firstBool didn’t start monitoring until GetAwaiter was called? Assert.Equal(true,b);}

我甚至尝试在我的测试中调用Replay(),它会缓冲结果;但这并不改变任何事情:

[Fact]public async Task Mytest(){ var x = new Subject<bool>(); var firstBool = x.Replay(); // Send the first bool x.OnNext(true); // Await the task that receives the first bool var b = await firstBool.FirstAsync(); // <– Still hangs here Assert.Equal(true,b);}解决方法 您可以使用AsyncSubject来执行此操作 [Fact]public async Task Mytest(){ var x = new Subject<bool>(); var firstBool = x.FirstAsync().PublishLast(); // PublishLast wraps an AsyncSubject firstBool.Connect(); // Send the first bool x.OnNext(true); // Await the task that receives the first bool var b = await firstBool; Assert.Equal(true,b);}

AsyncSubject基本上缓存在调用OnComplete之前的最后接收的值,然后重播它.