欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

如何實現(xiàn)cephSimpleMessenger模塊消息的接收

小編給大家分享一下如何實現(xiàn)ceph SimpleMessenger模塊消息的接收,希望大家閱讀完這篇文章之后都有所收獲,下面讓我們一起去探討吧!

創(chuàng)新互聯(lián)自2013年起,公司以成都網(wǎng)站設(shè)計、成都網(wǎng)站建設(shè)、系統(tǒng)開發(fā)、網(wǎng)絡(luò)推廣、文化傳媒、企業(yè)宣傳、平面廣告設(shè)計等為主要業(yè)務(wù),適用行業(yè)近百種。服務(wù)企業(yè)客戶千余家,涉及國內(nèi)多個省份客戶。擁有多年網(wǎng)站建設(shè)開發(fā)經(jīng)驗。為企業(yè)提供專業(yè)的網(wǎng)站建設(shè)、創(chuàng)意設(shè)計、宣傳推廣等服務(wù)。 通過專業(yè)的設(shè)計、獨特的風(fēng)格,為不同客戶提供各種風(fēng)格的特色服務(wù)。

OSD服務(wù)端消息的接收起始于OSD::init()中的messenger::add_dispatcher_head(Dispatcher *d)函數(shù)

|-   358   void add_dispatcher_head(Dispatcher *d) {
||   359     bool first = dispatchers.empty();
||   360     dispatchers.push_front(d);
||   361     if (d->ms_can_fast_dispatch_any())
||   362       fast_dispatchers.push_front(d);
||   363     if (first)
||   364       ready();     //如果dispatcher list空,啟動SimpleMessenger::ready,不為空證明SimpleMessenger已經(jīng)啟動了
||   365   }

在SimpleMessenger::ready()中,啟動DispatchQueue等待mqueue,如果綁定了端口就啟動 accepter接收線程

      76 void SimpleMessenger::ready()
-     77 {
|     78   ldout(cct,10) << "ready " << get_myaddr() << dendl;
|     79   dispatch_queue.start();   //啟動DispatchQueue,等待mqueue
|     80 
|     81   lock.Lock();
|     82   if (did_bind)
|     83     accepter.start();
|     84   lock.Unlock();
|     85 }

Accepter是Thread的繼承類,Accepter::start()最終調(diào)用Accepter::entry(),在entry中 accept并把接收到的sd加入到Pipe類中

void *Accepter::entry()
{
  ...
  struct pollfd pfd;
  pfd.fd = listen_sd;
  pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
  while (!done) {

    int r = poll(&pfd, 1, -1);

    if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
      break;

    // accept
    entity_addr_t addr;
    socklen_t slen = sizeof(addr.ss_addr());
    int sd = ::accept(listen_sd, (sockaddr*)&addr.ss_addr(), &slen);
    if (sd >= 0) {
      errors = 0;
      ldout(msgr->cct,10) << "accepted incoming on sd " << sd << dendl;
      
      msgr->add_accept_pipe(sd);     //注冊一個pipe,啟動讀線程,從該sd中讀取數(shù)據(jù)
    } else {
      ldout(msgr->cct,0) << "accepter no incoming connection?  sd = " << sd
	      << " errno " << errno << " " << cpp_strerror(errno) << dendl;
      if (++errors > 4)
	break;
    }
  }

   ...
  return 0;

在SimpleMessenger::add_accept_pipe(int sd)中,申請一個Pipe類并把sd加入到Pipe中,開始Pipe::start_reader()

     340 Pipe *SimpleMessenger::add_accept_pipe(int sd)
-    341 {   
|    342   lock.Lock();
|    343   Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
|    344   p->sd = sd;
|    345   p->pipe_lock.Lock();
|    346   p->start_reader();
|    347   p->pipe_lock.Unlock();
|    348   pipes.insert(p);
|    349   accepting_pipes.insert(p);
|    350   lock.Unlock();
|    351   return p;
|    352 }

Pipe類內(nèi)部有一個Reader和Writer線程類,Pipe::start_reader()啟動Pipe::Reader::entry(),最終啟動Pipe::reader函數(shù)

      134 void Pipe::start_reader()
-     135 {
|     136   assert(pipe_lock.is_locked());
|     137   assert(!reader_running);
|-    138   if (reader_needs_join) {
||    139     reader_thread.join();
||    140     reader_needs_join = false;
||    141   }
|     142   reader_running = true;
|     143   reader_thread.create("ms_pipe_read", msgr->cct->_conf->ms_rwthread_stack_bytes);
|     144 }
|-    48     class Reader : public Thread {
||    49       Pipe *pipe;
||    50     public:
||    51       explicit Reader(Pipe *p) : pipe(p) {}
||    52       void *entry() { pipe->reader(); return 0; }
||    53     } reader_thread;

在Pipe::reader函數(shù)中根據(jù)tag接收不同類型的消息,如果是CEPH_MSGR_TAG_MSG類型消息調(diào)用read_message接收消息,并把消息加入到mqueue中

void Pipe::reader()
{
  pipe_lock.Lock();

  if (state == STATE_ACCEPTING) {
    accept();      //第一次進入此函數(shù)處理
    assert(pipe_lock.is_locked());
  }

  // loop.
  while (state != STATE_CLOSED &&
	 state != STATE_CONNECTING) {
    assert(pipe_lock.is_locked());

   ......
   ......

    else if (tag == CEPH_MSGR_TAG_MSG) {
      ldout(msgr->cct,20) << "reader got MSG" << dendl;
      Message *m = 0;
      int r = read_message(&m, auth_handler.get());

      pipe_lock.Lock();
      
      if (!m) {
	if (r < 0)
	  fault(true);
	continue;
      }
     ......
     ......
     ......
      // note last received message.
      in_seq = m->get_seq();

      cond.Signal();  // wake up writer, to ack this
      
      ldout(msgr->cct,10) << "reader got message "
	       << m->get_seq() << " " << m << " " << *m
	       << dendl;
      in_q->fast_preprocess(m);       //mds 、mon不會進入此函數(shù),預(yù)處理

      if (delay_thread) {
        utime_t release;
        if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0) {
          release = m->get_recv_stamp();
          release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
          lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
        }
        delay_thread->queue(release, m);
      } else {
        if (in_q->can_fast_dispatch(m)) {
	  reader_dispatching = true;
          pipe_lock.Unlock();
          in_q->fast_dispatch(m);
          pipe_lock.Lock();
	  reader_dispatching = false;
	  if (state == STATE_CLOSED ||
	      notify_on_dispatch_done) { // there might be somebody waiting
	    notify_on_dispatch_done = false;
	    cond.Signal();
	  }
        } else {           //mds進入此else
          in_q->enqueue(m, m->get_priority(), conn_id);      //把接收到的messenger加入到mqueue中
        }
      }
    }
    ......
    ......
  }

 
  // reap?
  reader_running = false;
  reader_needs_join = true;
  unlock_maybe_reap();
  ldout(msgr->cct,10) << "reader done" << dendl;
}

