前言

和之前的Glide篇一样,这篇RxJava2源码分析也会先列出一些要点,然后按这些点一步一步分析
注意,文章中的RxJava均是指RxJava2

  • Rxjava中的主要类型
  • Rxjava订阅与终止订阅的过程(Observable、Observer、create、just、dispose)
  • Rxjava操作符原理(map、lift、compose)
  • Rxjava线程调度原理(subscribeOn、observeOn、io、main)
  • Rxjava背压处理原理(buffer、latest、drop)
  • Rxjava冷热Observable(publish、share、connect、refCount)
  • Rxjava封装库RxBinding原理

RxJava中的主要类型

开始之前先梳理下几个关键类的作用和他们之间的关系

  • Observable 被观察者(事件源),不处理背压
  • Observer 观察者,用于订阅Observable
  • Subject 继承了Observable实现了Observer,既可做观察者也可做被观察者,通常作为两者的桥梁或代理

  • Flowable(Publisher) 被观察者(事件源),有背压处理策略

  • Subscriber 观察者,用于订阅Flowable
  • Processor 实现类FlowableProcessor继承了Flowable实现了FlowableSubscriber,类似Subject

  • Single/SingleObserver 仅发生一次消息,遵循onSubscribe (onSuccess | onError)?

  • Completable/CompletableObserver 仅发生一次消息,遵循onSubscribe (onComplete | onError)?
  • Maybe/MaybeObserver 仅发生一次消息,遵循onSubscribe (onSuccess | onError | onComplete)?

  • Disposable 替代了RxJava1中的Subscription ,实现该接口的资源具备可被取消(dispose)的能力

  • SubscriptionSubscriber订阅时回调的对象,具备拉取(request)和取消订阅(cancel)的能力

RxJava订阅与终止订阅的过程

这里先以最基础的Observable.create为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
}
})
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});

进入create方法,requireNonNull只是一个简单的判空处理,然后由onAssembly返回Observable<T>对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}

onAssembly是一个具有hook作用的方法,它会判断它的Function类型成员变量onObservableAssembly是否存在,不存在则直接把传入的参数返回,存在则把经过onObservableAssembly处理后的结果返回,相当于提供了一层允许插入额外操作的hook层。在当前场景下它直接返回了我们创建的ObservableCreate<T>,它是一个继承了Observable的类,之后我们会调用Observablesubscribe方法来完成订阅

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}

订阅方法中的requireNonNullRxJavaPlugins就不再赘述了,最后执行了subscribeActual方法,这里是实际完成订阅的地方。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);//观察者传入Emitter
observer.onSubscribe(parent); //触发订阅时产生的回调
try {
source.subscribe(parent); //发射器开始发射定义在事件源中的事件
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
}

回到ObservableCreate中,在subscribeActual里首先创建了用于发射事件的CreateEmitter对象,CreateEmitter实现了接口EmitterDisposable, 并持有observer。当通过onNext发射事件时会传递给观察者的onNext方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t); //将Next事件给到observer
}
}
@Override
public void onError(Throwable t) {
if (t == null) {
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
}
if (!isDisposed()) {
try {
observer.onError(t); //将Error事件给到observer并终止订阅
} finally {
dispose();
}
} else {
RxJavaPlugins.onError(t);
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
try {
observer.onComplete(); //将Complete事件给到observer并终止订阅
} finally {
dispose();
}
}
}
}

最后执行source.subscribe(parent);使数据源开始经由发射器发射数据,至此整个创建过程就走通了

1
2
3
4
5
6
7
new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("123");
e.onComplete();
}
}

经过上面的分析,我们了解到了:

  • 每次将事件传递给观察者时都会判断isDisposed()检查是否订阅已经终止,一旦触发了onError()onComplete()紧接着就会执行dispose()

  • 执行Observable.subscribe后才会在subscribeActual中完成实际的订阅,并且开始执行发射器发射事件的代码,创建型操作符createdeferfromCallablejust等均遵循这个规则,稍稍需要注意的是,just()方法即使没有订阅也会立刻执行(是立刻执行该函数本身,不是开始发射数据),他会一开始就把我们要发射的内容作为value保存下来

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
private final T value;
public ObservableJust(final T value) {
this.value = value;
}
@Override
protected void subscribeActual(Observer<? super T> s) {
ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
s.onSubscribe(sd);
sd.run();
}
@Override
public T call() {
return value;
}
}

