因为.Concat()
只订阅当前完成后的下一个内部可观察对象,并且在订阅时,重叠的"下一个"可观察对象已经发出了它的第一个值,没有人消费.
使用.Merge()
或
或.SelectMany(x => x)
而不是合并
或使用.Buffer
或者创建如下所示的缓冲操作符:
public static class RxExtensions
{
public static IObservable<T> CollectUntilSubscribed<T, TUntil>(this IObservable<T> source, IObservable<TUntil> stopMe)
{
var subscribersCount = 0;
var ready = new Subject<Unit>();
var delayed = source
.Delay(x => subscribersCount > 0 ? Observable.Return(Unit.Default):ready)
.Publish()
;
var connection = delayed.Connect();
stopMe.Take(1).Subscribe(_ => connection.Dispose());
return Observable.Create<T>(o =>
{
var sub = delayed.Subscribe(o);
if (Interlocked.Increment(ref subscribersCount) == 1) ready.OnNext(Unit.Default);
return Disposable.Create(() =>
{
Interlocked.Decrement(ref subscribersCount);
sub.Dispose();
});
});
}
}
internal class Program
{
static void Main(string[] args)
{
Observable
.Interval(TimeSpan.FromSeconds(1))
.Window(2,1)
.Select(x => x.CollectUntilSubscribed(Observable.Never<Unit>()))
.Concat()
.Subscribe(Console.WriteLine);
Console.ReadLine();
}
}