欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

使用RxJava2怎么實現(xiàn)線程調(diào)度

使用RxJava2怎么實現(xiàn)線程調(diào)度?很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學習下,希望你能有所收獲。

創(chuàng)新互聯(lián)專注于民豐企業(yè)網(wǎng)站建設(shè),成都響應(yīng)式網(wǎng)站建設(shè)公司,商城網(wǎng)站定制開發(fā)。民豐網(wǎng)站建設(shè)公司,為民豐等地區(qū)提供建站服務(wù)。全流程按需設(shè)計網(wǎng)站,專業(yè)設(shè)計,全程項目跟蹤,創(chuàng)新互聯(lián)專業(yè)和態(tài)度為您提供的服務(wù)

subscribeOn

Observable.subscribeOn()在方法內(nèi)部生成了一個ObservableSubscribeOn對象.

主要看一下ObservableSubscribeOn的subscribeActual方法.

 @Override
  public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    //調(diào)用下游的Observer的onSubscribe方法
    observer.onSubscribe(parent);
    //通過SubscribeTask執(zhí)行了上游Observable的subscribeActual方法
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
  }

scheduler.scheduleDirect(Runnable)用于執(zhí)行SubscribeTask這個任務(wù).SubscribeTask本身是Runnable的實現(xiàn)類.看一下其run方法.

@Override
    public void run() {
      //上游的Observable.subscribe方法被切換到了新的線程
      source.subscribe(parent);
    }

首先可以得出結(jié)論:subscribeOn將上游的Observable的subscribe方法切換到了新的線程.

如果多次調(diào)用subscribeOn切換線程,會有什么效果?

由下往上,每次調(diào)用subscribeOn,都會導致上游的Observable的subscribeActual切換到指定的線程.那么最后一次調(diào)用的切換最上游的創(chuàng)建型操作符的subscribeActual的執(zhí)行線程.如果操作符有默認執(zhí)行線程怎么辦?

操作符默認線程

如果是創(chuàng)建型操作符,處于最上游,那么subscribeOn的線程切換對它不起作用.天高皇帝遠,縣官不如現(xiàn)管.就是這個道理.
如果是其它操作符,會是怎樣的?

以操作符timeout為例:它對應(yīng)ObservableTimeoutTimed和TimeoutObserver

 @Override
    public void onNext(T t) {
      downstream.onNext(t);
      //超時計時
      startTimeout(idx + 1);
    }

    void startTimeout(long nextIndex) {
      //交給操作符默認的線程執(zhí)行
      task.replace(worker.schedule(new TimeoutTask(nextIndex, this), timeout, unit));
    }

    @Override
    public void onError(Throwable t) {
        downstream.onError(t); 
    }

    @Override
    public void onComplete() {
        downstream.onComplete();
      }
    }

    @Override
    public void onTimeout(long idx) {
        downstream.onError(new TimeoutException(timeoutMessage(timeout, unit)));
    }
//TimeoutTask.java
static final class TimeoutTask implements Runnable {

    @Override
    public void run() {
      parent.onTimeout(idx);
    }
  }

可以看到操作符默認的執(zhí)行線程只用來做超時計時任務(wù),如果超時了,會在操作符的默認線程執(zhí)行onError方法..操作符默認線程對下游的observer造成什么影響要做具體對待.

observeOn

observeOn對應(yīng)ObservableObserveOnObserveOnObserver.

 //ObservableObserveOn.java
 @Override
  protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
      source.subscribe(observer);
    } else {
      Scheduler.Worker w = scheduler.createWorker();
      source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
  }
 //ObserveOnObserver.java 
  @Override
    public void onSubscribe(Disposable d) {
      if (DisposableHelper.validate(this.upstream, d)) {
        if (d instanceof QueueDisposable) {
          if (m == QueueDisposable.SYNC) {
          //執(zhí)行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            schedule();
            return;
          }
          if (m == QueueDisposable.ASYNC) {
           //執(zhí)行下游Observer的onSubscribe方法
            downstream.onSubscribe(this);
            return;
          }
        }
         //執(zhí)行下游Observer的onSubscribe方法
        downstream.onSubscribe(this);
      }
    }
    @Override
    public void onNext(T t) {
     //省略
      schedule();
    }
    @Override
    public void onError(Throwable t) {
     //省略
      schedule();
    }
     void schedule() {
      if (getAndIncrement() == 0) {
      /*
      ObserveOnObserver是Runnable的實現(xiàn)類.交給線程池執(zhí)行
      */
        worker.schedule(this);
      }
    }
    
    
    void drainNormal() {
      final Observer<? super T> a = downstream;
      for (;;) {
        for (;;) {
          T v;
          try {
            v = q.poll();
          } catch (Throwable ex) {
            a.onError(ex);
            return;
          }
          //執(zhí)行下游Observer的onNext方法
          a.onNext(v);
        }
      }
    }

    void drainFused() {
      for (;;) {
        if (!delayError && d && ex != null) {
          //執(zhí)行下游Observer的onError方法
          downstream.onError(error);
          return;
        }
        downstream.onNext(null);
        if (d) {
          ex = error;
          if (ex != null) {
            //執(zhí)行下游Observer的onError方法
            downstream.onError(ex);
          } else {
            //執(zhí)行下游Observer的onComplete方法
            downstream.onComplete();
          }
          return;
        }
      }
    }
    //執(zhí)行線程任務(wù)
    @Override
    public void run() {
      if (outputFused) {
        drainFused();
      } else {
        drainNormal();
      }
    }

