除 .NET 事件外,.NET Framework中还存在其他异步数据源。 其中一个是异步方法模式。 在此设计模式中,提供了两种方法。 (通常名为 BeginX) 的方法用于启动计算,并返回传递给第二个方法的 IAsyncResult 句柄 (通常名为 EndX) ,然后检索计算结果。 通常通过实现 AsyncCallback 委托或轮询 IAsyncResult.IsCompleted 来发出完成信号。 遵循此模式的代码通常难以阅读和维护。 在本主题中,我们将演示如何使用 Rx 工厂方法将此类异步数据源转换为可观测序列。
将异步模式转换为可观测序列
.NET 中的许多异步方法都是使用 BeginX 和 EndX 等签名编写的,其中 X 是异步执行的方法名称。 BeginX 采用参数来执行方法,即 AsyncCallback,它是采用 IAsyncResult 且不返回任何内容的操作,最后是对象状态。 EndX 采用从 AsyncCallback 传入的 IAsyncResult 来检索异步调用的值。
Observable 类型的 FromAsyncPattern 运算符包装 Begin 和 End 方法 (作为参数传递给运算符) ,并返回一个函数,该函数采用与 Begin 相同的参数并返回可观测对象。 此可观测对象表示发布单个值的序列,该值是刚刚指定的调用的异步结果。
在以下示例中,我们将使用 IAsyncResult 模式的 Stream 对象的 BeginRead 和 EndRead 转换为返回可观察序列的函数。 对于 FromAsyncPattern 运算符的泛型参数,我们将 BeginRead 的参数类型指定到回调。 由于 EndRead 方法返回值,因此将此类型追加为 FromAsyncPattern 的最终泛型参数。 如果将鼠标悬停在 上 var , read将注意到 FromAsyncPattern 的返回值是具有以下签名的函数委托: Func<byte[], int32,int32, IObservable<int32>>,这意味着此函数采用 3 个参数 (BeginRead) 相同的参数,并返回 IObservable<Int32>。 此 IObservable 包含一个值(EndRead 返回的整数),包含从流中读取的字节数(介于 0 (0) 和请求的字节数之间)。 由于我们现在获取的是 IObservable 而不是 IAsyncResult,因此可以使用所有可用于 Observables 的 LINQ 运算符并订阅、分析或撰写它。
Stream inputStream = Console.OpenStandardInput();
var read = Observable.FromAsyncPattern<byte[], int, int, int>(inputStream.BeginRead, inputStream.EndRead);
byte[] someBytes = new byte[10];
IObservable<int> source = read(someBytes, 0, 10);
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();