Window
定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据
Window
和Buffer
类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted
通知。
和Buffer
一样,Window
有很多变体,每一种都以自己的方式将原始Observable分解为多个作为结果的Observable,每一个都包含一个映射原始数据的window
。用Window
操作符的术语描述就是,当一个窗口打开(when a window "opens")意味着一个新的Observable已经发射(产生)了,而且这个Observable开始发射来自原始Observable的数据;当一个窗口关闭(when a window "closes")意味着发射(产生)的Observable停止发射原始Observable的数据,并且发射终止通知onCompleted
给它的观察者们。
在RxJava中有许多种Window
操作符的变体。
window(closingSelector)
window
的这个变体会立即打开它的第一个窗口。每当它观察到closingSelector
返回的Observable发射了一个对象时,它就关闭当前打开的窗口并立即打开一个新窗口。用这个方法,这种window
变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。
- Javadoc: window(Func0))
window(windowOpenings, closingSelector)
无论何时,只要window
观察到windowOpenings
这个Observable发射了一个Opening
对象,它就打开一个窗口,并且同时调用closingSelector
生成一个与那个窗口关联的关闭(closing)Observable。当这个关闭(closing)Observable发射了一个对象时,window
操作符就会关闭那个窗口。对这个变体来说,由于当前窗口的关闭和新窗口的打开是由单独的Observable管理的,它创建的窗口可能会存在重叠(重复某些来自原始Observable的数据)或间隙(丢弃某些来自原始Observable的数据)。
- Javadoc: window(Observable,Func1))
window(count)
这个window
的变体立即打开它的第一个窗口。每当当前窗口发射了count
项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。这种window
变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据是一一对应的。
- Javadoc: window(int))
window(count, skip)
这个window
的变体立即打开它的第一个窗口。原始Observable每发射skip
项数据它就打开一个新窗口(例如,如果skip
等于3,每到第三项数据,它会打开一耳光新窗口)。每当当前窗口发射了count
项数据,它就关闭当前窗口并打开一个新窗口。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。如果skip=count
,它的行为与window(source, count)
相同;如果skip < count
,窗口可会有count - skip
个重叠的数据;如果skip > count
,在两个窗口之间会有skip - count
项数据被丢弃。
- Javadoc: window(int,int))
window(timespan, unit[, scheduler])
这个window
的变体立即打开它的第一个窗口。每当过了timespan
这么长的时间它就关闭当前窗口并打开一个新窗口(时间单位是unit
,可选在调度器scheduler
上执行)。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。这种window
变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是一一对应的。
- Javadoc: window(long,TimeUnit))
- Javadoc: window(long,TimeUnit,Scheduler))
window(timespan, unit, count[, scheduler])
这个window
的变体立即打开它的第一个窗口。这个变体是window(count)
和window(timespan, unit[, scheduler])
的结合,每当过了timespan
的时长或者当前窗口收到了count
项数据,它就关闭当前窗口并打开另一个。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。这种window
变体发射一系列不重叠的窗口,这些窗口的数据集合与原始Observable发射的数据也是一一对应的。
- Javadoc: window(long,TimeUnit,int))
- Javadoc: window(long,TimeUnit,int,Scheduler))
window(timespan, timeshift, unit[, scheduler])
buffer(timespan, timeshift, unit)
在每一个timeshift
时期内都创建一个新的List
,然后用原始Observable发射的每一项数据填充这个列表(在把这个List
当做自己的数据发射前,从创建时开始,直到过了timespan
这么长的时间)。如果timespan
长于timeshift
,它发射的数据包将会重叠,因此可能包含重复的数据项。
这个window
的变体立即打开它的第一个窗口。随后每当过了timeshift
的时长就打开一个新窗口(时间单位是unit
,可选在调度器scheduler
上执行),当窗口打开的时长达到timespan
,它就关闭当前打开的窗口。如果从原始Observable收到了onError
或onCompleted
通知它也会关闭当前窗口。窗口的数据可能重叠也可能有间隙,取决于你设置的timeshift
和timespan
的值。
这个变体的window
默认在computation
调度器上执行它的定时器。
- Javadoc: window(long,long,TimeUnit))
- Javadoc: window(long,long,TimeUnit,Scheduler))
window-backpressure
你可以使用Window
操作符实现反压backpressure
(意思是,处理这样一个Observable:它产生数据的数据可能比它的观察者消费数据的数据快)。
Window操作符可以将大量的数据序列缩减为较少的数据窗口序列,让它们更容易处理。例如,你可以按固定的时间间隔,定期关闭和发射来自一个爆发性Observable的数据窗口。
示例代码
Observable<Observable<Integer>> burstyWindowed = bursty.window(500, TimeUnit.MILLISECONDS);
你还可以选择每当收到爆发性Observable的N项数据时发射一个新的数据窗口。
示例代码
Observable<Observable<Integer>> burstyWindowed = bursty.window(5);