欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

生產(chǎn)常用Spark累加器剖析之二-創(chuàng)新互聯(lián)

Driver端

創(chuàng)新互聯(lián)科技有限公司專業(yè)互聯(lián)網(wǎng)基礎(chǔ)服務(wù)商,為您提供簡陽服務(wù)器托管,高防服務(wù)器租用,成都IDC機(jī)房托管,成都主機(jī)托管等互聯(lián)網(wǎng)服務(wù)。
  1. Driver端初始化構(gòu)建Accumulator并初始化,同時完成了Accumulator注冊,Accumulators.register(this)時Accumulator會在序列化后發(fā)送到Executor端
  2. Driver接收到ResultTask完成的狀態(tài)更新后,會去更新Value的值 然后在Action操作執(zhí)行后就可以獲取到Accumulator的值了

Executor端

  1. Executor端接收到Task之后會進(jìn)行反序列化操作,反序列化得到RDD和function。同時在反序列化的同時也去反序列化Accumulator(在readObject方法中完成),同時也會向TaskContext完成注冊
  2. 完成任務(wù)計算之后,隨著Task結(jié)果一起返回給Driver

結(jié)合源碼分析

Driver端初始化

??Driver端主要經(jīng)過以下步驟,完成初始化操作:

val accum = sparkContext.accumulator(0, “AccumulatorTest”)
val acc = new Accumulator(initialValue, param, Some(name))
Accumulators.register(this)

Executor端反序列化得到Accumulator

??反序列化是在調(diào)用ResultTask的runTask方式時候做的操作:

// 會反序列化出來RDD和自己定義的function
val (rdd, func) = ser.deserialize[(RDD[T], (TaskContext, Iterator[T]) => U)](
   ByteBuffer.wrap(taskBinary.value), Thread.currentThread.getContextClassLoader)

??在反序列化的過程中,會調(diào)用Accumulable中的readObject方法:

private def readObject(in: ObjectInputStream): Unit = Utils.tryOrIOException {
    in.defaultReadObject()
    // value的初始值為zero;該值是會被序列化的
    value_ = zero
    deserialized = true
    // Automatically register the accumulator when it is deserialized with the task closure.
    //
    // Note internal accumulators sent with task are deserialized before the TaskContext is created
    // and are registered in the TaskContext constructor. Other internal accumulators, such SQL
    // metrics, still need to register here.
    val taskContext = TaskContext.get()
    if (taskContext != null) {
      // 當(dāng)前反序列化所得到的對象會被注冊到TaskContext中
      // 這樣TaskContext就可以獲取到累加器
      // 任務(wù)運行結(jié)束之后,就可以通過context.collectAccumulators()返回給executor
      taskContext.registerAccumulator(this)
    }
  }

注意

Accumulable.scala中的value_,是不會被序列化的,@transient關(guān)鍵詞修飾了

@volatile @transient private var value_ : R = initialValue // Current value on master

累加器在各個節(jié)點的累加操作

針對傳入function中不同的操作,對應(yīng)有不同的調(diào)用方法,以下列舉幾種(在Accumulator.scala中):

def += (term: T) { value_ = param.addAccumulator(value_, term) }
def add(term: T) { value_ = param.addAccumulator(value_, term) }
def ++= (term: R) { value_ = param.addInPlace(value_, term)}

根據(jù)不同的累加器參數(shù),有不同實現(xiàn)的AccumulableParam(在Accumulator.scala中):

