前言
和之前的Glide篇一样,这篇RxJava2源码分析也会先列出一些要点,然后按这些点一步一步分析
注意,文章中的RxJava均是指RxJava2
- Rxjava中的主要类型
- Rxjava订阅与终止订阅的过程(Observable、Observer、create、just、dispose)
- Rxjava操作符原理(map、lift、compose)
- Rxjava线程调度原理(subscribeOn、observeOn、io、main)
- Rxjava背压处理原理(buffer、latest、drop)
- Rxjava冷热Observable(publish、share、connect、refCount)
- Rxjava封装库RxBinding原理
RxJava中的主要类型
开始之前先梳理下几个关键类的作用和他们之间的关系
Observable
被观察者(事件源),不处理背压
Observer
观察者,用于订阅Observable
Subject
继承了Observable
实现了Observer
,既可做观察者也可做被观察者,通常作为两者的桥梁或代理
Flowable(Publisher)
被观察者(事件源),有背压处理策略
Subscriber
观察者,用于订阅Flowable
Processor
实现类FlowableProcessor
继承了Flowable
实现了FlowableSubscriber
,类似Subject
Single/SingleObserver
仅发生一次消息,遵循onSubscribe (onSuccess | onError)?
Completable/CompletableObserver
仅发生一次消息,遵循onSubscribe (onComplete | onError)?
Maybe/MaybeObserver
仅发生一次消息,遵循onSubscribe (onSuccess | onError | onComplete)?
Disposable
替代了RxJava1中的Subscription
,实现该接口的资源具备可被取消(dispose)的能力
Subscription
在Subscriber
订阅时回调的对象,具备拉取(request)和取消订阅(cancel)的能力
RxJava订阅与终止订阅的过程
这里先以最基础的Observable.create
为例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { } }) .subscribe(new Observer<String>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(String s) { } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
|
进入create
方法,requireNonNull
只是一个简单的判空处理,然后由onAssembly
返回Observable<T>
对象
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> create(ObservableOnSubscribe<T> source) { ObjectHelper.requireNonNull(source, "source is null"); return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source)); } @SuppressWarnings({ "rawtypes", "unchecked" }) @NonNull public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) { Function<? super Observable, ? extends Observable> f = onObservableAssembly; if (f != null) { return apply(f, source); } return source; }
|
onAssembly
是一个具有hook作用的方法,它会判断它的Function
类型成员变量onObservableAssembly
是否存在,不存在则直接把传入的参数返回,存在则把经过onObservableAssembly
处理后的结果返回,相当于提供了一层允许插入额外操作的hook层。在当前场景下它直接返回了我们创建的ObservableCreate<T>
,它是一个继承了Observable
的类,之后我们会调用Observable
的subscribe
方法来完成订阅
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @SchedulerSupport(SchedulerSupport.NONE) @Override public final void subscribe(Observer<? super T> observer) { ObjectHelper.requireNonNull(observer, "observer is null"); try { observer = RxJavaPlugins.onSubscribe(this, observer); ObjectHelper.requireNonNull(observer, "Plugin returned null Observer"); subscribeActual(observer); } catch (NullPointerException e) { throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; }
|
订阅方法中的requireNonNull
和RxJavaPlugins
就不再赘述了,最后执行了subscribeActual
方法,这里是实际完成订阅的地方。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); try { source.subscribe(parent); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); parent.onError(ex); } } }
|
回到ObservableCreate
中,在subscribeActual
里首先创建了用于发射事件的CreateEmitter
对象,CreateEmitter
实现了接口Emitter
和Disposable
, 并持有observer
。当通过onNext
发射事件时会传递给观察者的onNext
方法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { if (t == null) { onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); return; } if (!isDisposed()) { observer.onNext(t); } } @Override public void onError(Throwable t) { if (t == null) { t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources."); } if (!isDisposed()) { try { observer.onError(t); } finally { dispose(); } } else { RxJavaPlugins.onError(t); } } @Override public void onComplete() { if (!isDisposed()) { try { observer.onComplete(); } finally { dispose(); } } } }
|
最后执行source.subscribe(parent);
使数据源开始经由发射器发射数据,至此整个创建过程就走通了
1 2 3 4 5 6 7
| new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> e) throws Exception { e.onNext("123"); e.onComplete(); } }
|
经过上面的分析,我们了解到了:
每次将事件传递给观察者时都会判断isDisposed()
检查是否订阅已经终止,一旦触发了onError()
和onComplete()
紧接着就会执行dispose()
执行Observable.subscribe
后才会在subscribeActual
中完成实际的订阅,并且开始执行发射器发射事件的代码,创建型操作符create
,defer
,fromCallable
,just
等均遵循这个规则,稍稍需要注意的是,just()
方法即使没有订阅也会立刻执行(是立刻执行该函数本身,不是开始发射数据),他会一开始就把我们要发射的内容作为value
保存下来
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> { private final T value; public ObservableJust(final T value) { this.value = value; } @Override protected void subscribeActual(Observer<? super T> s) { ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value); s.onSubscribe(sd); sd.run(); } @Override public T call() { return value; } }
|
接下来分析终止订阅的过程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public enum DisposableHelper implements Disposable { * The singleton instance representing a terminal, disposed state, don't leak it. */ DISPOSED ; public static boolean isDisposed(Disposable d) { return d == DISPOSED; } public static boolean dispose(AtomicReference<Disposable> field) { Disposable current = field.get(); Disposable d = DISPOSED; if (current != d) { current = field.getAndSet(d); if (current != d) { if (current != null) { current.dispose(); } return true; } } return false; } }
|
在终止订阅时,执行dispose()
会将一个单例对象DISPOSED
,赋给当前的Disposable
对象,由于枚举成员本质是静态常量,所以isDisposed(Disposable d)
方法也只需要判断当前对象的引用是否是DISPOSED
即可。这种判断当前state的方法我们在设计开源库时也可以借鉴。然后进入Observable.just("Test")
方法,它会创建并返回一个ObservableJust<T>
对象
Rxjava操作符原理
先从最经典的map
方法入手
1 2 3 4 5 6
| @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) { ObjectHelper.requireNonNull(mapper, "mapper is null"); return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper)); }
|
ObservableMap
继承自Observable
,所以map
方法返回了一个新的Observable
,进入ObservableMap
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> { final Function<? super T, ? extends U> function; public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) { super(source); this.function = function; } @Override public void subscribeActual(Observer<? super U> t) { source.subscribe(new MapObserver<T, U>(t, function)); } static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> { final Function<? super T, ? extends U> mapper; MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) { super(actual); this.mapper = mapper; } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != NONE) { actual.onNext(null); return; } U v; try { v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value."); } catch (Throwable ex) { fail(ex); return; } actual.onNext(v); } @Override public int requestFusion(int mode) { return transitiveBoundaryFusion(mode); } @Nullable @Override public U poll() throws Exception { T t = qs.poll(); return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null; } } }
|
这里用到了装饰者模式,ObservableMap
持有来自它上游的被观察者source
,MapObserver
持有来自它下游的观察者和我们实现的转换方法,在subscribeActual
方法中完成ObservableMap
对source
的订阅,将来自source
的原始数据经过转换后再发给下游的观察者,从而实现map
这一功能。
到这里我们也能总结出包含多个操作符时的订阅流程了:
- 执行代码时,自上而下每一步操作符都会创建一个新的
Observable
,当最后一步执行subscribe
时会触发最后一个Observable
的subscribeActual
方法,在该方法中完成它所持有的上一个Observable
的订阅(即执行source.subscribe
),由此也会触发其subscribeActual
方法,如此反复直至最上游,所以订阅的过程是自下而上
的。
- 订阅者
Observer
在订阅的过程中会自下游一路创建至上游,每一个创建的Observer
都持有它下游的Observer
。当订阅到最上游数据源开始发射数据时,数据会自上而下
的传递,直至最终将数据从数据源传递到我们手动创建的最下游Observer
。
了解了map
后就可以去了解比较特殊的lift
和compose
了,在RxJava1中大部分变换都基于lift
这个神奇的操作符,进入lift
的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> { final ObservableOperator<? extends R, ? super T> operator; public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) { super(source); this.operator = operator; } @Override public void subscribeActual(Observer<? super R> s) { Observer<? super T> observer; try { observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer"); } catch (NullPointerException e) { throw e; } catch (Throwable e) { Exceptions.throwIfFatal(e); RxJavaPlugins.onError(e); NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS"); npe.initCause(e); throw npe; } source.subscribe(observer); } }
|
lift
也是创建并返回了一个新的Observable
,即ObservableLift
。在它的subscribeActual
方法中也是完成了实际订阅,但是在订阅前执行了operator.apply(s)
,我们实际的订阅者是由下游订阅者经过apply
方法转换产生的。现在可以回去翻之前的代码,在map
中MapObserver
替我们完成了这一步封装,MapObserver
做为实际的订阅者和下游Observer
的代理,让我们有了额外执行转换的机会,而lift
要更原始更自由,把定义代理Observer
的过程完全交由我们自己来实现,因此它也作为了RxJava1中转换操作符依赖的基础。
compose
则更简单了,它允许我们将一系列操作符打包封装到一起方便复用。根据我们之前对订阅流程的分析,只要把当前Observable
做为source
传入,把经过一系列操作符后将最终创建的Observable
返回即可实现这个功能。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| @SuppressWarnings("unchecked") @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final <R> Observable<R> compose(ObservableTransformer<? super T, ? extends R> composer) { return wrap(((ObservableTransformer<T, R>) composer).apply(this)); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public static <T> Observable<T> wrap(ObservableSource<T> source) { ObjectHelper.requireNonNull(source, "source is null"); if (source instanceof Observable) { return RxJavaPlugins.onAssembly((Observable<T>)source); } return RxJavaPlugins.onAssembly(new ObservableFromUnsafeSource<T>(source)); } public interface ObservableTransformer<Upstream, Downstream> { ObservableSource<Downstream> apply(Observable<Upstream> upstream); }
|
compose
的源码只有这么短,warp
的作用不再赘述,果然它把当前Observable
作为source
传入了ObservableTransformer
,我们需要自己实现ObservableTransformer
接口,拿到上游数据源source
经过需要封装的操作符处理后将结果Observable
返回即可,符合我们之前分析的订阅流程。以后需要复用操作符时直接用compose
传入我们之前定义好的ObservableTransformer
实现类即可。
可见lift
给了我们封装Observer
的机会,compose
给了我们封装Observable
的机会。
RxJava线程调度原理
先从subscribeOn开始,进入.subscribeOn(Schedulers.io())
的源码
1 2 3 4 5 6
| @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> subscribeOn(Scheduler scheduler) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler)); }
|
subscribeOn
也是一个操作符,根据我们之前的分析,它显然也会返回一个Observable
对象,即ObservableSubscribeOn
,进入其中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new Runnable() { @Override public void run() { source.subscribe(parent); } })); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> actual; final AtomicReference<Disposable> s; SubscribeOnObserver(Observer<? super T> actual) { this.actual = actual; this.s = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); } @Override public void onNext(T t) { actual.onNext(t); } @Override public void onError(Throwable t) { actual.onError(t); } @Override public void onComplete() { actual.onComplete(); } @Override public void dispose() { DisposableHelper.dispose(s); DisposableHelper.dispose(this); } @Override public boolean isDisposed() { return DisposableHelper.isDisposed(get()); } void setDisposable(Disposable d) { DisposableHelper.setOnce(this, d); } } }
|
在实际订阅时首先创建了一个SubscribeOnObserver
对象,这个类实现了Observer
和Disposable
接口,作为下游Observer
的装饰类,事件实际会传递给下游Observer
,同时也具备了取消订阅(dispose
)的能力。然后会执行onSubscribe
,把这个可以视为Disposable
的包装类在订阅时回调给订阅者,便于取消订阅。最后把实际订阅过程source.subscribe(parent);
封装成Runnable
交给了scheduler.scheduleDirect
方法,scheduler
对象是我们最初调用操作符时传入的Schedulers.io()
,进入它的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| @NonNull public static Scheduler io() { return RxJavaPlugins.onIoScheduler(IO); } @NonNull static final Scheduler IO; @NonNull public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) { Function<? super Scheduler, ? extends Scheduler> f = onIoHandler; if (f == null) { return defaultScheduler; } return apply(f, defaultScheduler); }
|
返回的是IO
,它是Scheduler
的子类IoScheduler
对象,然而这个类没有覆写Scheduler
的scheduleDirect
方法,所以先进入Scheduler
类中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @NonNull public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); w.schedule(new Runnable() { @Override public void run() { try { decoratedRun.run(); } finally { w.dispose(); } } }, delay, unit); return w; } @NonNull public abstract Worker createWorker();
|
看来切换线程的工作交给了Worker
,并且又包装了一层Runnable
,原本的Runnable
届时会经由run
方法启动。且Worker
实现了Disposable
接口,可被取消。接下来进入IoScheduler
去看createWorker
的实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @NonNull @Override public Worker createWorker() { return new EventLoopWorker(pool.get()); } static final class EventLoopWorker extends Scheduler.Worker { EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; this.tasks = new CompositeDisposable(); this.threadWorker = pool.get(); } @NonNull @Override public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) { if (tasks.isDisposed()) { return EmptyDisposable.INSTANCE; } return threadWorker.scheduleActual(action, delayTime, unit, tasks); } }
|
从ThreadWorker
池中取了一个ThreadWorker
,执行他的scheduleActual
方法,看名字就知道这里是完成实际调度的地方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
| public class NewThreadWorker extends Scheduler.Worker implements Disposable { private final ScheduledExecutorService executor; public NewThreadWorker(ThreadFactory threadFactory) { executor = SchedulerPoolFactory.create(threadFactory); } public static ScheduledExecutorService create(ThreadFactory factory) { final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory); if (exec instanceof ScheduledThreadPoolExecutor) { ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec; POOLS.put(e, exec); } return exec; } @NonNull public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); if (parent != null) { if (!parent.add(sr)) { return sr; } } Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; } }
|
到这里我们终于看到了熟悉的身影,原来最终是把这个Runnable
直接提交给newScheduledThreadPool
创建的线程池来执行了,到此结束。
现在开始回顾上面的过程:
subscribeOn
只能生效一次,因为完整的订阅过程是先自下而上订阅,然后数据源发射事件,再自上而下传递的,所以真正发射事件时所在线程,是由最接近上游(较晚执行到)的一次subscribeOn
来最终决定的,之前订阅时的切换效果会被覆盖(即多次调用subscribeOn
操作符时只取第一次有效)
subscribeOn
是在发射数据之前的订阅阶段完成的线程切换,所以在没有observerOn
的前提下会影响到整个数据流所在的线程
- 具体负责启动线程工作的
Worker
实现了Disposable
接口,以便在订阅被取消时不再执行尚未执行的Runnable
接下来进入.observeOn(AndroidSchedulers.mainThread())
的源码
1 2 3 4 5 6 7 8 9 10 11 12
| @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler) { return observeOn(scheduler, false, bufferSize()); } @CheckReturnValue @SchedulerSupport(SchedulerSupport.CUSTOM) public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) { ObjectHelper.requireNonNull(scheduler, "scheduler is null"); ObjectHelper.verifyPositive(bufferSize, "bufferSize"); return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)); }
|
和所有操作符一样,返回一个Observable
对象ObservableObserveOn
,进入其中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111
| public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; final boolean delayError; final int bufferSize; public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) { super(source); this.scheduler = scheduler; this.delayError = delayError; this.bufferSize = bufferSize; } @Override protected void subscribeActual(Observer<? super T> observer) { if (scheduler instanceof TrampolineScheduler) { source.subscribe(observer); } else { Scheduler.Worker w = scheduler.createWorker(); source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); } } static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable { @Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; queue = new SpscLinkedArrayQueue<T>(bufferSize); actual.onSubscribe(this); } } @Override public void onNext(T t) { if (done) { return; } if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); } void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } } @Override public void run() { if (outputFused) { drainFused(); } else { drainNormal(); } } void drainNormal() { int missed = 1; final SimpleQueue<T> q = queue; final Observer<? super T> a = actual; for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; } for (;;) { boolean d = done; T v; try { v = q.poll(); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); s.dispose(); q.clear(); a.onError(ex); worker.dispose(); return; } boolean empty = v == null; if (checkTerminated(d, empty, a)) { return; } if (empty) { break; } a.onNext(v); } missed = addAndGet(-missed); if (missed == 0) { break; } } } } }
|
这部分代码比较多,省略了部分代码,在subscribeActual
中可以看到observeOn
订阅过程和正常订阅一致,说明它不影响订阅过程,然后根据调度类型创建对应Worker
传入了到了ObserveOnObserver
,这个类既实现了Observer
又实现了Runnable
,且在订阅时(onSubscribe)会创建一个缓冲队列queue
。当上游数据到来时先放入队列,接着执行调度在目标线程触发run
方法,run
方法内从队列中取数据并传递给下游的Observer
完成整个过程。
又到了回顾总结的时间:
observeOn
不同于subscribeOn
,后者发生在订阅期间,数据发射前,而前者发生在数据发生后,数据传递的过程中,所以observeOn
可以多次生效。
- 由于数据传递是自上而下的,每次调度都会影响到其下游所在线程,所以当存在多个
observeOn
操作符时,每段数据所在线程由其上游最接近它的observeOn
所决定
ObserveOnObserver
在向下游发射数据时会先将数据放入默认大小是128的缓冲队列,待线程调度完成后再从队列中取出数据交给下游观察者
RxJava背压处理原理
在RxJava2中,默认策略下判断是否触发背压的因素如下:
- 在同步场景中,由发射数是否超出响应式拉取值
request
决定
- 在异步场景中,由是否超出了缓冲池
queue
的承受能力决定(在上面介绍observeOn
操作符的时候有提及queue
),需要下游的request
方法拉取queue
中的数据
- observeOn的重载方法
observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
允许我们定义buffersize
即缓冲队列的初始大小,默认大小为128
Subscriber
对象订阅时产生的public void onSubscribe(Subscription s)
回调提供的Subscription
可以调用request
方法增加拉取数,如果我们不重写onSubscribe
方法的话,它的默认实现会执行s.request(Long.MAX_VALUE);
,这也是为什么我们通常无需去手动调用request
来做拉取
Observable
不具备处理背压的能力,Flowable
具备处理背压的能力,此时背压的触发因素是由具体策略决定的,如下:
ERROR
: 触发背压时直接抛出MissingBackpressureException
BUFFER
: queue
无限大,直至OOM
DROP
: queue
超载时会抛弃之后的数据,不会抛出异常
LATEST
: queue
超载时会抛弃之后的数据,不会抛出异常,且始终有一个额外空间保留着当前最新一次的数据
Flowable
也提供了操作符来实现这些策略,比如我们先进入onBackpressureDrop()
,它返回一个Flowable
,即FlowableOnBackpressureDrop
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public final class FlowableOnBackpressureDrop<T> extends AbstractFlowableWithUpstream<T, T> implements Consumer<T> { final Consumer<? super T> onDrop; @Override protected void subscribeActual(Subscriber<? super T> s) { this.source.subscribe(new BackpressureDropSubscriber<T>(s, onDrop)); } static final class BackpressureDropSubscriber<T> extends AtomicLong implements FlowableSubscriber<T>, Subscription { @Override public void onNext(T t) { if (done) { return; } long r = get(); if (r != 0L) { actual.onNext(t); BackpressureHelper.produced(this, 1); } else { try { onDrop.accept(t); } catch (Throwable e) { Exceptions.throwIfFatal(e); cancel(); onError(e); } } } @Override public void request(long n) { if (SubscriptionHelper.validate(n)) { BackpressureHelper.add(this, n); } } } }
|
在它的内部对观察者做了封装,BackpressureDropSubscriber
继承自AtomicLong
,实现了Subscriber
,只有判断自身的值不为0时才会向下游传递数据,否则丢弃。那么这个值是如何计算的呢,进入BackpressureHelper
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public static long produced(AtomicLong requested, long n) { for (;;) { long current = requested.get(); if (current == Long.MAX_VALUE) { return Long.MAX_VALUE; } long update = current - n; if (update < 0L) { RxJavaPlugins.onError(new IllegalStateException("More produced than requested: " + update)); update = 0L; } if (requested.compareAndSet(current, update)) { return update; } } } public static long add(AtomicLong requested, long n) { for (;;) { long r = requested.get(); if (r == Long.MAX_VALUE) { return Long.MAX_VALUE; } long u = addCap(r, n); if (requested.compareAndSet(r, u)) { return r; } } }
|
每次request(n)
的时候都会把n加到BackpressureDropSubscriber
的当前值上,每执行一次onNext
都会让值减一,成功实现了当把下游的拉取值消耗完后直接抛弃数据的效果。onBackpressureLatest()
与前者类似,只是它里面多了一个对象用于存储当前最近一次的值,每次onNext收到上游新数据时都会刷新这个值
1 2 3 4 5 6
| final AtomicReference<T> current = new AtomicReference<T>(); @Override public void onNext(T t) { current.lazySet(t); drain(); }
|
最后onBackpressureBuffer()
的代码就不贴了,在它的订阅者包装类BackpressureBufferSubscriber
中创建了可自增队列容器SpscLinkedArrayQueue
,会将一切上游的数据都缓存起来,等待request
依次拉取。
Rxjava冷热Observable
在RxJava中Observable
有Hot
和Cold
之分
Cold Observables
: 我们平时创建的Observable基本都是Cold类型的,当有订阅者订阅时数据源才会开始发射数据,且当有多个订阅者时他们的数据流相互独立,都会从头接收到完整的数据,符合响应式拉取的模型
Hot Observables
: 不管是否订阅,一旦开始发射数据就会一直发射,除非主动停止,且当有多个订阅者时他们观察到的会是同一条数据流,具有共享订阅的特性,Subjects
就属于Hot类型的
然后介绍一些常见的相关操作符:
publish()
: 可以将cold类型转变为hot类型,并返回一个ConnectableObservable
share()
: 它的源码很直接return this.publish().refCount();
connect()
: ConnectableObservable
在Observable
的基础上扩展出的方法,调用该方法后数据源开始发射数据,该方法会返回一个Disposable
用于终止数据的发射
refCount
: ConnectableObservable
在Observable
的基础上扩展出的方法,它会使Hot类型的Observable
也具备随第一个订阅者订阅时开始发射数据,最后一个订阅者取消时停止发射数据的类似cold的特性
下面分析一下publish()
和connect()
方法,publish()
会创建并返回一个继承了ConnectableObservable
类的ObservablePublish()
对象,进入它的源码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110
| public final class ObservablePublish<T> extends ConnectableObservable<T> implements HasUpstreamObservableSource<T> { public static <T> ConnectableObservable<T> create(ObservableSource<T> source) { final AtomicReference<PublishObserver<T>> curr = new AtomicReference<PublishObserver<T>>(); ObservableSource<T> onSubscribe = new ObservableSource<T>() { @Override public void subscribe(Observer<? super T> child) { InnerDisposable<T> inner = new InnerDisposable<T>(child); child.onSubscribe(inner); for (;;) { PublishObserver<T> r = curr.get(); if (r == null || r.isDisposed()) { PublishObserver<T> u = new PublishObserver<T>(curr); if (!curr.compareAndSet(r, u)) { continue; } r = u; } if (r.add(inner)) { inner.setParent(r); break; } } } }; return RxJavaPlugins.onAssembly(new ObservablePublish<T>(onSubscribe, source, curr)); } private ObservablePublish(ObservableSource<T> onSubscribe, ObservableSource<T> source, final AtomicReference<PublishObserver<T>> current) { this.onSubscribe = onSubscribe; this.source = source; this.current = current; } @Override protected void subscribeActual(Observer<? super T> observer) { onSubscribe.subscribe(observer); } @Override public void connect(Consumer<? super Disposable> connection) { boolean doConnect; PublishObserver<T> ps; for (;;) { ps = current.get(); if (ps == null || ps.isDisposed()) { PublishObserver<T> u = new PublishObserver<T>(current); if (!current.compareAndSet(ps, u)) { continue; } ps = u; } doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); break; } try { connection.accept(ps); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); throw ExceptionHelper.wrapOrThrow(ex); } if (doConnect) { source.subscribe(ps); } } @SuppressWarnings("rawtypes") static final class PublishObserver<T> implements Observer<T>, Disposable { final AtomicReference<InnerDisposable<T>[]> observers; @SuppressWarnings("unchecked") PublishObserver(AtomicReference<PublishObserver<T>> current) { this.observers = new AtomicReference<InnerDisposable<T>[]>(EMPTY); this.current = current; this.shouldConnect = new AtomicBoolean(); } @Override public void onNext(T t) { for (InnerDisposable<T> inner : observers.get()) { inner.child.onNext(t); } } boolean add(InnerDisposable<T> producer) { for (;;) { InnerDisposable<T>[] c = observers.get(); if (c == TERMINATED) { return false; } int len = c.length; @SuppressWarnings("unchecked") InnerDisposable<T>[] u = new InnerDisposable[len + 1]; System.arraycopy(c, 0, u, 0, len); u[len] = producer; if (observers.compareAndSet(c, u)) { return true; } } } } }
|
publish()
方法在创建ObservablePublish
时,会伴随着创建一个被观察者onSubscribe
和一个观察者current
- 每当有观察者订阅该
ConnectableObservable
时,在subscribeActual
方法中订阅到的不是source
而是onSubscribe
,实际执行的是它的subscribe
方法
- 该方法中,当前观察者作为一个child被封装到
InnerDisposable
,而这个对象紧接着又被加入到current
的成员变量observers
中
connect()
方法在执行后才会订阅真正的上游数据源source
,打通订阅流程,数据源开始发射数据,而订阅这个source
的正是current
,在它的onNext
方法中会进行for循环,通知给所有之前已经加入进来的observers
经过这些步骤的包装,一个cold型Observable就转变成了hot型
RxJava封装库RxBinding原理
上面已经分析过很多操作符的原理了,可以发现封装操作符其实是有套路了,我们仿照这个套路自己也可以自定义一些操作符,或者拓展出RxXXX库,套路如下:
- 在操作符中返回一个我们自定义的
Observable
子类
- 重写它的
subscribeActual
方法,在这里写下订阅阶段需要完成的逻辑,并调用上游source
的subscribe
方法与参数中传入的observer
对接
- 如果在数据在传递阶段也需要处理逻辑,则封装一个
Observer
接口实现类与source
对接,在onNext
中完成自己的逻辑后再将数据传递给下游observer
的onNext
下面分析下RxBinding是如何将OnClickListener
封装成操作符的
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| final class ViewClickObservable extends Observable<Object> { private final View view; ViewClickObservable(View view) { this.view = view; } @Override protected void subscribeActual(Observer<? super Object> observer) { if (!checkMainThread(observer)) { return; } Listener listener = new Listener(view, observer); observer.onSubscribe(listener); view.setOnClickListener(listener); } static final class Listener extends MainThreadDisposable implements OnClickListener { private final View view; private final Observer<? super Object> observer; Listener(View view, Observer<? super Object> observer) { this.view = view; this.observer = observer; } @Override public void onClick(View v) { if (!isDisposed()) { observer.onNext(Notification.INSTANCE); } } @Override protected void onDispose() { view.setOnClickListener(null); } } }
|
按照套路,创建一个Observable
的子类,操作符参数要求传入一个View
,在订阅阶段完成Listener
的创建与View
的绑定,由于OnClickListener
本身就是事件源,不存在上游Observable
,所以这里不需要上游source
与observer
对接的流程,数据传递时也没有额外操作,无需包装Observer
,当触发点击事件时直接调用onNext
将事件给到下游observer
就算任务完成了,比RxJava中操作符的逻辑要简单的多。
最后
原本除了RxBinding还想再分析下RxPermissions
的源码,介于篇幅实在已经巨长了(每个想到的点都想提一下,背压冷热什么的,结果写太长了( ̄▽ ̄)),想想还是算了,大家有兴趣的话可以自己去看源码,它是把Subject
当做Observable
来用,每个Permission获取结果后都会用与自己对应的Subject通过onNext
把结果传递给下游,这个框架是挺不错的学习资料。
由于篇幅较长,且很多地方都是个人的理解,不排除会有疏漏或错误的可能性,非常欢迎指正~
参考文章
RxJava2 源码解析(一)
RxJava2 源码解析(二)
给初学者的RxJava2.0教程
驯服数据流之 hot & cold Observable
声明:本站所有文章均为原创或翻译,遵循署名-非商业性使用-禁止演绎 4.0 国际许可协议,如需转载请确保您对该协议有足够了解,并附上作者名(Est)及原贴地址