Rxjava2源码分析(2)——操作符

承接上一篇Rxjava2源码分析(1)
这一篇将讲下Rxjava2的操作符。在Rxjava里面最常用的是map操作符,接下来将从map入手分析。

1.map操作符的基本用法

这里按照国际惯例,先上map的基本用法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("accept : " + s +"\n");
Log.e(TAG, "accept : " + s +"\n" );
}
});

2.map源码跟踪

2.1 map

跟踪map进去,进入Observable里面,你会发现同样有RxJavaPlugins.onAssembly这东西。

1
2
3
4
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

RxJavaPlugins.onAssembly上文已经说过了,这次主要关注ObservableMap。new ObservableMap<T, R>(this, mapper)里面的this是create返回的Observable对象,mapper是你给的Function对象。

2.2 ObservableMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) { //默认done为false,跳过
return;
}
if (sourceMode != NONE) { //默认sourceMode 为0,跳过
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");//1
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);//2
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}

AbstractObservableWithUpstream

AbstractObservableWithUpstream是什么?它继承了Observable类,把ObservableSource对象保存了起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}

//1 mapper.apply(t)

在ObservableMap里面,subscribeActual,将完成上游Observable的订阅。MapObserver里面的mapper.apply(t)将上游的t转换成下游所需的U。这里怎么变换是你一开始时候就设定好的。这里我们当初传入map里面的Function如下。

1
2
3
4
5
6
new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is result " + integer;
}
}

//2 actual.onNext(v)

actual.onNext(v); 是交接给下游的Observer。actual对应本例就是当初create的对象,执行onNext方法就是执行当初自己定义的操作。

1
2
3
4
5
6
7
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("accept : " + s +"\n");
Log.e(TAG, "accept : " + s +"\n" );
}
}

至此,当初的e.onNext(1) 将得到响应。

总结一下。订阅过程是最后一个Observable往上一个Observable订阅(本例中是MapObserver)。从最后一个Observable里面的subscribe有个subscribeActual方法调用,这个方法会一层层订阅,直至第一个Observable。然后触发第一个Observable的subscribe,这里面会触发数据流操作。在本例中,订阅至到第一个Observable的subscribe实现类,里面的e.onNext(1); 会触发数据一层层往下流,下一个是MapObserver对数据的操作,到最后的Observable。

0%