接下来分析终止订阅的过程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don't leak it.
*/
DISPOSED
;
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
}

在终止订阅时,执行dispose()会将一个单例对象DISPOSED,赋给当前的Disposable对象,由于枚举成员本质是静态常量,所以isDisposed(Disposable d)方法也只需要判断当前对象的引用是否是DISPOSED即可。这种判断当前state的方法我们在设计开源库时也可以借鉴。然后进入Observable.just("Test")方法,它会创建并返回一个ObservableJust<T>对象

Rxjava操作符原理

先从最经典的map方法入手

1
2
3
4
5
6
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

ObservableMap继承自Observable,所以map方法返回了一个新的Observable,进入ObservableMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}

这里用到了装饰者模式,ObservableMap持有来自它上游的被观察者sourceMapObserver持有来自它下游的观察者和我们实现的转换方法,在subscribeActual方法中完成ObservableMapsource的订阅,将来自source的原始数据经过转换后再发给下游的观察者,从而实现map这一功能。

到这里我们也能总结出包含多个操作符时的订阅流程了:

  • 执行代码时,自上而下每一步操作符都会创建一个新的Observable,当最后一步执行subscribe时会触发最后一个ObservablesubscribeActual方法,在该方法中完成它所持有的上一个Observable的订阅(即执行source.subscribe),由此也会触发其subscribeActual方法,如此反复直至最上游,所以订阅的过程是自下而上的。
  • 订阅者Observer在订阅的过程中会自下游一路创建至上游,每一个创建的Observer都持有它下游的Observer。当订阅到最上游数据源开始发射数据时,数据会自上而下的传递,直至最终将数据从数据源传递到我们手动创建的最下游Observer

了解了map后就可以去了解比较特殊的liftcompose了,在RxJava1中大部分变换都基于lift这个神奇的操作符,进入lift的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
/** The actual operator. */
final ObservableOperator<? extends R, ? super T> operator;
public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
super(source);
this.operator = operator;
}
@Override
public void subscribeActual(Observer<? super R> s) {
Observer<? super T> observer;
try {
observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
source.subscribe(observer);
}
}

lift也是创建并返回了一个新的Observable,即ObservableLift。在它的subscribeActual方法中也是完成了实际订阅,但是在订阅前执行了operator.apply(s),我们实际的订阅者是由下游订阅者经过apply方法转换产生的。现在可以回去翻之前的代码,在mapMapObserver替我们完成了这一步封装,MapObserver做为实际的订阅者和下游Observer的代理,让我们有了额外执行转换的机会,而lift要更原始更自由,把定义代理Observer的过程完全交由我们自己来实现,因此它也作为了RxJava1中转换操作符依赖的基础。

compose则更简单了,它允许我们将一系列操作符打包封装到一起方便复用。根据我们之前对订阅流程的分析,只要把当前Observable做为source传入,把经过一系列操作符后将最终创建的Observable返回即可实现这个功能。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@SuppressWarnings("unchecked")
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) {
return wrap(((ObservableTransformer<T, R>) composer).apply(this));
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> wrap(ObservableSource<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
if (source instanceof Observable) {
return RxJavaPlugins.onAssembly((Observable<T>)source);
}
return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source));
}
public interface ObservableTransformer<Upstream, Downstream> {
ObservableSource<Downstream> apply(Observable<Upstream> upstream);
}

compose的源码只有这么短,warp的作用不再赘述,果然它把当前Observable作为source传入了ObservableTransformer,我们需要自己实现ObservableTransformer接口,拿到上游数据源source经过需要封装的操作符处理后将结果Observable返回即可,符合我们之前分析的订阅流程。以后需要复用操作符时直接用compose传入我们之前定义好的ObservableTransformer实现类即可。

可见lift给了我们封装Observer的机会,compose给了我们封装Observable的机会。

RxJava线程调度原理

先从subscribeOn开始,进入.subscribeOn(Schedulers.io())的源码

1
2
3
4
5
6
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

