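RxJava 2 Source Code Analysis

Preface

This article covers how RxJava is used and how it works internally: the basic principle, operators, and thread scheduling.

1. Basic usage

This article reads the source of RxJava 2.1.0; the matching RxAndroid version is 2.0.1.
Let's start with the basic usage.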

  Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer i) throws Exception {
                mRxOperatorsText.append("accept : " + i +"\n");
                Log.e(TAG, "accept : " + i +"\n" );
            }
        });

2. How it works

The main elements of RxJava are the Observable (the observed side), the Observer, subscribe (the subscription), and events. With these in mind, let's read on.

2.1 Observable.create

Step into Observable.create:

  @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

The argument is an ObservableOnSubscribe, and the return value is the result of calling RxJavaPlugins.onAssembly().

public interface ObservableOnSubscribe<T> {

    void subscribe(@NonNull ObservableEmitter<T> e) throws Exception;
}

onAssembly

In RxJavaPlugins we find:

@SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;//1
        if (f != null) {
            return apply(f, source); //2
        }
        return source;//3
    }

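This shows that RxJavaPlugins exists to make testing and tracing easier: a hook can use instanceof checks and the like to replace the Observable/Observer/Scheduler instances you want to trace. Consider what happens when onObservableAssembly is not null.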

//1 onObservableAssembly

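onObservableAssembly is a static field with a setter and a getter, used mainly for testing. Nothing has set it here, so f is null and onAssembly returns source unchanged. That source is the ObservableCreate object wrapping the ObservableOnSubscribe created in the basic usage above.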

    static volatile Function<? super Observable, ? extends Observable> onObservableAssembly;

The Function interface has a single method:

public interface Function<T, R> {
    @NonNull
    R apply(@NonNull T t) throws Exception;
}
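
To make the non-null case concrete, here is a minimal sketch of the same hook pattern in plain Java. It is a simplified model, not RxJava's code: AssemblyHookSketch, Source, and the use of java.util.function.Function are assumptions made for illustration. In RxJava 2 itself the hook is installed with RxJavaPlugins.setOnObservableAssembly.

```java
import java.util.function.Function;

class AssemblyHookSketch {
    /** Hypothetical stand-in for Observable: it only carries a name. */
    static final class Source {
        final String name;
        Source(String name) { this.name = name; }
    }

    // Same shape as RxJavaPlugins.onObservableAssembly: a static, volatile, nullable hook.
    static volatile Function<Source, Source> onAssemblyHook;

    /** Returns the source unchanged when no hook is set, otherwise whatever the hook returns. */
    static Source onAssembly(Source source) {
        Function<Source, Source> f = onAssemblyHook; // read the volatile field once
        if (f != null) {
            return f.apply(source);
        }
        return source;
    }

    public static void main(String[] args) {
        Source created = new Source("create");
        System.out.println(onAssembly(created).name); // no hook installed

        onAssemblyHook = s -> new Source("traced(" + s.name + ")");
        System.out.println(onAssembly(created).name); // the hook replaces the source

        onAssemblyHook = null; // reset, as a test would do afterwards
    }
}
```

With a hook installed, every assembled source passes through it, which is what makes global tracing or substitution possible without touching call sites.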

Next, let's see what ObservableCreate contains:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    ......
}

Keep subscribeActual and ObservableCreate in mind; both come up again below.

2.2 observable.subscribe(consumer)

Next comes the subscription step.

public final Disposable subscribe(Consumer<? super T> onNext) {
        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
    }

Following the call:

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete, Consumer<? super Disposable> onSubscribe) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(onError, "onError is null");
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

        LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);//1

        subscribe(ls);//2

        return ls;
    }

//1 LambdaObserver

A LambdaObserver is created here, wrapping the callbacks that were passed in.

public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {

    private static final long serialVersionUID = -7251123623727029452L;
    final Consumer<? super T> onNext;
    final Consumer<? super Throwable> onError;
    final Action onComplete;
    final Consumer<? super Disposable> onSubscribe;

    public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
            Action onComplete,
            Consumer<? super Disposable> onSubscribe) {
        super();
        this.onNext = onNext;
        this.onError = onError;
        this.onComplete = onComplete;
        this.onSubscribe = onSubscribe;
    }

    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.setOnce(this, s)) {
            try {
                onSubscribe.accept(this);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                s.dispose();
                onError(ex);
            }
        }
    }

    @Override
    public void onNext(T t) {
        if (!isDisposed()) {
            try {
                onNext.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                get().dispose();
                onError(e);
            }
        }
    }
     @Override
    public void onError(Throwable t) {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onError.accept(t);
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(new CompositeException(t, e));
            }
        }
    }

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            lazySet(DisposableHelper.DISPOSED);
            try {
                onComplete.run();
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(e);
            }
        }
    }

    @Override
    public void dispose() {
        DisposableHelper.dispose(this);
    }

    @Override
    public boolean isDisposed() {
        return get() == DisposableHelper.DISPOSED;
    }
}

LambdaObserver is therefore a wrapper layer: it isolates the callbacks that were passed in behind the Observer interface.
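
The guarding behavior of this wrapper can be sketched in plain Java. This is a simplified model under stated assumptions: CallbackObserver is a hypothetical class, it uses an AtomicBoolean instead of the AtomicReference<Disposable> seen above, and it leaves out onComplete and onSubscribe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

class CallbackObserverSketch {
    /** Hypothetical, simplified counterpart of LambdaObserver: callbacks plus a disposed flag. */
    static final class CallbackObserver<T> {
        private final Consumer<? super T> onNext;
        private final Consumer<? super Throwable> onError;
        private final AtomicBoolean disposed = new AtomicBoolean();

        CallbackObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {
            this.onNext = onNext;
            this.onError = onError;
        }

        void onNext(T value) {
            if (disposed.get()) {
                return; // events arriving after dispose are dropped
            }
            try {
                onNext.accept(value);
            } catch (RuntimeException ex) {
                onError(ex); // a failing callback is routed to onError
            }
        }

        void onError(Throwable t) {
            // Terminal event: mark as disposed first, then notify exactly once.
            if (disposed.compareAndSet(false, true)) {
                onError.accept(t);
            }
        }

        void dispose() { disposed.set(true); }

        boolean isDisposed() { return disposed.get(); }
    }

    /** Feeds 1, 2, 3 into an observer whose onNext callback throws on 2. */
    static List<String> run() {
        List<String> log = new ArrayList<>();
        CallbackObserver<Integer> observer = new CallbackObserver<Integer>(
                i -> {
                    if (i == 2) {
                        throw new IllegalStateException("boom");
                    }
                    log.add("next " + i);
                },
                t -> log.add("error " + t.getMessage()));
        observer.onNext(1);
        observer.onNext(2);
        observer.onNext(3); // ignored: the observer disposed itself on error
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the wrapper is that the user's callbacks never have to check state themselves; the isDisposed guard and the error routing live in one place.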

//2 subscribe(ls)

public abstract class Observable<T> implements ObservableSource<T> {
......
 public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer); //3

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer); //4
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

protected abstract void subscribeActual(Observer<? super T> observer);

......

//3 RxJavaPlugins.onSubscribe

 public static <T> Observer<? super T> onSubscribe(@NonNull Observable<T> source, @NonNull Observer<? super T> observer) {
        BiFunction<? super Observable, ? super Observer, ? extends Observer> f = onObservableSubscribe;
        if (f != null) {
            return apply(f, source, observer);
        }
        return observer;
    }

Again the observer is returned unchanged, because f is null.

//4 subscribeActual(observer)

subscribeActual itself is only an abstract method. What runs here is the implementation in ObservableCreate that we saw earlier.

    protected abstract void subscribeActual(Observer<? super T> observer);

The implementation in ObservableCreate:

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer); //1
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);//2
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

2.3 subscribeActual

//1 CreateEmitter < T > parent = new CreateEmitter< T >(observer);

Note that the observer passed in above is actually the LambdaObserver. CreateEmitter wraps the LambdaObserver in one more layer.

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {


        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

         @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }

observer.onSubscribe(parent);

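This hands the CreateEmitter to the LambdaObserver as its Disposable. The onSubscribe callback is the default Functions.emptyConsumer() supplied by subscribe(Consumer), so nothing else happens at this step.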

//2 source.subscribe(parent);

Notice source here: it is the object that was created with new at the start and passed in.

new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }

So the e.onNext calls inside subscribe of the new ObservableOnSubscribe end up in the accept method of the new Consumer that was passed in later.

new Consumer<Integer>() {
            @Override
            public void accept(@NonNull Integer i) throws Exception {
                mRxOperatorsText.append("accept : " + i +"\n");
                Log.e(TAG, "accept : " + i +"\n" );
            }
        }
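
The whole path traced so far, from create through subscribe and subscribeActual to accept, can be condensed into a small plain-Java model. All names here (MiniObservable, MiniCreate, OnSubscribe, Emitter) are hypothetical stand-ins for the RxJava classes, and the observer is reduced to a java.util.function.Consumer; disposal and error handling are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class MiniCreateSketch {
    /** Hypothetical counterpart of ObservableEmitter. */
    interface Emitter<T> { void onNext(T value); }

    /** Hypothetical counterpart of ObservableOnSubscribe. */
    interface OnSubscribe<T> { void subscribe(Emitter<T> emitter); }

    /** Hypothetical counterpart of Observable: subscribe delegates to subscribeActual. */
    abstract static class MiniObservable<T> {
        final void subscribe(Consumer<? super T> observer) {
            subscribeActual(observer);
        }

        abstract void subscribeActual(Consumer<? super T> observer);

        static <T> MiniObservable<T> create(OnSubscribe<T> source) {
            return new MiniCreate<>(source);
        }
    }

    /** Hypothetical counterpart of ObservableCreate. */
    static final class MiniCreate<T> extends MiniObservable<T> {
        final OnSubscribe<T> source;

        MiniCreate(OnSubscribe<T> source) { this.source = source; }

        @Override
        void subscribeActual(Consumer<? super T> observer) {
            // The emitter wraps the observer, as CreateEmitter wraps LambdaObserver.
            Emitter<T> parent = observer::accept;
            // Nothing is emitted before this call, which runs the user's subscribe body.
            source.subscribe(parent);
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable<Integer> observable = MiniObservable.create(e -> {
            log.add("source runs");
            e.onNext(1);
            e.onNext(2);
            e.onNext(3);
        });
        log.add("created");
        observable.subscribe(i -> log.add("accept : " + i));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The log shows that creating the observable does not run the source; the source body only runs once subscribe reaches subscribeActual.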

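This part covers RxJava 2 operators. The most commonly used operator in RxJava is map, so the analysis starts there.

1. Basic usage of the map operator

As usual, the basic usage of map comes first.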

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onNext(3);
            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }).subscribe(new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("accept : " + s +"\n");
                Log.e(TAG, "accept : " + s +"\n" );
            }
        });

2. Tracing the map source

2.1 map

Step into map in Observable and you will again find RxJavaPlugins.onAssembly.

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }

RxJavaPlugins.onAssembly was covered above, so the focus here is ObservableMap. In new ObservableMap<T, R>(this, mapper), this is the Observable returned by create and mapper is the Function you supplied.

2.2 ObservableMap

public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    final Function<? super T, ? extends U> function;

    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
        super(source);
        this.function = function;
    }

    @Override
    public void subscribeActual(Observer<? super U> t) { 
        source.subscribe(new MapObserver<T, U>(t, function));
    }


    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {
            if (done) { // done is false by default, so this branch is skipped
                return;
            }

            if (sourceMode != NONE) { // sourceMode is 0 (NONE) by default, so this branch is skipped
                actual.onNext(null);
                return;
            }

            U v;

            try {
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");//1
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            actual.onNext(v);//2
        }

        @Override
        public int requestFusion(int mode) {
            return transitiveBoundaryFusion(mode);
        }

        @Nullable
        @Override
        public U poll() throws Exception {
            T t = qs.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        }
    }
}

AbstractObservableWithUpstream

What is AbstractObservableWithUpstream? It extends Observable and stores the upstream ObservableSource.

abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}

//1 mapper.apply(t)

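In ObservableMap, subscribeActual subscribes to the upstream Observable. mapper.apply(t) in MapObserver converts the upstream value t into the U that downstream expects. How the value is converted is whatever you specified at the start. The Function passed to map in this example was: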

new Function<Integer, String>() {
            @Override
            public String apply(@NonNull Integer integer) throws Exception {
                return "This is result " + integer;
            }
        }

//2 actual.onNext(v)

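actual.onNext(v); hands the value over to the downstream Observer. In this example actual is the LambdaObserver that wraps the Consumer passed to subscribe, so calling onNext runs the operation you defined there.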

new Consumer<String>() {
            @Override
            public void accept(@NonNull String s) throws Exception {
                mRxOperatorsText.append("accept : " + s +"\n");
                Log.e(TAG, "accept : " + s +"\n" );
            }
        }

At this point the original e.onNext(1) has been handled.

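To summarize: subscription travels upstream, from the last Observable to the one before it (in this example, ObservableMap subscribes a MapObserver to the ObservableCreate). subscribe on the last Observable calls subscribeActual, which subscribes layer by layer until it reaches the first Observable. That triggers subscribe on the first Observable's source, which starts the data flow. In this example, once subscription reaches the ObservableOnSubscribe implementation, e.onNext(1) sends the data downstream layer by layer: MapObserver transforms it first, and then it reaches the final Observer.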
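
The two directions described in this summary, subscription going up and data coming down, can be made visible with a small plain-Java model. Source and map below are hypothetical, simplified stand-ins for ObservableSource and ObservableMap; the log parameter exists only to record the order of events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

class MiniMapSketch {
    /** Hypothetical, simplified stand-in for ObservableSource. */
    interface Source<T> { void subscribe(Consumer<? super T> observer); }

    /** Hypothetical counterpart of ObservableMap: keeps the upstream source and the mapper. */
    static <T, R> Source<R> map(Source<T> upstream,
                                Function<? super T, ? extends R> mapper,
                                List<String> log) {
        return downstream -> {
            log.add("map subscribes upstream");
            // Counterpart of MapObserver: transform, then hand over to the downstream observer.
            upstream.subscribe(t -> {
                R v = mapper.apply(t);
                log.add("map " + t + " -> " + v);
                downstream.accept(v);
            });
        };
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Source<Integer> create = observer -> {
            log.add("create emits");
            observer.accept(1);
        };
        Source<String> mapped = map(create, i -> "This is result " + i, log);
        mapped.subscribe(s -> log.add("accept : " + s));
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Reading the log top to bottom: the first entry is the subscription moving up from map to create, and the remaining entries are the value moving down from create through map to the final consumer.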

The most impressive part of RxJava 2 is arguably its thread scheduling; this part explores how it works.

1. Basic usage

Observable.create((ObservableOnSubscribe<Integer>) e -> {
        e.onNext(1);
        e.onNext(2);
        e.onComplete(); })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(
        i -> System.out.println("onNext : i= " + i));
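
Before reading the source, it may help to see the division of labor between the two operators modeled with plain executors. This is only a rough analogy under stated assumptions: ThreadHopSketch and the thread names are hypothetical, two single-thread executors stand in for Schedulers.io() and the Android main thread, and none of RxJava's queueing or disposal is modeled.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadHopSketch {
    /** Runs the "source" on an io-like thread and delivers each value on a main-like thread. */
    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);
        try {
            // Role of subscribeOn: the body that emits values is started on the io thread.
            io.execute(() -> {
                for (int i = 1; i <= 2; i++) {
                    final int value = i;
                    log.add("emit " + value + " on " + Thread.currentThread().getName());
                    // Role of observeOn: each value is re-posted to the main thread for delivery.
                    main.execute(() -> {
                        log.add("onNext " + value + " on " + Thread.currentThread().getName());
                        done.countDown();
                    });
                }
            });
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for delivery");
            }
            return new ArrayList<>(log);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            io.shutdown();
            main.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The relative order of emit and onNext lines can vary between runs, but emission always happens on the io thread and delivery always happens on the main thread, in emission order.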

observeOn

 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
 @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
@SuppressWarnings({ "rawtypes", "unchecked" })
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) {
            return apply(f, source);
        }
        return source;
    }
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;

    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    protected void subscribeActual(Observer<? super T> observer) {
        if (this.scheduler instanceof TrampolineScheduler) {
            this.source.subscribe(observer);
        } else {
            Worker w = this.scheduler.createWorker();
            this.source.subscribe(new ObservableObserveOn.ObserveOnObserver(observer, w, this.delayError, this.bufferSize));
        }

    }

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        private static final long serialVersionUID = 6576896619930983584L;
        final Observer<? super T> actual;
        final Worker worker;
        final boolean delayError;
        final int bufferSize;
        SimpleQueue<T> queue;
        Disposable s;
        Throwable error;
        volatile boolean done;
        volatile boolean cancelled;
        int sourceMode;
        boolean outputFused;

        ObserveOnObserver(Observer<? super T> actual, Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    QueueDisposable<T> qd = (QueueDisposable)s;
                    int m = qd.requestFusion(7);
                    if (m == 1) {
                        this.sourceMode = m;
                        this.queue = qd;
                        this.done = true;
                        this.actual.onSubscribe(this);
                        this.schedule();
                        return;
                    }

                    if (m == 2) {
                        this.sourceMode = m;
                        this.queue = qd;
                        this.actual.onSubscribe(this);
                        return;
                    }
                }

                this.queue = new SpscLinkedArrayQueue(this.bufferSize);
                this.actual.onSubscribe(this);
            }

        }

        public void onNext(T t) {
            if (!this.done) {
                if (this.sourceMode != 2) {
                    this.queue.offer(t);
                }

                this.schedule();
            }
        }

        public void onError(Throwable t) {
            if (this.done) {
                RxJavaPlugins.onError(t);
            } else {
                this.error = t;
                this.done = true;
                this.schedule();
            }
        }

        public void onComplete() {
            if (!this.done) {
                this.done = true;
                this.schedule();
            }
        }

        public void dispose() {
            if (!this.cancelled) {
                this.cancelled = true;
                this.s.dispose();
                this.worker.dispose();
                if (this.getAndIncrement() == 0) {
                    this.queue.clear();
                }
            }

        }

        public boolean isDisposed() {
            return this.cancelled;
        }

        void schedule() {
            if (this.getAndIncrement() == 0) {
                this.worker.schedule(this);
            }

        }

        void drainNormal() {
            int missed = 1;
            SimpleQueue<T> q = this.queue;
            Observer a = this.actual;

            do {
                if (this.checkTerminated(this.done, q.isEmpty(), a)) {
                    return;
                }

                while(true) {
                    boolean d = this.done;

                    Object v;
                    try {
                        v = q.poll();
                    } catch (Throwable var7) {
                        Exceptions.throwIfFatal(var7);
                        this.s.dispose();
                        q.clear();
                        a.onError(var7);
                        this.worker.dispose();
                        return;
                    }

                    boolean empty = v == null;
                    if (this.checkTerminated(d, empty, a)) {
                        return;
                    }

                    if (empty) {
                        missed = this.addAndGet(-missed);
                        break;
                    }

                    a.onNext(v);
                }
            } while(missed != 0);

        }

        void drainFused() {
            int missed = 1;

            do {
                if (this.cancelled) {
                    return;
                }

                boolean d = this.done;
                Throwable ex = this.error;
                if (!this.delayError && d && ex != null) {
                    this.actual.onError(this.error);
                    this.worker.dispose();
                    return;
                }

                this.actual.onNext((Object)null);
                if (d) {
                    ex = this.error;
                    if (ex != null) {
                        this.actual.onError(ex);
                    } else {
                        this.actual.onComplete();
                    }

                    this.worker.dispose();
                    return;
                }

                missed = this.addAndGet(-missed);
            } while(missed != 0);

        }

        public void run() {
            if (this.outputFused) {
                this.drainFused();
            } else {
                this.drainNormal();
            }

        }

        boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
            if (this.cancelled) {
                this.queue.clear();
                return true;
            } else {
                if (d) {
                    Throwable e = this.error;
                    if (this.delayError) {
                        if (empty) {
                            if (e != null) {
                                a.onError(e);
                            } else {
                                a.onComplete();
                            }

                            this.worker.dispose();
                            return true;
                        }
                    } else {
                        if (e != null) {
                            this.queue.clear();
                            a.onError(e);
                            this.worker.dispose();
                            return true;
                        }

                        if (empty) {
                            a.onComplete();
                            this.worker.dispose();
                            return true;
                        }
                    }
                }

                return false;
            }
        }

        public int requestFusion(int mode) {
            if ((mode & 2) != 0) {
                this.outputFused = true;
                return 2;
            } else {
                return 0;
            }
        }

        @Nullable
        public T poll() throws Exception {
            return this.queue.poll();
        }

        public void clear() {
            this.queue.clear();
        }

        public boolean isEmpty() {
            return this.queue.isEmpty();
        }
    }
}
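
The core of ObserveOnObserver is the pairing of schedule() with the drain loop: getAndIncrement() == 0 decides who submits the drain task, and the missed counter decides when the drain may stop. A minimal sketch of that pattern follows. Drainer is a hypothetical, stripped-down class (no termination, error, or fusion handling), and the "worker" is a plain Executor that collects tasks so the demo stays deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

class QueueDrainSketch {
    /** Hypothetical, simplified model of ObserveOnObserver: queue values, drain them on a worker. */
    static final class Drainer<T> extends AtomicInteger implements Runnable {
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final Executor worker;
        private final List<T> delivered;

        Drainer(Executor worker, List<T> delivered) {
            this.worker = worker;
            this.delivered = delivered;
        }

        void onNext(T value) {
            queue.offer(value);
            schedule();
        }

        void schedule() {
            // Only the caller that moves the counter from 0 to 1 submits a drain task;
            // every other caller just records that more work has arrived.
            if (getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    delivered.add(v);
                }
                // Subtract the work already accounted for. A non-zero result means
                // schedule() was called in the meantime, so drain once more.
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    }

    static String run() {
        List<Runnable> pending = new ArrayList<>(); // tasks handed to the fake worker
        List<Integer> delivered = new ArrayList<>();
        Drainer<Integer> drainer = new Drainer<>(pending::add, delivered);

        drainer.onNext(1);
        drainer.onNext(2);
        drainer.onNext(3);
        int tasks = pending.size(); // three onNext calls, but only one scheduled task

        for (Runnable task : pending) {
            task.run();
        }
        return "tasks=" + tasks + " delivered=" + delivered;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The counter serves two purposes at once: it guarantees that at most one drain runs at a time, and it tells a running drain whether new items arrived after it last looked at the queue.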

AndroidSchedulers.mainThread()

public final class AndroidSchedulers {

    private static final class MainHolder {

        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });

    /** A {@link Scheduler} which executes actions on the Android main thread. */
    public static Scheduler mainThread() {
        return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
    }

    /** A {@link Scheduler} which executes actions on {@code looper}. */
    public static Scheduler from(Looper looper) {
        if (looper == null) throw new NullPointerException("looper == null");
        return new HandlerScheduler(new Handler(looper));
    }

    private AndroidSchedulers() {
        throw new AssertionError("No instances.");
    }
}
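
AndroidSchedulers builds its scheduler on a Handler bound to the main Looper, so every scheduled action ends up on one thread, in order. Since Handler and Looper are not available outside Android, the sketch below models the same idea with a single-thread executor. SingleThreadScheduler, Worker, and the thread name "main-looper" are hypothetical; this is an analogy, not RxAndroid's HandlerScheduler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class LooperSchedulerSketch {
    /** Hypothetical, reduced Worker: schedule a task now, or dispose to stop accepting tasks. */
    interface Worker {
        void schedule(Runnable task);
        void dispose();
    }

    /** Hypothetical scheduler whose workers all post to one "looper" thread. */
    static final class SingleThreadScheduler {
        private final ExecutorService loop =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-looper"));

        Worker createWorker() {
            final AtomicBoolean disposed = new AtomicBoolean();
            return new Worker() {
                @Override
                public void schedule(Runnable task) {
                    if (disposed.get()) {
                        return; // a disposed worker silently drops new tasks
                    }
                    loop.execute(() -> {
                        if (!disposed.get()) { // re-check: dispose may happen before the task runs
                            task.run();
                        }
                    });
                }

                @Override
                public void dispose() {
                    disposed.set(true);
                }
            };
        }

        void shutdown() { loop.shutdown(); }
    }

    static List<String> run() {
        SingleThreadScheduler scheduler = new SingleThreadScheduler();
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(2);
        try {
            Worker w = scheduler.createWorker();
            w.schedule(() -> { log.add("first on " + Thread.currentThread().getName()); latch.countDown(); });
            w.schedule(() -> { log.add("second on " + Thread.currentThread().getName()); latch.countDown(); });
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
            w.dispose();
            w.schedule(() -> log.add("never runs")); // dropped: the worker is disposed
            return new ArrayList<>(log);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } finally {
            scheduler.shutdown();
        }
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

Tasks from one worker run sequentially on the same thread, and disposing the worker stops later tasks, which are the two properties an operator such as observeOn relies on.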
public static Scheduler io() {
    return RxJavaPlugins.onIoScheduler(IO);
}
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
    if (f == null) {
        return defaultScheduler;
    }
    return apply(f, defaultScheduler);
}
public abstract class Scheduler {
    /**
     * The tolerance for a clock drift in nanoseconds where the periodic scheduler will rebase.
     * <p>
     * The associated system parameter, {@code rx.scheduler.drift-tolerance}, expects its value in minutes.
     */
    static final long CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
    static {
        CLOCK_DRIFT_TOLERANCE_NANOSECONDS = TimeUnit.MINUTES.toNanos(
                Long.getLong("rx2.scheduler.drift-tolerance", 15));
    }

    /**
     * Returns the clock drift tolerance in nanoseconds.
     * <p>Related system property: {@code rx2.scheduler.drift-tolerance} in minutes
     * @return the tolerance in nanoseconds
     * @since 2.0
     */
    public static long clockDriftTolerance() {
        return CLOCK_DRIFT_TOLERANCE_NANOSECONDS;
    }


    /**
     * Retrieves or creates a new {@link Scheduler.Worker} that represents serial execution of actions.
     * <p>
     * When work is completed it should be unsubscribed using {@link Scheduler.Worker#dispose()}.
     * <p>
     * Work on a {@link Scheduler.Worker} is guaranteed to be sequential.
     *
     * @return a Worker representing a serial queue of actions to be executed
     */
    @NonNull
    public abstract Worker createWorker();

    /**
     * Returns the 'current time' of the Scheduler in the specified time unit.
     * @param unit the time unit
     * @return the 'current time'
     * @since 2.0
     */
    public long now(@NonNull TimeUnit unit) {
        return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    /**
     * Allows the Scheduler instance to start threads
     * and accept tasks on them.
     * <p>Implementations should make sure the call is idempotent and thread-safe.
     * @since 2.0
     */
    public void start() {

    }

    /**
     * Instructs the Scheduler instance to stop threads
     * and stop accepting tasks on any outstanding Workers.
     * <p>Implementations should make sure the call is idempotent and thread-safe.
     * @since 2.0
     */
    public void shutdown() {

    }

    /**
     * Schedules the given task on this scheduler non-delayed execution.
     *
     * <p>
     * This method is safe to be called from multiple threads but there are no
     * ordering guarantees between tasks.
     *
     * @param run the task to execute
     *
     * @return the Disposable instance that let's one cancel this particular task.
     * @since 2.0
     */
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    /**
     * Schedules the execution of the given task with the given delay amount.
     *
     * <p>
     * This method is safe to be called from multiple threads but there are no
     * ordering guarantees between tasks.
     *
     * @param run the task to schedule
     * @param delay the delay amount, non-positive values indicate non-delayed scheduling
     * @param unit the unit of measure of the delay amount
     * @return the Disposable that let's one cancel this particular delayed task.
     * @since 2.0
     */
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

    /**
     * Schedules a periodic execution of the given task with the given initial delay and period.
     *
     * <p>
     * This method is safe to be called from multiple threads but there are no
     * ordering guarantees between tasks.
     *
     * <p>
     * The periodic execution is at a fixed rate, that is, the first execution will be after the initial
     * delay, the second after initialDelay + period, the third after initialDelay + 2 * period, and so on.
     *
     * @param run the task to schedule
     * @param initialDelay the initial delay amount, non-positive values indicate non-delayed scheduling
     * @param period the period at which the task should be re-executed
     * @param unit the unit of measure of the delay amount
     * @return the Disposable that let's one cancel this particular delayed task.
     * @since 2.0
     */
    @NonNull
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);

        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }

        return periodicTask;
    }

    /**
     * Allows the use of operators for controlling the timing around when
     * actions scheduled on workers are actually done. This makes it possible to
     * layer additional behavior on this {@link Scheduler}. The only parameter
     * is a function that flattens an {@link Flowable} of {@link Flowable}
     * of {@link Completable}s into just one {@link Completable}. There must be
     * a chain of operators connecting the returned value to the source
     * {@link Flowable} otherwise any work scheduled on the returned
     * {@link Scheduler} will not be executed.
     * <p>
     * When {@link Scheduler#createWorker()} is invoked a {@link Flowable} of
     * {@link Completable}s is onNext'd to the combinator to be flattened. If
     * the inner {@link Flowable} is not immediately subscribed to an calls to
     * {@link Worker#schedule} are buffered. Once the {@link Flowable} is
     * subscribed to actions are then onNext'd as {@link Completable}s.
     * <p>
     * Finally the actions scheduled on the parent {@link Scheduler} when the
     * inner most {@link Completable}s are subscribed to.
     * <p>
     * When the {@link Worker} is unsubscribed the {@link Completable} emits an
     * onComplete and triggers any behavior in the flattening operator. The
     * {@link Flowable} and all {@link Completable}s give to the flattening
     * function never onError.
     * <p>
     * Limit the amount concurrency two at a time without creating a new fix
     * size thread pool:
     * 
     * <pre>
     * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
     *  // use merge max concurrent to limit the number of concurrent
     *  // callbacks two at a time
     *  return Completable.merge(Flowable.merge(workers), 2);
     * });
     * </pre>
     * <p>
     * This is a slightly different way to limit the concurrency but it has some
     * interesting benefits and drawbacks to the method above. It works by
     * limited the number of concurrent {@link Worker}s rather than individual
     * actions. Generally each {@link Flowable} uses its own {@link Worker}.
     * This means that this will essentially limit the number of concurrent
     * subscribes. The danger comes from using operators like
     * {@link Flowable#zip(org.reactivestreams.Publisher, org.reactivestreams.Publisher, io.reactivex.functions.BiFunction)} where
     * subscribing to the first {@link Flowable} could deadlock the
     * subscription to the second.
     * 
     * <pre>
     * Scheduler limitScheduler = Schedulers.computation().when(workers -> {
     *  // use merge max concurrent to limit the number of concurrent
     *  // Flowables two at a time
     *  return Completable.merge(Flowable.merge(workers, 2));
     * });
     * </pre>
     * 
     * Slowing down the rate to no more than 1 a second. This suffers from
     * the same problem as the one above. I could not find a {@link Flowable}
     * operator that limits the rate without dropping the values (aka leaky
     * bucket algorithm).
     * 
     * <pre>
     * Scheduler slowScheduler = Schedulers.computation().when(workers -> {
     *  // use concatenate to make each worker happen one at a time.
     *  return Completable.concat(workers.map(actions -> {
     *      // delay the starting of the next worker by 1 second.
     *      return Completable.merge(actions.delaySubscription(1, TimeUnit.SECONDS));
     *  }));
     * });
     * </pre>
     * 
     * <p>History: 2.0.1 - experimental
     * @param <S> a Scheduler and a Disposable
     * @param combine the function that takes a two-level nested Flowable sequence of a Completable and returns
     * the Completable that will be subscribed to and should trigger the execution of the scheduled Actions.
     * @return the Scheduler with the customized execution behavior
     * @since 2.1
     */
    @SuppressWarnings("unchecked")
    @NonNull
    public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
        return (S) new SchedulerWhen(combine, this);
    }

    /**
     * Sequential Scheduler for executing actions on a single thread or event loop.
     * <p>
     * Disposing the {@link Worker} cancels all outstanding work and allows resource cleanup.
     */
    public abstract static class Worker implements Disposable {
        /**
         * Schedules a Runnable for execution without delay.
         *
         * <p>The default implementation delegates to {@link #schedule(Runnable, long, TimeUnit)}.
         *
         * @param run
         *            Runnable to schedule
         * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
         */
        @NonNull
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

        /**
         * Schedules a Runnable for execution at some point in the future.
         * <p>
         * Note to implementors: non-positive {@code delay} should be regarded as non-delayed schedule, i.e.,
         * as if the {@link #schedule(Runnable)} was called.
         *
         * @param run
         *            the Runnable to schedule
         * @param delay
         *            time to wait before executing the action; non-positive values indicate a non-delayed
         *            schedule
         * @param unit
         *            the time unit of {@code delay}
         * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
         */
        @NonNull
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

        /**
         * Schedules a cancelable action to be executed periodically. This default implementation schedules
         * recursively and waits for actions to complete (instead of potentially executing long-running actions
         * concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
         * <p>
         * Note to implementors: non-positive {@code initialDelay} and {@code period} should be regarded as
         * non-delayed scheduling of the first and any subsequent executions.
         *
         * @param run
         *            the Runnable to execute periodically
         * @param initialDelay
         *            time to wait before executing the action for the first time; non-positive values indicate
         *            a non-delayed schedule
         * @param period
         *            the time interval to wait each time in between executing the action; non-positive values
         *            indicate no delay between repeated schedules
         * @param unit
         *            the time unit of {@code period}
         * @return a Disposable to be able to unsubscribe the action (cancel it if not executed)
         */
        @NonNull
        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            final SequentialDisposable first = new SequentialDisposable();

            final SequentialDisposable sd = new SequentialDisposable(first);

            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            final long periodInNanoseconds = unit.toNanos(period);
            final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
            final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

            Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                    periodInNanoseconds), initialDelay, unit);

            if (d == EmptyDisposable.INSTANCE) {
                return d;
            }
            first.replace(d);

            return sd;
        }

        /**
         * Returns the 'current time' of the Worker in the specified time unit.
         * @param unit the time unit
         * @return the 'current time'
         * @since 2.0
         */
        public long now(@NonNull TimeUnit unit) {
            return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        /**
         * Holds state and logic to calculate when the next delayed invocation
         * of this task has to happen (accounting for clock drifts).
         */
        final class PeriodicTask implements Runnable {
            @NonNull
            final Runnable decoratedRun;
            @NonNull
            final SequentialDisposable sd;
            final long periodInNanoseconds;
            long count;
            long lastNowNanoseconds;
            long startInNanoseconds;

            PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
                    long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
                this.decoratedRun = decoratedRun;
                this.sd = sd;
                this.periodInNanoseconds = periodInNanoseconds;
                lastNowNanoseconds = firstNowNanoseconds;
                startInNanoseconds = firstStartInNanoseconds;
            }

            @Override
            public void run() {
                decoratedRun.run();

                if (!sd.isDisposed()) {

                    long nextTick;

                    long nowNanoseconds = now(TimeUnit.NANOSECONDS);
                    // If the clock moved in a direction quite a bit, rebase the repetition period
                    if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
                            || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
                        nextTick = nowNanoseconds + periodInNanoseconds;
                        /*
                         * Shift the start point back by the drift as if the whole thing
                         * started count periods ago.
                         */
                        startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
                    } else {
                        nextTick = startInNanoseconds + (++count * periodInNanoseconds);
                    }
                    lastNowNanoseconds = nowNanoseconds;

                    long delay = nextTick - nowNanoseconds;
                    sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
                }
            }
        }
    }

    static class PeriodicDirectTask
    implements Runnable, Disposable {
        final Runnable run;
        @NonNull
        final Worker worker;
        volatile boolean disposed;

        PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
            this.run = run;
            this.worker = worker;
        }

        @Override
        public void run() {
            if (!disposed) {
                try {
                    run.run();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    worker.dispose();
                    throw ExceptionHelper.wrapOrThrow(ex);
                }
            }
        }

        @Override
        public void dispose() {
            disposed = true;
            worker.dispose();
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }

    static final class DisposeTask implements Runnable, Disposable {
        final Runnable decoratedRun;
        final Worker w;

        Thread runner;

        DisposeTask(Runnable decoratedRun, Worker w) {
            this.decoratedRun = decoratedRun;
            this.w = w;
        }

        @Override
        public void run() {
            runner = Thread.currentThread();
            try {
                decoratedRun.run();
            } finally {
                dispose();
                runner = null;
            }
        }

        @Override
        public void dispose() {
            if (runner == Thread.currentThread() && w instanceof NewThreadWorker) {
                ((NewThreadWorker)w).shutdown();
            } else {
                w.dispose();
            }
        }

        @Override
        public boolean isDisposed() {
            return w.isDisposed();
        }
    }
}
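
The drift handling in PeriodicTask.run() above is easier to follow with concrete numbers. Below is a minimal stand-alone sketch of just that arithmetic, not RxJava's actual class. PeriodicTickSketch, nextDelay and the toleranceNanos constructor argument are hypothetical names introduced for illustration; the real code reads the tolerance from CLOCK_DRIFT_TOLERANCE_NANOSECONDS and the clock from now(TimeUnit.NANOSECONDS).

```java
/**
 * Hypothetical sketch of the tick arithmetic in PeriodicTask.run().
 * The clock reading is passed in explicitly so the behavior is deterministic.
 */
final class PeriodicTickSketch {
    final long periodNanos;
    final long toleranceNanos; // assumption: stands in for CLOCK_DRIFT_TOLERANCE_NANOSECONDS
    long count;
    long lastNowNanos;
    long startNanos;

    PeriodicTickSketch(long firstStartNanos, long firstNowNanos,
                       long periodNanos, long toleranceNanos) {
        this.startNanos = firstStartNanos;
        this.lastNowNanos = firstNowNanos;
        this.periodNanos = periodNanos;
        this.toleranceNanos = toleranceNanos;
    }

    /** Returns the delay until the next run, given the clock reading taken after a run. */
    long nextDelay(long nowNanos) {
        long nextTick;
        boolean clockWentBack = nowNanos + toleranceNanos < lastNowNanos;
        boolean clockJumpedAhead = nowNanos >= lastNowNanos + periodNanos + toleranceNanos;
        if (clockWentBack || clockJumpedAhead) {
            // Rebase: run one period from now and pretend the sequence
            // started 'count' periods before that.
            nextTick = nowNanos + periodNanos;
            startNanos = nextTick - periodNanos * (++count);
        } else {
            // Normal case: stay on the fixed-rate grid start + n * period.
            nextTick = startNanos + (++count * periodNanos);
        }
        lastNowNanos = nowNanos;
        return nextTick - nowNanos;
    }

    public static void main(String[] args) {
        PeriodicTickSketch task = new PeriodicTickSketch(0L, 0L, 1000L, 5000L);
        System.out.println(task.nextDelay(300L));    // run took 300, so wait 700
        System.out.println(task.nextDelay(1200L));   // run took 200, so wait 800
        System.out.println(task.nextDelay(100000L)); // clock jumped, rebase and wait one period
        System.out.println(task.nextDelay(101100L)); // back on the rebased grid, wait 900
    }
}
```

In the normal branch the delay shrinks by however long the task itself took, which keeps the runs on a fixed-rate grid. Only when the clock moves by more than the tolerance is the start point shifted so the grid continues from the new time.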

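The Javadoc of Worker.schedulePeriodically above says the default implementation "schedules recursively and waits for actions to complete". A rough way to picture this is a toy worker running on virtual time, sketched below. VirtualWorkerSketch and advanceTimeBy are hypothetical names, not RxJava classes. The sketch also leaves out the drift compensation of PeriodicTask and simply re-schedules one period after each run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical virtual-time worker for illustration only; not an RxJava class. */
final class VirtualWorkerSketch {

    /** A queued task with its due time; seq keeps insertion order for equal times. */
    private static final class Timed implements Comparable<Timed> {
        final long time;
        final long seq;
        final Runnable task;

        Timed(long time, long seq, Runnable task) {
            this.time = time;
            this.seq = seq;
            this.task = task;
        }

        @Override
        public int compareTo(Timed other) {
            int c = Long.compare(time, other.time);
            return c != 0 ? c : Long.compare(seq, other.seq);
        }
    }

    private final PriorityQueue<Timed> queue = new PriorityQueue<>();
    private long now;
    private long seq;
    private boolean disposed;

    long now() {
        return now;
    }

    /** Like Worker.schedule(Runnable): delegates to the delayed variant with zero delay. */
    void schedule(Runnable task) {
        schedule(task, 0L);
    }

    /** Non-positive delays are treated as "run as soon as possible". */
    void schedule(Runnable task, long delay) {
        if (!disposed) {
            queue.add(new Timed(now + Math.max(0L, delay), seq++, task));
        }
    }

    /** Recursive periodic scheduling: the next run is queued only after the current one returns. */
    void schedulePeriodically(final Runnable task, long initialDelay, final long period) {
        schedule(new Runnable() {
            @Override
            public void run() {
                task.run();
                if (!disposed) {
                    schedule(this, period);
                }
            }
        }, initialDelay);
    }

    /** Cancels all outstanding work. */
    void dispose() {
        disposed = true;
        queue.clear();
    }

    /** Moves the virtual clock forward and runs every task that becomes due, in time order. */
    void advanceTimeBy(long delta) {
        long target = now + delta;
        while (!queue.isEmpty() && queue.peek().time <= target) {
            Timed next = queue.poll();
            now = next.time;
            next.task.run();
        }
        now = target;
    }

    public static void main(String[] args) {
        VirtualWorkerSketch worker = new VirtualWorkerSketch();
        List<Long> ticks = new ArrayList<>();
        worker.schedulePeriodically(() -> ticks.add(worker.now()), 100L, 50L);
        worker.advanceTimeBy(260L);
        System.out.println(ticks); // prints [100, 150, 200, 250]
    }
}
```

Because each run queues its successor itself, two runs of the same periodic task can never overlap, and disposing the worker stops the chain simply by refusing to queue the next run.
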
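Please credit the source when reposting. You are welcome to verify the references cited in this article and to point out any errors or unclear wording, either in the comment section below or by email to long8086@foxmail.com

Article title: Rxjava2 Source Code Analysis

Word count: 5.1k

Author: Okloong

Published: 2018-06-13, 11:13:01

Last updated: 2020-01-06, 21:38:17

Original link: https://okloong.com/2018/06/13/Rxjava2/

Copyright notice: "Attribution-NonCommercial-ShareAlike 4.0". Please keep the original link and the author's name when reposting.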
