Rxjava2源码分析

前言

本文将讲述Rxjava的相关应用和原理,包括基本原理,操作符,线程调用。

1、基本用法

本文解读是rxjava的2.1.0版本,对应的rxandroid版本为2.0.1。
先看看基本用法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("accept : " + s +"\n");
Log.e(TAG, "accept : " + s +"\n" );
}
});

2、原理探究

Rxjava主要要素是Observable(被观察者)、Observer(观察者)、subscribe(订阅)和事件。理解这些我们接下来往下看。

2.1 Observable.create

跟踪Observable.create

1
2
3
4
5
6
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

传入的是ObservableOnSubscribe类型对象,方法的返回值是调用RxJavaPlugins.onAssembly()

1
2
3
4
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

onAssembly

在RxJavaPlugins里面看到

1
2
3
4
5
6
7
8
9
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;//1
if (f != null) {
return apply(f, source); //2
}
return source;//3
}

在这里可以看出RxJavaPlugin的作用是方便测试和追踪。可以配合instanceof 等等替换某些你想追踪的Obseravble/Observer/Scheduler等等 。你想想onObservableAssembly不为null的情形。

//1 onObservableAssembly

onObservableAssembly是个对象属性,你可以set和get来控制你的对象,主要是用来测试用的。由于没set该对象,此时的f为null,所以返回 source,也就是上面的基本用法里面new 出来的ObservableOnSubscribe对象

1
static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;

Function类只有一个方法

1
2
3
4
public interface Function<T, R> {
@NonNull
R apply(@NonNull T t) throws Exception;
}

接下来看看ObservableCreate里面有什么

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
......
}

请记住这个subscribeActual和这个ObservableCreate,下面会用到。

2.2 observable.subscribe(consumer)

下面是订阅的环节。

1
2
3
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

往下追

1
2
3
4
5
6
7
8
9
10
11
12
13
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);//1
subscribe(ls);//2
return ls;
}

//1 LambdaObserver

这里新建了一个LambdaObserver类型的对象,把传入的参数转换了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
private static final long serialVersionUID = -7251123623727029452L;
final Consumer<? super T> onNext;
final Consumer<? super Throwable> onError;
final Action onComplete;
final Consumer<? super Disposable> onSubscribe;
public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete,
Consumer<? super Disposable> onSubscribe) {
super();
this.onNext = onNext;
this.onError = onError;
this.onComplete = onComplete;
this.onSubscribe = onSubscribe;
}
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.setOnce(this, s)) {
try {
onSubscribe.accept(this);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
s.dispose();
onError(ex);
}
}
}
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
@Override
public void onError(Throwable t) {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onError.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(new CompositeException(t, e));
}
}
}
@Override
public void onComplete() {
if (!isDisposed()) {
lazySet(DisposableHelper.DISPOSED);
try {
onComplete.run();
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
}
}
}
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
@Override
public boolean isDisposed() {
return get() == DisposableHelper.DISPOSED;
}
}

这里可以看得出LambdaObserver是封装了一层,用来隔绝参数用。

//2 subscribe(ls)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public abstract class Observable<T> implements ObservableSource<T> {
......
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer); //3
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer); //4
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
protected abstract void subscribeActual(Observer<? super T> observer);
......

//3 RxJavaPlugins.onSubscribe

1
2
3
4
5
6
7
public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
if (f != null) {
return apply(f, source, observer);
}
return observer;
}

这里同样只原路返回observer,因为f为null。

//4 subscribeActual(observer)

追踪subscribeActual只是一个抽象方法。这时调用的是当初ObservableCreate里面实现的方法。

1
protected abstract void subscribeActual(Observer<? super T> observer);

ObservableCreate里面的实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer); //1
observer.onSubscribe(parent);
try {
source.subscribe(parent);//2
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}

2.3 subscribeActual

//1 CreateEmitter < T > parent = new CreateEmitter< T >(observer);

注意上面,observer传入的对象其实就是LambdaObserver。这个CreateEmitter给LambdaObserver封装了一层

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}

observer.onSubscribe(parent);

这里是传入系统默认值,这里为空,可以认为不起作用。

//2 source.subscribe(parent);

看到这个source没,这个是source当初new出来的对象并传进来的。

1
2
3
4
5
6
7
new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}

所以在new ObservableOnSubscribe里面的subscribe调用的e.onNext就调用到了后来new Consumer里面的 accept方法。

