Rxjava2源码分析(1)

1、基本用法

本文解读的是RxJava 2.1.0版本,对应的RxAndroid版本为2.0.1。
先看看基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Basic usage: emit three integers and log each one as it arrives.
// The Consumer's type argument must match the Observable's element type (Integer):
// a Consumer<String> is not a Consumer<? super Integer>, so it would not compile.
Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
        e.onNext(1);
        e.onNext(2);
        e.onNext(3);
    }
}).subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer value) throws Exception {
        // String concatenation converts the Integer to text for display and logging.
        mRxOperatorsText.append("accept : " + value + "\n");
        Log.e(TAG, "accept : " + value + "\n");
    }
});

2、原理探究

RxJava的主要要素是Observable(被观察者)、Observer(观察者)、subscribe(订阅)和事件。带着这些概念,我们接着往下看。

2.1 Observable.create

跟踪Observable.create

1
2
3
4
5
6
/**
 * Creates an Observable backed by the given {@code ObservableOnSubscribe} callback.
 *
 * The source is null-checked, wrapped in a new {@code ObservableCreate}, and that
 * wrapper is passed through {@code RxJavaPlugins.onAssembly} before being returned.
 *
 * @param source the callback stored in the resulting {@code ObservableCreate}
 * @param <T> the element type
 * @return whatever {@code RxJavaPlugins.onAssembly} returns for the new {@code ObservableCreate}
 */
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
// Presumably throws NullPointerException with this message when source is null (helper not shown here).
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

传入的是ObservableOnSubscribe类型的对象;方法先用它构造一个ObservableCreate,再把这个ObservableCreate交给RxJavaPlugins.onAssembly(),并以其结果作为返回值。

1
2
3
4
/**
 * Single-method callback that is handed an {@code ObservableEmitter} through which
 * it can push values.
 *
 * @param <T> the element type
 */
public interface ObservableOnSubscribe<T> {
/**
 * Receives the emitter to push signals into.
 *
 * @param e the emitter to call {@code onNext} and related methods on
 * @throws Exception declared so implementations may throw checked exceptions
 */
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

onAssembly

在RxJavaPlugins里面看到

1
2
3
4
5
6
7
8
9
/**
 * Assembly-time hook for Observables: gives the optional {@code onObservableAssembly}
 * function a chance to replace {@code source}.
 *
 * @param source the Observable that was just assembled
 * @param <T> the element type
 * @return the result of applying the hook when one is set, otherwise {@code source} itself
 */
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
// Read the hook field once into a local; the null check and the call below use this local.
Function<? super Observable, ? extends Observable> f = onObservableAssembly;//1
if (f != null) {
// A hook is installed: return what it produces for source.
return apply(f, source); //2
}
// No hook installed: hand back the same instance unchanged.
return source;//3
}

从这里可以看出RxJavaPlugins的作用是方便测试和追踪:可以配合instanceof等手段,替换掉你想追踪的Observable/Observer/Scheduler等对象。读者可以想想onObservableAssembly不为null时的情形。

//1 onObservableAssembly

onObservableAssembly是RxJavaPlugins的一个静态属性(static volatile),可以通过set和get来控制它,主要用于测试。由于这里没有set过该属性,此时f为null,所以直接返回source,也就是Observable.create里new出来的ObservableCreate对象(它内部持有上面基本用法里new出来的ObservableOnSubscribe对象)。

1
// Optional assembly hook read by onAssembly(); it has no initializer, so it is null until assigned.
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;

Function接口只有一个方法

1
2
3
4
/**
 * Single-method interface that maps an input value to a result and may throw.
 *
 * @param <T> the input type
 * @param <R> the result type
 */
public interface Function<T, R> {
/**
 * Computes a result from the given input.
 *
 * @param t the input value, annotated as non-null
 * @return the computed result, annotated as non-null
 * @throws Exception declared so implementations may throw checked exceptions
 */
@NonNull
R apply(@NonNull T t) throws Exception;
}

接下来看看ObservableCreate里面有什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
/**
 * Observable that stores an {@code ObservableOnSubscribe} callback and invokes it
 * whenever {@code subscribeActual} is called.
 *
 * @param <T> the element type
 */
public final class ObservableCreate<T> extends Observable<T> {
// The callback invoked from subscribeActual.
final ObservableOnSubscribe<T> source;
/** Stores the callback; no other work is done at construction time. */
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
/**
 * Connects the given observer to the stored callback.
 *
 * @param observer the downstream observer that will receive the signals
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
// Wrap the observer in an emitter; the callback talks to the emitter, not to the observer directly.
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// Hand the emitter to the observer before any value can be emitted.
observer.onSubscribe(parent);
try {
// Run the user callback; its onNext calls go through the emitter.
source.subscribe(parent);
} catch (Throwable ex) {
// Presumably rethrows fatal errors (helper not shown); anything else is reported through the emitter.
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}

请记住这个subscribeActual和这个ObservableCreate,下面会用到。

2.2 observable.subscribe(consumer)

下面是订阅的环节。

1
2
3
/**
 * Subscribes with only an onNext callback.
 *
 * Delegates to the four-argument overload, passing {@code Functions.ON_ERROR_MISSING},
 * {@code Functions.EMPTY_ACTION} and {@code Functions.emptyConsumer()} for the
 * onError, onComplete and onSubscribe callbacks.
 *
 * @param onNext the callback for emitted values
 * @return the Disposable returned by the four-argument overload
 */
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

往下追

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * Subscribes with four separate callbacks.
 *
 * All four callbacks are null-checked, bundled into a {@code LambdaObserver}, and
 * that observer is subscribed through {@code subscribe(Observer)}.
 *
 * @param onNext the callback for emitted values
 * @param onError the callback for an error signal
 * @param onComplete the callback for the completion signal
 * @param onSubscribe the callback invoked from the observer's onSubscribe
 * @return the {@code LambdaObserver} itself, which also implements {@code Disposable}
 */
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
// Bundle the four callbacks into a single Observer.
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);//1
// Subscribe that Observer through the Observer-based overload.
subscribe(ls);//2
return ls;
}

//1 LambdaObserver

这里新建了一个LambdaObserver类型的对象,把传入的onNext、onError、onComplete、onSubscribe四个回调包装成了一个Observer。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
/**
 * Observer that forwards each signal to one of four callbacks.
 *
 * It is also a {@code Disposable}, and it extends {@code AtomicReference<Disposable>}:
 * the reference holds the Disposable received in {@code onSubscribe}, or
 * {@code DisposableHelper.DISPOSED} once this observer is finished or disposed.
 *
 * @param <T> the element type
 */
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = -7251123623727029452L;
// Callback for emitted values.
final Consumer<? super T> onNext;
// Callback for an error signal.
final Consumer<? super Throwable> onError;
// Callback for the completion signal.
final Action onComplete;
// Callback invoked from onSubscribe with this observer as the Disposable.
final Consumer<? super Disposable> onSubscribe;
/** Stores the four callbacks; the reference itself starts out empty. */
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
/**
 * Records the upstream Disposable and then notifies the onSubscribe callback.
 *
 * @param s the upstream Disposable
 */
@Override
public void onSubscribe(Disposable s) {
// Presumably stores s in this reference only if nothing was set before (helper not shown).
if (DisposableHelper.setOnce(this, s)) {
try {
// The callback receives this observer, not s, as its Disposable.
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
// The callback failed: dispose the upstream and report the failure as an error signal.
s.dispose();
onError(ex);
}
}
}
/**
 * Forwards a value to the onNext callback unless this observer is disposed.
 *
 * @param t the emitted value
 */
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// The callback failed: dispose the Disposable held in this reference, then signal the error.
get().dispose();
onError(e);
}
}
}
/**
 * Forwards an error to the onError callback unless this observer is disposed.
 *
 * @param t the error
 */
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
// Mark this observer as disposed before running the callback.
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// The error callback itself failed: report both throwables to the plugin error handler.
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
}
/** Runs the onComplete callback unless this observer is disposed. */
@Override
public void onComplete() {
if (!isDisposed()) {
// Mark this observer as disposed before running the callback.
lazySet(DisposableHelper.DISPOSED);
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// A failure in the completion callback goes to the plugin error handler.
RxJavaPlugins.onError(e);
}
}
}
/** Disposes this observer by delegating to {@code DisposableHelper.dispose} on this reference. */
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
/** Returns true when the reference holds the {@code DisposableHelper.DISPOSED} marker. */
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
}

这里可以看出LambdaObserver是在这四个回调外面封装了一层:它同时实现了Observer和Disposable,在分发事件之前先检查是否已经dispose,并统一处理回调中抛出的异常。

//2 subscribe(ls)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class Observable<T> implements ObservableSource<T> {
......
/**
 * Subscribes the given observer: null-checks it, passes it through the
 * {@code RxJavaPlugins.onSubscribe} hook, and then calls {@code subscribeActual}.
 *
 * @param observer the observer to subscribe
 * @throws NullPointerException rethrown as is when raised inside the try block; any other
 *         non-fatal Throwable is reported to {@code RxJavaPlugins.onError} and then
 *         wrapped as the cause of a new NullPointerException
 */
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// The hook may substitute a different observer.
observer = RxJavaPlugins.onSubscribe(this, observer); //3
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// Delegate to the concrete Observable implementation.
subscribeActual(observer); //4
} catch (NullPointerException e) { // NOPMD
// NullPointerExceptions propagate unchanged.
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
// Wrap the original failure as the cause so it is not lost.
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
// Implemented by each concrete Observable; called by subscribe(Observer) with the observer to connect.
protected abstract void subscribeActual(Observer<? super T> observer);
......

//3 RxJavaPlugins.onSubscribe

1
2
3
4
5
6
7
/**
 * Subscribe-time hook: gives the optional {@code onObservableSubscribe} function a
 * chance to replace the observer.
 *
 * @param source the Observable being subscribed to
 * @param observer the observer passed to subscribe
 * @param <T> the element type
 * @return the result of applying the hook when one is set, otherwise {@code observer} itself
 */
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
// Read the hook field once into a local; the null check and the call below use this local.
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
// No hook installed: hand back the same observer.
return observer;
}

这里同样是原样返回observer,因为没有设置过onObservableSubscribe,f为null。

//4 subscribeActual(observer)

继续追踪会发现,subscribeActual在Observable里只是一个抽象方法;此时真正调用的是前面ObservableCreate里的实现。

1
// Abstract: it has no body here, so the call runs the subclass's override.
protected abstract void subscribeActual(Observer<? super T> observer);

ObservableCreate里面的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
/** Stores the callback that subscribeActual will invoke. */
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
/**
 * Connects the given observer to the stored callback.
 *
 * @param observer the downstream observer that will receive the signals
 */
@Override
protected void subscribeActual(Observer<? super T> observer) {
// Wrap the observer in an emitter; the callback talks to the emitter, not to the observer directly.
CreateEmitter<T> parent = new CreateEmitter<T>(observer); //1
// Hand the emitter to the observer before any value can be emitted.
observer.onSubscribe(parent);
try {
// Run the user callback; its onNext calls go through the emitter.
source.subscribe(parent);//2
} catch (Throwable ex) {
// Presumably rethrows fatal errors (helper not shown); anything else is reported through the emitter.
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

2.3 subscribeActual

//1 CreateEmitter < T > parent = new CreateEmitter< T >(observer);

注意上面传入的observer其实就是前面创建的LambdaObserver,CreateEmitter在LambdaObserver外面又封装了一层。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
/** Stores the downstream observer that onNext forwards values to. */
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
/**
 * Forwards a value to the wrapped observer.
 *
 * A null value is not forwarded; it is turned into an onError call carrying a
 * NullPointerException. Values are dropped once this emitter is disposed.
 *
 * @param t the value to emit
 */
@Override
public void onNext(T t) {
if (t == null) {
// Null values are rejected: signal an error instead and stop.
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
// Only deliver while this emitter has not been disposed.
if (!isDisposed()) {
observer.onNext(t);
}
}

observer.onSubscribe(parent);

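这一步把CreateEmitter作为Disposable交给LambdaObserver.onSubscribe:LambdaObserver先通过DisposableHelper.setOnce把它保存下来,再调用onSubscribe回调。由于这里的onSubscribe回调是默认传入的Functions.emptyConsumer(),是个空实现,所以对使用者来说可以认为这一步没有回调效果。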

//2 source.subscribe(parent);

看到这个source了吗?它就是当初调用Observable.create时new出来并传进来的ObservableOnSubscribe对象。

1
2
3
4
5
6
7
new ObservableOnSubscribe<Integer>() {
// Pushes the values 1, 2 and 3, in that order, into the emitter it is given.
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}

所以ObservableOnSubscribe的subscribe里调用的e.onNext,会先进入CreateEmitter.onNext,再进入LambdaObserver.onNext,最终调用到后面new Consumer里的accept方法。

1
2
3
4
5
6
7
// The consumer that ends up receiving each emitted value.
// Its type argument must be Integer to match the Observable<Integer> it is subscribed to:
// a Consumer<String> is not a Consumer<? super Integer> and would not compile.
new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer value) throws Exception {
        // String concatenation converts the Integer to text for display and logging.
        mRxOperatorsText.append("accept : " + value + "\n");
        Log.e(TAG, "accept : " + value + "\n");
    }
}
0%