subscribeOn也是一个操作符,根据我们之前的分析,它显然也会返回一个Observable对象,即ObservableSubscribeOn,进入其中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler; //传入的Schedulers.io()
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent); //实际订阅发生在Runnable中由scheduler调度
}
}));
}
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> actual;
final AtomicReference<Disposable> s;
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable s) {
DisposableHelper.setOnce(this.s, s);
}
@Override
public void onNext(T t) {
actual.onNext(t);
}
@Override
public void onError(Throwable t) {
actual.onError(t);
}
@Override
public void onComplete() {
actual.onComplete();
}
@Override
public void dispose() {
DisposableHelper.dispose(s);
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return DisposableHelper.isDisposed(get());
}
void setDisposable(Disposable d) {
DisposableHelper.setOnce(this, d);
}
}
}

在实际订阅时首先创建了一个SubscribeOnObserver对象,这个类实现了ObserverDisposable接口,作为下游Observer的装饰类,事件实际会传递给下游Observer,同时也具备了取消订阅(dispose)的能力。然后会执行onSubscribe,把这个可以视为Disposable的包装类在订阅时回调给订阅者,便于取消订阅。最后把实际订阅过程source.subscribe(parent);封装成Runnable交给了scheduler.scheduleDirect方法,scheduler对象是我们最初调用操作符时传入的Schedulers.io(),进入它的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
@NonNull
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
@NonNull
static final Scheduler IO;
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}

返回的是IO,它是Scheduler的子类IoScheduler对象,然而这个类没有覆写SchedulerscheduleDirect方法,所以先进入Scheduler类中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker(); //实际线程调度者,由各Scheduler具体实现
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
@NonNull
public abstract Worker createWorker();

看来切换线程的工作交给了Worker,并且又包装了一层Runnable,原本的Runnable届时会经由run方法启动。且Worker实现了Disposable接口,可被取消。接下来进入IoScheduler去看createWorker的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}

ThreadWorker池中取了一个ThreadWorker,执行他的scheduleActual方法,看名字就知道这里是完成实际调度的地方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
//创建线程池
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr); //任务交由线程池执行
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
}

到这里我们终于看到了熟悉的身影,原来最终是把这个Runnable直接提交给newScheduledThreadPool创建的线程池来执行了,到此结束。

现在开始回顾上面的过程:

  • subscribeOn只能生效一次,因为完整的订阅过程是先自下而上订阅,然后数据源发射事件,再自上而下传递的,所以真正发射事件时所在线程,是由最接近上游(较晚执行到)的一次subscribeOn来最终决定的,之前订阅时的切换效果会被覆盖(即多次调用subscribeOn操作符时只取第一次有效)
  • subscribeOn是在发射数据之前的订阅阶段完成的线程切换,所以在没有observerOn的前提下会影响到整个数据流所在的线程
  • 具体负责启动线程工作的Worker实现了Disposable接口,以便在订阅被取消时不再执行尚未执行的Runnable

接下来进入.observeOn(AndroidSchedulers.mainThread())的源码

1
2
3
4
5
6
7
8
9
10
11
12
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

和所有操作符一样,返回一个Observable对象ObservableObserveOn,进入其中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler; //线程调度器
this.delayError = delayError;
this.bufferSize = bufferSize; //缓冲区大小,默认128
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
implements Observer<T>, Runnable {
//省略部分代码
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
//省略部分代码,创建缓冲队列
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t); //上游的数据全部先入队列
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this); //调度当前Runnable到指定线程并执行run中的内容
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
//该函数在Runnable所在的线程执行,已经完成了调度
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;
final Observer<? super T> a = actual;
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
try {
v = q.poll(); //队列中取数据
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
q.clear();
a.onError(ex);
worker.dispose();
return;
}
boolean empty = v == null;
if (checkTerminated(d, empty, a)) {
return;
}
if (empty) {
break;
}
a.onNext(v); //向下游发射数据
}
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
}
}

这部分代码比较多,省略了部分代码,在subscribeActual中可以看到observeOn订阅过程和正常订阅一致,说明它不影响订阅过程,然后根据调度类型创建对应Worker传入了到了ObserveOnObserver,这个类既实现了Observer又实现了Runnable,且在订阅时(onSubscribe)会创建一个缓冲队列queue。当上游数据到来时先放入队列,接着执行调度在目标线程触发run方法,run方法内从队列中取数据并传递给下游的Observer完成整个过程。

