无需手动实现 IObservable<T> 接口就可以创建可观测序列。 同样,无需实现 IObserver<T> 来订阅序列。 通过安装反应扩展程序集,可以利用 可观测 类型,该类型提供了许多静态 LINQ 运算符,以便创建包含零个、一个或多个元素的简单序列。 此外,Rx 还提供订阅扩展方法,这些方法在委托方面采用 OnNext、OnError 和 OnCompleted 处理程序的各种组合。
创建和订阅简单序列
以下示例使用 Observable 类型的 Range 运算符创建简单的可观测数字集合。 观察者使用 Observable 类的 Subscribe 方法订阅此集合,并提供作为处理 OnNext、OnError 和 OnCompleted 的委托的操作。
Range 运算符有多个重载。 在我们的示例中,它创建一个整数序列,该序列以 x 开头,然后生成 y 顺序数。
订阅发生后,值将立即发送到观察程序。 然后,OnNext 委托输出值。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IDisposable subscription = source.Subscribe(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
        }
    }
}
当观察者订阅可观测序列时,调用 Subscribe 方法的线程可能不同于运行序列直到完成的线程。 因此,订阅调用是异步的,因为直到对序列的观察完成,调用方才会被阻止。 使用计划程序主题中将详细介绍这一点。
请注意,Subscribe 方法返回 IDisposable,以便你可以取消订阅序列并轻松释放它。 在可观测序列上调用 Dispose 方法时,观察程序将停止侦听可观测数据。  通常,无需显式调用 Dispose,除非需要提前取消订阅,或者源可观测序列的生存期比观察者更长。 Rx 中的订阅设计用于无需使用终结器即可触发即忘记方案。 当垃圾回收器收集 IDisposable 实例时,Rx 不会自动释放订阅。 但请注意,可观测运算符的默认行为是尽快释放订阅 (即,当 OnCompleted 或 OnError 消息发布) 。 例如,代码 var x = Observable.Zip(a,b).Subscribe(); 会将 x 订阅到序列 a 和 b。 如果 引发错误,x 将立即取消订阅 b。
还可以调整代码示例以使用 Observable 类型的 Create 运算符,该运算符从指定的 OnNext、OnError 和 OnCompleted 操作委托创建并返回观察程序。 然后,可以将此观察程序传递到 可观测 类型的 Subscribe 方法。 以下示例演示如何执行此操作。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
namespace SimpleSequence
{
    class Program
    {
        static void Main(string[] args)
        {
            IObservable<int> source = Observable.Range(1, 10);
            IObserver<int> obsvr = Observer.Create<int>(
                x => Console.WriteLine("OnNext: {0}", x),
                ex => Console.WriteLine("OnError: {0}", ex.Message),
                () => Console.WriteLine("OnCompleted"));
            IDisposable subscription = source.Subscribe(obsvr);
            Console.WriteLine("Press ENTER to unsubscribe...");
            Console.ReadLine();
            subscription.Dispose();
       }
    }
}
除了从头开始创建可观察序列外,还可以将现有枚举器、.NET 事件和异步模式转换为可观察序列。 本部分中的其他主题将介绍如何执行此操作。
请注意,本主题仅介绍一些可以从头开始创建可观察序列的运算符。 若要详细了解其他 LINQ 运算符,请参阅 使用 LINQ 运算符查询可观测序列。
使用计时器
以下示例使用 Timer 运算符创建序列。 序列将在 5 秒过后推送第一个值,然后每隔 1 秒推送一次后续值。 出于说明目的,我们将 Timestamp 运算符链接到查询,以便推送出的每个值都将在发布时追加。 通过这样做,当我们订阅此源序列时,我们可以同时接收其值和时间戳。
Console.WriteLine(“Current Time: “ + DateTime.Now);
var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();
输出将类似于下面:
Current Time: 5/31/2011 5:35:08 PM
Press any key to unsubscribe
0: 5/31/2011 5:35:13 PM -07:00
1: 5/31/2011 5:35:14 PM -07:00
2: 5/31/2011 5:35:15 PM -07:00
通过使用 Timestamp 运算符,我们已验证第一项确实在序列开始 5 秒后推送,并且每个项在 1 秒后发布。
将可枚举集合转换为可观测序列
使用 ToObservable 运算符,可以将泛型可枚举集合转换为可观测序列并订阅该集合。
IEnumerable<int> e = new List<int> { 1, 2, 3, 4, 5 };
IObservable<int> source = e.ToObservable();
IDisposable subscription = source.Subscribe(
                            x => Console.WriteLine("OnNext: {0}", x),
                            ex => Console.WriteLine("OnError: {0}", ex.Message),
                            () => Console.WriteLine("OnCompleted"));