1
2
3
4
5
6
7
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("accept : " + s +"\n");
Log.e(TAG, "accept : " + s +"\n" );
}
}

这一篇将讲下Rxjava2的操作符。在Rxjava里面最常用的是map操作符,接下来将从map入手分析。

1.map操作符的基本用法

这里按照国际惯例,先上map的基本用法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1);
e.onNext(2);
e.onNext(3);
}
}).map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is result " + integer;
}
}).subscribe(new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("accept : " + s +"\n");
Log.e(TAG, "accept : " + s +"\n" );
}
});

2.map源码跟踪

2.1 map

跟踪map进去,进入Observable里面,你会发现同样有RxJavaPlugins.onAssembly这东西。

1
2
3
4
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

RxJavaPlugins.onAssembly上文已经说过了,这次主要关注ObservableMap。new ObservableMap<T, R>(this, mapper)里面的this是create返回的Observable对象,mapper是你给的Function对象。

2.2 ObservableMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) { //默认done为false,跳过
return;
}
if (sourceMode != NONE) { //默认sourceMode 为0,跳过
actual.onNext(null);
return;
}
U v;
try {
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");//1
} catch (Throwable ex) {
fail(ex);
return;
}
actual.onNext(v);//2
}
@Override
public int requestFusion(int mode) {
return transitiveBoundaryFusion(mode);
}
@Nullable
@Override
public U poll() throws Exception {
T t = qs.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
}
}
}

AbstractObservableWithUpstream

AbstractObservableWithUpstream是什么?它继承了Observable类,把ObservableSource对象保存了起来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}

//1 mapper.apply(t)

在ObservableMap里面,subscribeActual,将完成上游Observable的订阅。MapObserver里面的mapper.apply(t)将上游的t转换成下游所需的U。这里怎么变换是你一开始时候就设定好的。这里我们当初传入map里面的Function如下。

1
2
3
4
5
6
new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return "This is result " + integer;
}
}

//2 actual.onNext(v)

actual.onNext(v); 是交接给下游的Observer。actual对应本例就是当初create的对象,执行onNext方法就是执行当初自己定义的操作。

1
2
3
4
5
6
7
new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
mRxOperatorsText.append("accept : " + s +"\n");
Log.e(TAG, "accept : " + s +"\n" );
}
}

至此,当初的e.onNext(1) 将得到响应。

总结一下。订阅过程是最后一个Observable往上一个Observable订阅(本例中是MapObserver)。从最后一个Observable里面的subscribe有个subscribeActual方法调用,这个方法会一层层订阅,直至第一个Observable。然后触发第一个Observable的subscribe,这里面会触发数据流操作。在本例中,订阅至到第一个Observable的subscribe实现类,里面的e.onNext(1); 会触发数据一层层往下流,下一个是MapObserver对数据的操作,到最后的Observable。

Rxjava2里面最惊艳的莫过于线程调度吗,下文将探讨一下线程调度的原理。

1. 基本用法

1
2
3
4
5
6
7
8
9
Observable.create((ObservableOnSubscribe<Integer>) e -> {
e.onNext(1);
e.onNext(2);
e.onComplete(); })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(
i -> System.out.println("onNext : i= " + i));
}

observeOn