又到了回顾总结的时间:

  • observeOn不同于subscribeOn,后者发生在订阅期间,数据发射前,而前者发生在数据发生后,数据传递的过程中,所以observeOn可以多次生效。
  • 由于数据传递是自上而下的,每次调度都会影响到其下游所在线程,所以当存在多个observeOn操作符时,每段数据所在线程由其上游最接近它的observeOn所决定
  • ObserveOnObserver在向下游发射数据时会先将数据放入默认大小是128的缓冲队列,待线程调度完成后再从队列中取出数据交给下游观察者

RxJava背压处理原理

在RxJava2中,默认策略下判断是否触发背压的因素如下:

  • 在同步场景中,由发射数是否超出响应式拉取值request决定
  • 在异步场景中,由是否超出了缓冲池queue的承受能力决定(在上面介绍observeOn操作符的时候有提及queue),需要下游的request方法拉取queue中的数据
  • observeOn的重载方法observeOn(Scheduler scheduler, boolean delayError, int bufferSize)允许我们定义buffersize即缓冲队列的初始大小,默认大小为128
  • Subscriber对象订阅时产生的public void onSubscribe(Subscription s)回调提供的Subscription可以调用request方法增加拉取数,如果我们不重写onSubscribe方法的话,它的默认实现会执行s.request(Long.MAX_VALUE);,这也是为什么我们通常无需去手动调用request来做拉取

Observable不具备处理背压的能力,Flowable具备处理背压的能力,此时背压的触发因素是由具体策略决定的,如下:

  • ERROR: 触发背压时直接抛出MissingBackpressureException
  • BUFFER: queue无限大,直至OOM
  • DROP: queue超载时会抛弃之后的数据,不会抛出异常
  • LATEST: queue超载时会抛弃之后的数据,不会抛出异常,且始终有一个额外空间保留着当前最新一次的数据

Flowable也提供了操作符来实现这些策略,比如我们先进入onBackpressureDrop(),它返回一个Flowable,即FlowableOnBackpressureDrop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
public final class FlowableOnBackpressureDrop<T> extends AbstractFlowableWithUpstream<T, T> implements Consumer<T> {
final Consumer<? super T> onDrop;
@Override
protected void subscribeActual(Subscriber<? super T> s) {
this.source.subscribe(new BackpressureDropSubscriber<T>(s, onDrop));
}
static final class BackpressureDropSubscriber<T>
extends AtomicLong implements FlowableSubscriber<T>, Subscription {
@Override
public void onNext(T t) {
if (done) {
return;
}
long r = get();
if (r != 0L) {
actual.onNext(t);
BackpressureHelper.produced(this, 1);
} else {
try {
onDrop.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
cancel();
onError(e);
}
}
}
@Override
public void request(long n) {
if (SubscriptionHelper.validate(n)) {
BackpressureHelper.add(this, n);
}
}
}
}

在它的内部对观察者做了封装,BackpressureDropSubscriber继承自AtomicLong ,实现了Subscriber ,只有判断自身的值不为0时才会向下游传递数据,否则丢弃。那么这个值是如何计算的呢,进入BackpressureHelper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public static long produced(AtomicLong requested, long n) {
for (;;) {
long current = requested.get();
if (current == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long update = current - n;
if (update < 0L) {
RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update));
update = 0L;
}
if (requested.compareAndSet(current, update)) {
return update;
}
}
}
public static long add(AtomicLong requested, long n) {
for (;;) {
long r = requested.get();
if (r == Long.MAX_VALUE) {
return Long.MAX_VALUE;
}
long u = addCap(r, n);
if (requested.compareAndSet(r, u)) {
return r;
}
}
}

每次request(n)的时候都会把n加到BackpressureDropSubscriber的当前值上,每执行一次onNext都会让值减一,成功实现了当把下游的拉取值消耗完后直接抛弃数据的效果。onBackpressureLatest()与前者类似,只是它里面多了一个对象用于存储当前最近一次的值,每次onNext收到上游新数据时都会刷新这个值

1
2
3
4
5
6
final AtomicReference<T> current = new AtomicReference<T>();
@Override
public void onNext(T t) {
current.lazySet(t);
drain();
}