從上面可以看出ObservableObserveOn在其subscribeActual方法中并沒有切換上游Observable的subscribe方法的執(zhí)行線程.但是ObserveOnObserver在其onNext,onError和onComplete中通過schedule()方法將下游Observer的各個方法切換到了新的線程.

得出結(jié)論: observeOn負責切換的是下游Observer的各個方法的執(zhí)行線程

如果下游多次通過observeOn切換線程,會有什么效果?

每次切換都會對其下游造成影響,直到遇到下一個observeOn為止.

Observer(onSubscribe,onNext,onError,onComplete)

onNext,onError,onComplete與上游最近的observeOn所切換的線程保持一致.onSubscribe則不同.
遇到線程切換的時候,會首先在對應(yīng)的Observable的subscribeActual方法內(nèi),先調(diào)用observer.onSubscribe方法.而observer.onSubscribe會逐級向上傳遞直到最上游,而最上游的observer.onSubscribe是在subscribeActual方法內(nèi)調(diào)用,這是在主線程執(zhí)行的.所以onSubscribe方法無論如何都是在主線程執(zhí)行.

doOnSubscribe

.doOnSubscribe(new Consumer<Disposable>() {
          @Override
          public void accept(Disposable disposable) throws Exception {
           
          }
        })

我們要看的是方法accept的執(zhí)行線程.

通過源碼找到對應(yīng)的DisposableLambdaObserver.

 @Override
  public void onSubscribe(Disposable d) {
  //在這里調(diào)用了accept方法.
      onSubscribe.accept(d);
  }

這就要看上游在哪個線程執(zhí)行了Observer.onSubscribe(disposable)方法.

在創(chuàng)建型操作符的subscribeActual方法和subscribeOn對應(yīng)的Observable的subscribeActual方法內(nèi)調(diào)用了Observer.onSubscribe(disposable)方法.那么這兩處的執(zhí)行線程就決定了onSubscribe.accept(d);的執(zhí)行線程.

doFinally

對應(yīng)ObservableDoFinally和DoFinallyObserver

 //DoFinallyObserver.java
 @Override
    public void onError(Throwable t) {
      runFinally();
    }

    @Override
    public void onComplete() {
      runFinally();
    }

    @Override
    public void dispose() {
      runFinally();
    }
    
     void runFinally() {
       onFinally.run();
    }

可以看到與它所對應(yīng)的DoFinallyObserver的onError,onComplete,dispose方法的執(zhí)行線程有關(guān),這三個方法的執(zhí)行線程又受到上游的observeOn的影響.如果沒有observeOn,則會受到最上游的observable.subscribeActual方法影響.

doOnError

對應(yīng)ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java
 @Override
    public void onError(Throwable t) {
        onError.accept(t);
    }

和自身對應(yīng)的observer.onError所在線程保持一致.

doOnNext

對應(yīng)ObservableDoOnEach和DoOnEachObserver

//DoOnEachObserver.java
 @Override
    public void onNext(T t) {
        onNext.accept(t);
    }

和自身對應(yīng)的observer.onNext所在線程保持一致.

操作符對應(yīng)方法參數(shù)的執(zhí)行線程

包io.reactivex.functions下的接口類一般用于處理上游數(shù)據(jù)然后往下傳遞.這些接口類的方法一般在對應(yīng)的observer.onNext中調(diào)用.所以他們的線程保持一致.

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝您對創(chuàng)新互聯(lián)的支持。

網(wǎng)站題目:使用RxJava2怎么實現(xiàn)線程調(diào)度
標題鏈接:http://www.aaarwkj.com/article42/gjoeec.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供外貿(mào)建站、建站公司、品牌網(wǎng)站設(shè)計、品牌網(wǎng)站制作、手機網(wǎng)站建設(shè)、ChatGPT

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁設(shè)計
熟女精品国产一区二区三区 | 日韩中文字幕在线乱码| 欧美亚洲国产青草久久| 欧美日韩一区二区黄色| 少妇精品偷拍高潮少妇在线观看| 91精品国产自产在线观看| 亚洲另类偷拍校园伦理| 亚洲精品二区在线播放| 亚洲av色男人天堂网| 亚洲国产女人精品久久久| 天堂av在线播放观看| 亚洲大陆免费在线视频| 一区二区三区在线观看淫| 白嫩少妇情久久密月久久| av免费观看男人的天堂| 五月婷婷六月丁香俺来也| 国产一区黄片视频在线观看| 亚洲精品不卡一区二区| 中文字幕日本乱码精品久久| 欧美三级亚洲三级日韩三级| 日韩丰满少妇在线观看| 精品人妻一区二区三区蜜桃电| 国产成人综合亚洲欧美在线| 国产91在线观看网站| 亚洲巨人精品福利导航| 最新日本人妻中文字幕| 亚洲国产精品性色av| 亚洲一区二区精品欧美日韩| 欧美日韩一区二区三区666| 日本一区二区欧美亚洲国产| 亚洲熟妇av乱码在线| 亚洲精品隔壁傲慢人妻| 中文字幕乱码在线观看一区| 一区二区三区在线观看淫| 东京男人的天堂国产av| 亚洲男人av天堂东京热| 在线播放欧美视频91| 激情亚洲综合一区二区| 99精品国产高清一区二区三区| 亚洲综合成人av在线| 国产欧美一区二区三区久久|