1
2
3
4
5
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
1
2
3
4
5
6
7
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
1
2
3
4
5
6
7
8
9
@SuppressWarnings({ "rawtypes", "unchecked" })
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
protected void subscribeActual(Observer<? super T> observer) {
if (this.scheduler instanceof TrampolineScheduler) {
this.source.subscribe(observer);
} else {
Worker w = this.scheduler.createWorker();
this.source.subscribe(new ObservableObserveOn.ObserveOnObserver(observer, w, this.delayError, this.bufferSize));
}
}
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
private static final long serialVersionUID = 6576896619930983584L;
final Observer<? super T> actual;
final Worker worker;
final boolean delayError;
final int bufferSize;
SimpleQueue<T> queue;
Disposable s;
Throwable error;
volatile boolean done;
volatile boolean cancelled;
int sourceMode;
boolean outputFused;
ObserveOnObserver(Observer<? super T> actual, Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
QueueDisposable<T> qd = (QueueDisposable)s;
int m = qd.requestFusion(7);
if (m == 1) {
this.sourceMode = m;
this.queue = qd;
this.done = true;
this.actual.onSubscribe(this);
this.schedule();
return;
}
if (m == 2) {
this.sourceMode = m;
this.queue = qd;
this.actual.onSubscribe(this);
return;
}
}
this.queue = new SpscLinkedArrayQueue(this.bufferSize);
this.actual.onSubscribe(this);
}
}
public void onNext(T t) {
if (!this.done) {
if (this.sourceMode != 2) {
this.queue.offer(t);
}
this.schedule();
}
}
public void onError(Throwable t) {
if (this.done) {
RxJavaPlugins.onError(t);
} else {
this.error = t;
this.done = true;
this.schedule();
}
}
public void onComplete() {
if (!this.done) {
this.done = true;
this.schedule();
}
}
public void dispose() {
if (!this.cancelled) {
this.cancelled = true;
this.s.dispose();
this.worker.dispose();
if (this.getAndIncrement() == 0) {
this.queue.clear();
}
}
}
public boolean isDisposed() {
return this.cancelled;
}
void schedule() {
if (this.getAndIncrement() == 0) {
this.worker.schedule(this);
}
}
void drainNormal() {
int missed = 1;
SimpleQueue<T> q = this.queue;
Observer a = this.actual;
do {
if (this.checkTerminated(this.done, q.isEmpty(), a)) {
return;
}
while(true) {
boolean d = this.done;
Object v;
try {
v = q.poll();
} catch (Throwable var7) {
Exceptions.throwIfFatal(var7);
this.s.dispose();
q.clear();
a.onError(var7);
this.worker.dispose();
return;
}
boolean empty = v == null;
if (this.checkTerminated(d, empty, a)) {
return;
}
if (empty) {
missed = this.addAndGet(-missed);
break;
}
a.onNext(v);
}
} while(missed != 0);
}
void drainFused() {
int missed = 1;
do {
if (this.cancelled) {
return;
}
boolean d = this.done;
Throwable ex = this.error;
if (!this.delayError && d && ex != null) {
this.actual.onError(this.error);
this.worker.dispose();
return;
}
this.actual.onNext((Object)null);
if (d) {
ex = this.error;
if (ex != null) {
this.actual.onError(ex);
} else {
this.actual.onComplete();
}
this.worker.dispose();
return;
}
missed = this.addAndGet(-missed);
} while(missed != 0);
}
public void run() {
if (this.outputFused) {
this.drainFused();
} else {
this.drainNormal();
}
}
boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
if (this.cancelled) {
this.queue.clear();
return true;
} else {
if (d) {
Throwable e = this.error;
if (this.delayError) {
if (empty) {
if (e != null) {
a.onError(e);
} else {
a.onComplete();
}
this.worker.dispose();
return true;
}
} else {
if (e != null) {
this.queue.clear();
a.onError(e);
this.worker.dispose();
return true;
}
if (empty) {
a.onComplete();
this.worker.dispose();
return true;
}
}
}
return false;
}
}
public int requestFusion(int mode) {
if ((mode & 2) != 0) {
this.outputFused = true;
return 2;
} else {
return 0;
}
}
@Nullable
public T poll() throws Exception {
return this.queue.poll();
}
public void clear() {
this.queue.clear();
}
public boolean isEmpty() {
return this.queue.isEmpty();
}
}
}
1
AndroidSchedulers.mainThread()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
/** A {@link Scheduler} which executes actions on {@code looper}. */
public static Scheduler from(Looper looper) {
if (looper == null) throw new NullPointerException("looper == null");
return new HandlerScheduler(new Handler(looper));
}
private AndroidSchedulers() {
throw new AssertionError("No instances.");
}
}
1
2
3
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
1
2
3
4
5
6
7
8
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
public abstract class Scheduler {
/**
* The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
* <p>
* The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
*/
static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
static {
CLOCK_DRIFT_TOLERANCE_NANOSECONDS = TimeUnit.MINUTES.toNanos(
Long.getLong("rx2.scheduler.drift-tolerance", 15));
}
/**
* Returns the clock drift tolerance in nanoseconds.
* <p>Related system property: {@code rx2.scheduler.drift-tolerance} in minutes
* @return the tolerance in nanoseconds
* @since 2.0
*/
public static long clockDriftTolerance() {
return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
}
/**
* Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
* <p>
* When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}.
* <p>
* Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
*
* @return a Worker representing a serial queue of actions to be executed
*/
@NonNull
public abstract Worker createWorker();
/**
* Returns the 'current time' of the Scheduler in the specified time unit.
* @param unit the time unit
* @return the 'current time'
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* Allows the Scheduler instance to start threads
* and accept tasks on them.
* <p>Implementations should make sure the call is idempotent and thread-safe.
* @since 2.0
*/
public void start() {
}
/**
* Instructs the Scheduler instance to stop threads
* and stop accepting tasks on any outstanding Workers.
* <p>Implementations should make sure the call is idempotent and thread-safe.
* @since 2.0
*/
public void shutdown() {
}
/**
* Schedules the given task on this scheduler non-delayed execution.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
*
* @param run the task to execute
*
* @return the Disposable instance that let's one cancel this particular task.
* @since 2.0
*/
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
/**
* Schedules the execution of the given task with the given delay amount.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
*
* @param run the task to schedule
* @param delay the delay amount, non-positive values indicate non-delayed scheduling
* @param unit the unit of measure of the delay amount
* @return the Disposable that let's one cancel this particular delayed task.
* @since 2.0
*/
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
/**
* Schedules a periodic execution of the given task with the given initial delay and period.
*
* <p>
* This method is safe to be called from multiple threads but there are no
* ordering guarantees between tasks.
*
* <p>
* The periodic execution is at a fixed rate, that is, the first execution will be after the initial
* delay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
*
* @param run the task to schedule
* @param initialDelay the initial delay amount, non-positive values indicate non-delayed scheduling
* @param period the period at which the task should be re-executed
* @param unit the unit of measure of the delay amount
* @return the Disposable that let's one cancel this particular delayed task.
* @since 2.0
*/
@NonNull
public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
return periodicTask;
}
/**
* Allows the use of operators for controlling the timing around when
* actions scheduled on workers are actually done. This makes it possible to
* layer additional behavior on this {@link Scheduler}. The only parameter
* is a function that flattens an {@link Flowable} of {@link Flowable}
* of {@link Completable}s into just one {@link Completable}. There must be
* a chain of operators connecting the returned value to the source
* {@link Flowable} otherwise any work scheduled on the returned
* {@link Scheduler} will not be executed.
* <p>
* When {@link Scheduler#createWorker()} is invoked a {@link Flowable} of
* {@link Completable}s is onNext'd to the combinator to be flattened. If
* the inner {@link Flowable} is not immediately subscribed to an calls to
* {@link Worker#schedule} are buffered. Once the {@link Flowable} is
* subscribed to actions are then onNext'd as {@link Completable}s.
* <p>
* Finally the actions scheduled on the parent {@link Scheduler} when the
* inner most {@link Completable}s are subscribed to.
* <p>
* When the {@link Worker} is unsubscribed the {@link Completable} emits an
* onComplete and triggers any behavior in the flattening operator. The
* {@link Flowable} and all {@link Completable}s give to the flattening
* function never onError.
* <p>
* Limit the amount concurrency two at a time without creating a new fix
* size thread pool:
*
* <pre>
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // callbacks two at a time
* return Completable.merge(Flowable.merge(workers), 2);
* });
* </pre>
* <p>
* This is a slightly different way to limit the concurrency but it has some
* interesting benefits and drawbacks to the method above. It works by
* limited the number of concurrent {@link Worker}s rather than individual
* actions. Generally each {@link Flowable} uses its own {@link Worker}.
* This means that this will essentially limit the number of concurrent
* subscribes. The danger comes from using operators like
* {@link Flowable#zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.functions.BiFunction)} where
* subscribing to the first {@link Flowable} could deadlock the
* subscription to the second.
*
* <pre>
* Scheduler limitScheduler = Schedulers.computation().when(workers -> {
* // use merge max concurrent to limit the number of concurrent
* // Flowables two at a time
* return Completable.merge(Flowable.merge(workers, 2));
* });
* </pre>
*
* Slowing down the rate to no more than than 1 a second. This suffers from
* the same problem as the one above I could find an {@link Flowable}
* operator that limits the rate without dropping the values (aka leaky
* bucket algorithm).
*
* <pre>
* Scheduler slowScheduler = Schedulers.computation().when(workers -> {
* // use concatenate to make each worker happen one at a time.
* return Completable.concat(workers.map(actions -> {
* // delay the starting of the next worker by 1 second.
* return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
* }));
* });
* </pre>
*
* <p>History: 2.0.1 - experimental
* @param <S> a Scheduler and a Subscription
* @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
* the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
* @return the Scheduler with the customized execution behavior
* @since 2.1
*/
@SuppressWarnings("unchecked")
@NonNull
public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
return (S) new SchedulerWhen(combine, this);
}
/**
* Sequential Scheduler for executing actions on a single thread or event loop.
* <p>
* Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
*/
public abstract static class Worker implements Disposable {
/**
* Schedules a Runnable for execution without delay.
*
* <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
*
* @param run
* Runnable to schedule
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
/**
* Schedules an Runnable for execution at some point in the future.
* <p>
* Note to implementors: non-positive {@code delayTime} should be regarded as non-delayed schedule, i.e.,
* as if the {@link #schedule(Runnable)} was called.
*
* @param run
* the Runnable to schedule
* @param delay
* time to wait before executing the action; non-positive values indicate an non-delayed
* schedule
* @param unit
* the time unit of {@code delayTime}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
/**
* Schedules a cancelable action to be executed periodically. This default implementation schedules
* recursively and waits for actions to complete (instead of potentially executing long-running actions
* concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
* <p>
* Note to implementors: non-positive {@code initialTime} and {@code period} should be regarded as
* non-delayed scheduling of the first and any subsequent executions.
*
* @param run
* the Runnable to execute periodically
* @param initialDelay
* time to wait before executing the action for the first time; non-positive values indicate
* an non-delayed schedule
* @param period
* the time interval to wait each time in between executing the action; non-positive values
* indicate no delay between repeated schedules
* @param unit
* the time unit of {@code period}
* @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
*/
@NonNull
public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
final SequentialDisposable first = new SequentialDisposable();
final SequentialDisposable sd = new SequentialDisposable(first);
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
final long periodInNanoseconds = unit.toNanos(period);
final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
periodInNanoseconds), initialDelay, unit);
if (d == EmptyDisposable.INSTANCE) {
return d;
}
first.replace(d);
return sd;
}
/**
* Returns the 'current time' of the Worker in the specified time unit.
* @param unit the time unit
* @return the 'current time'
* @since 2.0
*/
public long now(@NonNull TimeUnit unit) {
return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
/**
* Holds state and logic to calculate when the next delayed invocation
* of this task has to happen (accounting for clock drifts).
*/
final class PeriodicTask implements Runnable {
@NonNull
final Runnable decoratedRun;
@NonNull
final SequentialDisposable sd;
final long periodInNanoseconds;
long count;
long lastNowNanoseconds;
long startInNanoseconds;
PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
this.decoratedRun = decoratedRun;
this.sd = sd;
this.periodInNanoseconds = periodInNanoseconds;
lastNowNanoseconds = firstNowNanoseconds;
startInNanoseconds = firstStartInNanoseconds;
}
@Override
public void run() {
decoratedRun.run();
if (!sd.isDisposed()) {
long nextTick;
long nowNanoseconds = now(TimeUnit.NANOSECONDS);
// If the clock moved in a direction quite a bit, rebase the repetition period
if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
|| nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
nextTick = nowNanoseconds + periodInNanoseconds;
/*
* Shift the start point back by the drift as if the whole thing
* started count periods ago.
*/
startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
} else {
nextTick = startInNanoseconds + (++count * periodInNanoseconds);
}
lastNowNanoseconds = nowNanoseconds;
long delay = nextTick - nowNanoseconds;
sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
}
}
}
}
static class PeriodicDirectTask
implements Runnable, Disposable {
final Runnable run;
@NonNull
final Worker worker;
@NonNull
volatile boolean disposed;
PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
this.run = run;
this.worker = worker;
}
@Override
public void run() {
if (!disposed) {
try {
run.run();
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
worker.dispose();
throw ExceptionHelper.wrapOrThrow(ex);
}
}
}
@Override
public void dispose() {
disposed = true;
worker.dispose();
}
@Override
public boolean isDisposed() {
return disposed;
}
}
static final class DisposeTask implements Runnable, Disposable {
final Runnable decoratedRun;
final Worker w;
Thread runner;
DisposeTask(Runnable decoratedRun, Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();
} finally {
dispose();
runner = null;
}
}
@Override
public void dispose() {
if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
((NewThreadWorker)w).shutdown();
} else {
w.dispose();
}
}
@Override
public boolean isDisposed() {
return w.isDisposed();
}
}
}
0%