最后onBackpressureBuffer()的代码就不贴了,在它的订阅者包装类BackpressureBufferSubscriber中创建了可自增队列容器SpscLinkedArrayQueue,会将一切上游的数据都缓存起来,等待request依次拉取。

Rxjava冷热Observable

在RxJava中ObservableHotCold之分

Cold Observables: 我们平时创建的Observable基本都是Cold类型的,当有订阅者订阅时数据源才会开始发射数据,且当有多个订阅者时他们的数据流相互独立,都会从头接收到完整的数据,符合响应式拉取的模型

Hot Observables: 不管是否订阅,一旦开始发射数据就会一直发射,除非主动停止,且当有多个订阅者时他们观察到的会是同一条数据流,具有共享订阅的特性,Subjects 就属于Hot类型的

然后介绍一些常见的相关操作符:

  • publish(): 可以将cold类型转变为hot类型,并返回一个ConnectableObservable
  • share(): 它的源码很直接return this.publish().refCount();
  • connect(): ConnectableObservableObservable的基础上扩展出的方法,调用该方法后数据源开始发射数据,该方法会返回一个Disposable用于终止数据的发射
  • refCount: ConnectableObservableObservable的基础上扩展出的方法,它会使Hot类型的Observable也具备随第一个订阅者订阅时开始发射数据,最后一个订阅者取消时停止发射数据的类似cold的特性

下面分析一下publish()connect()方法,publish()会创建并返回一个继承了ConnectableObservable类的ObservablePublish()对象,进入它的源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
public final class ObservablePublish<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T> {
public static <T> ConnectableObservable<T> create(ObservableSource<T> source) {
final AtomicReference<PublishObserver<T>> curr = new AtomicReference<PublishObserver<T>>();
ObservableSource<T> onSubscribe = new ObservableSource<T>() {
@Override //每当有观察者订阅时,都作为一个child加入observers
public void subscribe(Observer<? super T> child) {
InnerDisposable<T> inner = new InnerDisposable<T>(child);
child.onSubscribe(inner);
for (;;) {
PublishObserver<T> r = curr.get();
if (r == null || r.isDisposed()) {
PublishObserver<T> u = new PublishObserver<T>(curr);
if (!curr.compareAndSet(r, u)) {
continue;
}
r = u;
}
if (r.add(inner)) {
inner.setParent(r);
break; // NOPMD
}
}
}
};
return RxJavaPlugins.onAssembly(new ObservablePublish<T>(onSubscribe, source, curr));
}
private ObservablePublish(ObservableSource<T> onSubscribe, ObservableSource<T> source,
final AtomicReference<PublishObserver<T>> current) {
this.onSubscribe = onSubscribe;
this.source = source;
this.current = current;
}
//每当有观察者订阅时,实际订阅到的不是source,是onSubscribe
@Override
protected void subscribeActual(Observer<? super T> observer) {
onSubscribe.subscribe(observer);
}
@Override
public void connect(Consumer<? super Disposable> connection) {
boolean doConnect;
PublishObserver<T> ps;
for (;;) {
ps = current.get();
if (ps == null || ps.isDisposed()) {
PublishObserver<T> u = new PublishObserver<T>(current);
if (!current.compareAndSet(ps, u)) {
continue;
}
ps = u;
}
doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true);
break; // NOPMD
}
try {
connection.accept(ps);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
throw ExceptionHelper.wrapOrThrow(ex);
}
if (doConnect) {
source.subscribe(ps); //实际订阅上游数据源的地方,打通了订阅流程,开始发射数据
}
}
@SuppressWarnings("rawtypes")
static final class PublishObserver<T>
implements Observer<T>, Disposable {
//省略部分代码
//该对象持有所有观察者
final AtomicReference<InnerDisposable<T>[]> observers;
@SuppressWarnings("unchecked")
PublishObserver(AtomicReference<PublishObserver<T>> current) {
this.observers = new AtomicReference<InnerDisposable<T>[]>(EMPTY);
this.current = current;
this.shouldConnect = new AtomicBoolean();
}
//for循环把当前事件通知给所有观察者
@Override
public void onNext(T t) {
for (InnerDisposable<T> inner : observers.get()) {
inner.child.onNext(t);
}
}
//该方法将封装过的当前订阅者记录到数组中,成为observers的一员
boolean add(InnerDisposable<T> producer) {
for (;;) {
InnerDisposable<T>[] c = observers.get();
if (c == TERMINATED) {
return false;
}
int len = c.length;
@SuppressWarnings("unchecked")
InnerDisposable<T>[] u = new InnerDisposable[len + 1];
System.arraycopy(c, 0, u, 0, len);
u[len] = producer;
if (observers.compareAndSet(c, u)) {
return true;
}
}
}
}
}
  • publish()方法在创建ObservablePublish时,会伴随着创建一个被观察者onSubscribe和一个观察者current
  • 每当有观察者订阅该ConnectableObservable时,在subscribeActual方法中订阅到的不是source而是onSubscribe,实际执行的是它的subscribe方法
  • 该方法中,当前观察者作为一个child被封装到InnerDisposable,而这个对象紧接着又被加入到current的成员变量observers
  • connect()方法在执行后才会订阅真正的上游数据源source,打通订阅流程,数据源开始发射数据,而订阅这个source的正是current,在它的onNext方法中会进行for循环,通知给所有之前已经加入进来的observers