trait AccumulableParam[R, T] extends Serializable {
  /**
  def addAccumulator(r: R, t: T): R
  def addInPlace(r1: R, r2: R): R
  def zero(initialValue: R): R
}

不同的實現(xiàn)如下圖所示:
生產(chǎn)常用Spark累加器剖析之二
以IntAccumulatorParam為例:

implicit object IntAccumulatorParam extends AccumulatorParam[Int] {
  def addInPlace(t1: Int, t2: Int): Int = t1 + t2
  def zero(initialValue: Int): Int = 0
}

我們發(fā)現(xiàn)IntAccumulatorParam實現(xiàn)的是trait AccumulatorParam[T]:

trait AccumulatorParam[T] extends AccumulableParam[T, T] {
  def addAccumulator(t1: T, t2: T): T = {
    addInPlace(t1, t2)
  }
}

在各個節(jié)點上的累加操作完成之后,就會緊跟著返回更新之后的Accumulators的value_值

聚合操作

在Task.scala中的run方法,會執(zhí)行如下:

// 返回累加器,并運行task
// 調(diào)用TaskContextImpl的collectAccumulators,返回值的類型為一個Map
(runTask(context), context.collectAccumulators())

在Executor端已經(jīng)完成了一系列操作,需要將它們的值返回到Driver端進(jìn)行聚合匯總,整個順序如圖累加器執(zhí)行流程:
生產(chǎn)常用Spark累加器剖析之二
根據(jù)執(zhí)行流程,我們可以發(fā)現(xiàn),在執(zhí)行完collectAccumulators方法之后,最終會在DAGScheduler中調(diào)用updateAccumulators(event),而在該方法中會調(diào)用Accumulators的add方法,從而完成聚合操作:

def add(values: Map[Long, Any]): Unit = synchronized {
  // 遍歷傳進(jìn)來的值
  for ((id, value) <- values) {
    if (originals.contains(id)) {
      // Since we are now storing weak references, we must check whether the underlying data
      // is valid.
      // 根據(jù)id從注冊的Map中取出對應(yīng)的累加器
      originals(id).get match {
        // 將值給累加起來,最終將結(jié)果加到value里面
       // ++=是被重載了
        case Some(accum) => accum.asInstanceOf[Accumulable[Any, Any]] ++= value
        case None =>
          throw new IllegalAccessError("Attempted to access garbage collected Accumulator.")
      }
    } else {
      logWarning(s"Ignoring accumulator update for unknown accumulator id $id")
    }
  }
}

獲取累加器的值

通過accum.value方法可以獲取到累加器的值

至此,累加器執(zhí)行完畢。

另外有需要云服務(wù)器可以了解下創(chuàng)新互聯(lián)scvps.cn,海內(nèi)外云服務(wù)器15元起步,三天無理由+7*72小時售后在線,公司持有idc許可證,提供“云服務(wù)器、裸金屬服務(wù)器、高防服務(wù)器、香港服務(wù)器、美國服務(wù)器、虛擬主機(jī)、免備案服務(wù)器”等云主機(jī)租用服務(wù)以及企業(yè)上云的綜合解決方案,具有“安全穩(wěn)定、簡單易用、服務(wù)可用性高、性價比高”等特點與優(yōu)勢,專為企業(yè)上云打造定制,能夠滿足用戶豐富、多元化的應(yīng)用場景需求。

網(wǎng)頁標(biāo)題:生產(chǎn)常用Spark累加器剖析之二-創(chuàng)新互聯(lián)
瀏覽地址:http://www.aaarwkj.com/article34/dpgope.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站導(dǎo)航做網(wǎng)站、網(wǎng)站收錄、定制開發(fā)、用戶體驗、網(wǎng)站建設(shè)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站建設(shè)
亚洲精品一区二区三区不卡| 亚洲成人av毛片在线观看| 最新日韩人妻中文字幕一区| 亚洲精品二区在线播放| 欧美日韩在线视频一区| 免费人成在线观看网站免费观看| 欧美专区另类综合日韩| 久久国产精品一二三区| 国产成人大片中文字幕在线| 久久一区二区三区播放| 在线观看高清欧美国产视频| 国产一区二区欧美精品| 久国产亚洲精品久久久极品| 久久成人免费在线电影| 亚洲综合一区二区在线视频| 欧美日韩丝袜一区二区| 日本一本高清免费不卡| 初爱视频教程完整版韩国| 青青草原在线视频伊人| 久草亚洲一区二区三区av| 青青成线在人线免费啪| 久久成人日韩电影午夜| 色吊丝二区三区中文字幕| 欧美人妻精品一区二区| 调教亚洲另类唯美第二页| 精品久久一区麻豆香蕉| 欧美一区二区三区东京热| 中文字幕在线五月婷婷| 成人精品亚洲一区二区| 成年网站在线91九色| 国产丝袜在线精品丝袜不卡| 国产情侣最新地址在线| 中文欧美一区二区精品| 欧美在线免费一级黄片| 久久精品视频视频视频| 欧美黄片精品在线观看| 九九热超在线视频精品| 欧美精品蜜桃激情一区久久| 欧美日韩国产av一区| 欧美日韩免费r在线视频| 久久99精品国产99久久无|