Console.ReadKey();
冷与热可观测值
冷可观测值在订阅时开始运行,即可观测序列仅在调用 Subscription 时开始向观察器推送值。 值也不在订阅者之间共享。 这不同于鼠标移动事件或股票股票代码等热门可观测值,它们甚至在订阅处于活动状态之前就已生成值。 当观察者订阅热可观测序列时,它将在流中获取当前值。 热可观测序列在所有订阅服务器之间共享,每个订阅服务器将推送到序列中的下一个值。 例如,即使没有人订阅特定的股票代码,股票代码将继续根据市场走势更新其价值。 当订阅者注册此时钟周期的兴趣时,它将自动获取最新的时钟周期。
以下示例演示了一个冷可观测序列。 在此示例中,我们使用 Interval 运算符创建一个简单的可观察序列,这些数字按特定间隔抽出一次,在本例中,每隔 1 秒抽出一次。
然后,两个观察者订阅此序列并输出其值。 你会注意到,将为每个订阅服务器重置序列,其中第二个订阅将从第一个值重启序列。
IObservable<int> source = Observable.Interval(TimeSpan.FromSeconds(1));   
IDisposable subscription1 = source.Subscribe(
                x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 1: OnCompleted"));
IDisposable subscription2 = source.Subscribe(
                x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                () => Console.WriteLine("Observer 2: OnCompleted"));
Console.WriteLine("Press any key to unsubscribe");
Console.ReadLine();
subscription1.Dispose();
subscription2.Dispose();
在以下示例中,我们使用 Publish 运算符将上一个冷可观测序列转换为热序列 source ,该运算符返回名为 hot的 IConnectableObservable 实例。 Publish 运算符通过向多个订阅者广播单个订阅来提供共享订阅的机制。 
              hot 充当代理并订阅 source,然后在它从 source接收值时将它们推送到其自己的订阅者。 为了建立支持 source 订阅并开始接收值,我们使用 IConnectableObservable.Connect () 方法。 由于 IConnectableObservable 继承 IObservable,因此我们可以使用订阅来订阅此热序列,甚至在它开始运行之前。 请注意,在示例中,热序列在订阅时 subscription1 尚未启动。 因此,不会将任何值推送到订阅服务器。 调用 Connect 后,值随后推送到 subscription1。 延迟 3 秒后, subscription2 订阅 hot 并立即开始接收当前位置 (3 的值,在本例中) 直到结束。 输出如下所示:
Current Time: 6/1/2011 3:38:49 PM
Current Time after 1st subscription: 6/1/2011 3:38:49 PM
Current Time after Connect: 6/1/2011 3:38:52 PM
Observer 1: OnNext: 0
Observer 1: OnNext: 1
Current Time just before 2nd subscription: 6/1/2011 3:38:55 PM 
Observer 1: OnNext: 2
Observer 1: OnNext: 3
Observer 2: OnNext: 3
Observer 1: OnNext: 4
Observer 2: OnNext: 4
       
Console.WriteLine("Current Time: " + DateTime.Now);
var source = Observable.Interval(TimeSpan.FromSeconds(1));            //creates a sequence
IConnectableObservable<long> hot = Observable.Publish<long>(source);  // convert the sequence into a hot sequence
IDisposable subscription1 = hot.Subscribe(                        // no value is pushed to 1st subscription at this point
                            x => Console.WriteLine("Observer 1: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 1: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 1: OnCompleted"));
Console.WriteLine("Current Time after 1st subscription: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
hot.Connect();       // hot is connected to source and starts pushing value to subscribers 
Console.WriteLine("Current Time after Connect: " + DateTime.Now);
Thread.Sleep(3000);  //idle for 3 seconds
Console.WriteLine("Current Time just before 2nd subscription: " + DateTime.Now);
IDisposable subscription2 = hot.Subscribe(     // value will immediately be pushed to 2nd subscription
                            x => Console.WriteLine("Observer 2: OnNext: {0}", x),
                            ex => Console.WriteLine("Observer 2: OnError: {0}", ex.Message),
                            () => Console.WriteLine("Observer 2: OnCompleted"));
Console.ReadKey();