经过这些步骤的包装,一个cold型Observable就转变成了hot型

RxJava封装库RxBinding原理

上面已经分析过很多操作符的原理了,可以发现封装操作符其实是有套路了,我们仿照这个套路自己也可以自定义一些操作符,或者拓展出RxXXX库,套路如下:

  • 在操作符中返回一个我们自定义的Observable子类
  • 重写它的subscribeActual方法,在这里写下订阅阶段需要完成的逻辑,并调用上游sourcesubscribe方法与参数中传入的observer对接
  • 如果在数据在传递阶段也需要处理逻辑,则封装一个Observer接口实现类与source对接,在onNext中完成自己的逻辑后再将数据传递给下游observeronNext

下面分析下RxBinding是如何将OnClickListener封装成操作符的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
final class ViewClickObservable extends Observable<Object> {
private final View view;
ViewClickObservable(View view) {
this.view = view;
}
@Override protected void subscribeActual(Observer<? super Object> observer) {
if (!checkMainThread(observer)) {
return;
}
//订阅阶段执行的逻辑,创建listener绑定view
Listener listener = new Listener(view, observer);
observer.onSubscribe(listener);
view.setOnClickListener(listener);
}
static final class Listener extends MainThreadDisposable implements OnClickListener {
private final View view;
private final Observer<? super Object> observer;
Listener(View view, Observer<? super Object> observer) {
this.view = view;
this.observer = observer;
}
@Override public void onClick(View v) {
if (!isDisposed()) {
//发生点击时传递事件给下游
observer.onNext(Notification.INSTANCE);
}
}
@Override protected void onDispose() {
view.setOnClickListener(null);
}
}
}

按照套路,创建一个Observable的子类,操作符参数要求传入一个View,在订阅阶段完成Listener的创建与View的绑定,由于OnClickListener本身就是事件源,不存在上游Observable,所以这里不需要上游sourceobserver对接的流程,数据传递时也没有额外操作,无需包装Observer,当触发点击事件时直接调用onNext将事件给到下游observer就算任务完成了,比RxJava中操作符的逻辑要简单的多。

最后

原本除了RxBinding还想再分析下RxPermissions的源码,介于篇幅实在已经巨长了(每个想到的点都想提一下,背压冷热什么的,结果写太长了( ̄▽ ̄)),想想还是算了,大家有兴趣的话可以自己去看源码,它是把Subject当做Observable来用,每个Permission获取结果后都会用与自己对应的Subject通过onNext把结果传递给下游,这个框架是挺不错的学习资料。

由于篇幅较长,且很多地方都是个人的理解,不排除会有疏漏或错误的可能性,非常欢迎指正~

参考文章

RxJava2 源码解析(一)
RxJava2 源码解析(二)
给初学者的RxJava2.0教程
驯服数据流之 hot & cold Observable

声明:本站所有文章均为原创或翻译,遵循署名-非商业性使用-禁止演绎 4.0 国际许可协议,如需转载请确保您对该协议有足够了解,并附上作者名(Est)及原贴地址