在Pipe::DispatchQueue::enqueue函數(shù)中加入到mqueue中

void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{

  Mutex::Locker l(lock);
  ldout(cct,20) << "queue " << m << " prio " << priority << dendl;
  add_arrival(m);
  if (priority >= CEPH_MSG_PRIO_LOW) {
    mqueue.enqueue_strict(
        id, priority, QueueItem(m));
  } else {
    mqueue.enqueue(
        id, priority, m->get_cost(), QueueItem(m));
  }
  cond.Signal();    //喚醒dispatch_queue.start() 啟動的dispatchThread,進入entry進行處理
}

看完了這篇文章,相信你對“如何實現(xiàn)ceph SimpleMessenger模塊消息的接收”有了一定的了解,如果想了解更多相關(guān)知識,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,感謝各位的閱讀!

當(dāng)前文章:如何實現(xiàn)cephSimpleMessenger模塊消息的接收
網(wǎng)站鏈接:http://www.aaarwkj.com/article2/gipjoc.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供網(wǎng)站排名手機網(wǎng)站建設(shè)、網(wǎng)站設(shè)計、網(wǎng)站建設(shè)、商城網(wǎng)站軟件開發(fā)

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

網(wǎng)站優(yōu)化排名
日韩精品视频播放一区 | 极品人妻少妇精品一区二区| 日韩欧美亚洲一区二区| 亚洲av色国产精品色午含羞草| 日韩欧美国产麻豆一区精品| 欧美日韩精品一区二区视频永久免| 中文字幕二区三区av| 精品亚洲天堂一区二区三区| 欧美日本国产老熟女视频| 国产亚洲高清一区二区| 久久精品有码视频免费观看| 精品亚洲美无人区乱码| 国产精品综合久久久久久| 国产传媒剧情剧资源网站| 国产精品久久护士96| 日本五十路亲子在线一区| 国产亚洲精品女人久久久| 欧美日韩亚洲精品亚洲欧洲| 中文字幕日产乱码一二三区| 一区二区在线视频免费播放| 青青草国产成人自拍视频在线观看| 欧美亚洲国产日韩另类| 中文乱幕亚洲无套内射| 少妇激情一区二区三区免费视频| 日韩成人中文字幕在线视频| 在线中文字幕日韩精品| 国产亚洲精品精品国产亚洲 | 国产亚洲精品视频在线网| 国产精品重口调教系列| 久久精品久久精品欧美大片| 国产区二区三区在线视频| 精品人妻少妇一区二区三区| 国产乱一伦一性一情一色| 亚洲一区二区三区女同| 人人爽人人妻人人澡| 亚洲午夜福利啪啪啪| av在线成人国产精品欧美| 国产成人精品免费视频大| 熟女aaa一区二区午夜| av免费观看男人的天堂| 日本一区二区三区高清在线|