使用使用者

Subject<T> 类型实现 IObservable<T> 和 IObserver<T>,从某种意义上说,它既是观察者又是可观测者。 可以使用主题来订阅所有观察程序,然后将主体订阅到后端数据源。 这样,使用者就可以充当一组订阅者和源的代理。 可以使用主题来实现具有缓存、缓冲和时间转移的自定义可观测对象。 此外,还可以使用主题将数据广播到多个订阅者。

默认情况下,使用者不会跨线程执行任何同步。 它们不采用计划程序,而是假定所有序列化和语法正确性都由主题的调用方处理。  主题只是广播到订阅者线程安全列表中所有订阅的观察者。 这样做有减少开销和提高性能的优点。 但是,如果要使用计划程序同步对观察器的传出调用,则可以使用 Synchronize 方法执行此操作。

使用使用者

在以下示例中,我们将创建一个主题,订阅该主题,然后使用同一主题将值发布到观察程序。 通过这样做,我们会将发布和订阅合并到同一源中。

除了采用 IObserver<T> 外,Subscribe 方法还具有一个重载,该重载对 onNext 执行操作<T> ,这意味着每次发布项时都会执行该操作。 在我们的示例中,每当调用 OnNext 时,项都将写入控制台。

Subject<int> subject = new Subject<int>();
var subscription = subject.Subscribe(
                         x => Console.WriteLine("Value published: {0}", x),
                         () => Console.WriteLine("Sequence Completed."));
subject.OnNext(1);

subject.OnNext(2);

Console.WriteLine("Press any key to continue");
Console.ReadKey();
subject.OnCompleted();
subscription.Dispose();

以下示例演示了主题的代理和广播性质。 我们首先创建一个源序列,该序列每 1 秒生成一个整数。 然后,我们创建一个 Subject,并将其作为观察者传递给源,以便它接收由此源序列推送的所有值。 之后,我们再创建两个订阅,这次是主题作为源。 然后, subSubject1subSubject2 订阅将接收使用者) 从源 (传递的任何值。

var source = Observable.Interval(TimeSpan.FromSeconds(1));
Subject<long> subject = new Subject<long>();
var subSource = source.Subscribe(subject);
var subSubject1 = subject.Subscribe(
                         x => Console.WriteLine("Value published to observer #1: {0}", x),
                         () => Console.WriteLine("Sequence Completed."));
var subSubject2 = subject.Subscribe(
                         x => Console.WriteLine("Value published to observer #2: {0}", x),
                         () => Console.WriteLine("Sequence Completed."));
Console.WriteLine("Press any key to continue");
Console.ReadKey();
subject.OnCompleted();
subSubject1.Dispose();
subSubject2.Dispose();

不同类型的主题

Rx 库中的 Subject<T> 类型是 ISubject<T> 接口的基本实现 (也可以实现 ISubject<T> 接口来创建自己的主题类型) 。 ISubject<T> 的其他实现提供了不同的功能。 所有这些类型都存储通过 OnNext 推送到它们的一些 (或所有) 值,并将其广播回其观察者。 这样,它们就会将热可观测值转换为冷的可观测值。 这意味着,如果多次订阅其中任何一个, (即订阅 -> 取消订阅 - 订阅-> 再次订阅) ,你将再次看到至少一个相同的值。 有关热和冷可观测对象的详细信息,请参阅 创建和订阅简单可观测序列 主题的最后一节。

ReplaySubject 存储它已发布的所有值。 因此,当你订阅它时,你会自动收到它已发布的整个值历史记录,即使你的订阅可能在推送某些值后出现。行为对象类似于 ReplaySubject,只不过它只存储了发布的最后一个值。 在初始化时,行为对象还需要类型为 T 的默认值。 当使用者尚未收到其他值时,此值将发送给观察者。 这意味着,除非主题已完成,否则所有订阅者将在订阅时立即收到一个值。 AsyncSubject 类似于重播和行为主题,但它将仅存储最后一个值,并且仅在序列完成时发布它。 对于可观测源为热且可能在任何观察者订阅之前完成的情况,可以使用 AsyncSubject 类型。 在这种情况下, AsyncSubject 仍可以提供最后一个值,并将其发布到任何将来的订阅者。