前言 接上篇,这篇文章就开始进入这个项目的核心部分,对,就是有关 paxos 算法本身的部分。说实话写这部分的文章(肯定不止一篇)之前我花了好多时间准备,真是亚历山大。因为这部分内容是在过于庞大,花了好长的时间才大致理出个所以然来。不仅要理解 multi-paxos 本身,还要了解具体的工程实现手段,很大一部分时间还是去阅读与算法本身无关的代码去了。
这篇文章我们直接去看五个核心的类 Instance, Committer, Acceptor, Proposer 与 Learner 。是不是很兴奋?我也是一样,读通只后有着说不出的爽快感,下面直接来看。
全貌 我们先整理一下这五个类之间的关系,刚开始可能会一脸懵逼,paxos 算法的 3 个角色自然是一目了然,Instance 和 Committer 是个什么鬼?先莫急,先上一个五个类的关系图:
通过上面的图我们可以知晓整个 phxpaxos 类分布的全貌:每一个节点的信息由 PNode 类统领,PNode类中包含若干个 Group 实例,这个数量是参数化的,多数的 Group 仅仅是为了高并发,和算法本身无关,这些 Group 实例共享一个网络模块和一个存储模块;Group 与 Instance 实例一一对应,但是 Instance 会不断的刷新擦除,我们称为一轮 paxos 实例;每一个 Instance 包含 paxos 算法的三大角色 Acceptor, Proposer,Learner 以及一个 Committer ,同样的,这四个角色都是可以复用的。
Committer 的作用 Committer 只是一个代理的 Proposer 类,引入它的作用是为了过载保护。在上面两篇文章里我们讲了官方自带的实例 PhxKV ,其中 Put 一个值时我们调用了下面的接口:
1 2 3 4 5 6 7 8 9 10 11 12 int PNode :: Propose(const int iGroupIdx, const std ::string & sValue, \ uint64_t & llInstanceID, SMCtx * poSMCtx) { if (!CheckGroupID(iGroupIdx)) { return Paxos_GroupIdxWrong; } return m_vecGroupList[iGroupIdx]->GetCommitter() \ ->NewValueGetID(sValue, llInstanceID, poSMCtx); }
这个 Propose 是 PNode 类的一个公共接口,它的命名十分容易让人混淆,我想很多人在阅读 phxpaxos 看不下去多半就是因为这个接口。事实上它和 paxos 算法并没有多大联系,只是一个代理,那么谁是它的接线人呢?我们直接来看 Committer 的 NewValueGetID 函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int Committer :: NewValueGetID(const std ::string & sValue, uint64_t & llInstanceID, \ SMCtx * poSMCtx) { BP->GetCommiterBP()->NewValue(); int iRetryCount = 3 ; int ret = PaxosTryCommitRet_OK; while (iRetryCount--) { TimeStat oTimeStat; oTimeStat.Point(); ret = NewValueGetIDNoRetry(sValue, llInstanceID, poSMCtx); if (ret != PaxosTryCommitRet_Conflict) { .... } .... } return ret; }
可以看出尝试去 commit 也就是 propose 一个新值有 3 次机会,每次调用 NewValueGetIDNoRetry 接口,那么就看它的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 int Committer :: NewValueGetIDNoRetry(const std ::string & sValue, \ uint64_t & llInstanceID, SMCtx * poSMCtx) { LogStatus(); int iLockUseTimeMs = 0 ; bool bHasLock = m_oWaitLock.Lock(m_iTimeoutMs, iLockUseTimeMs); if (!bHasLock) { if (iLockUseTimeMs > 0 ) { PLGErr("Try get lock, but timeout, lockusetime %dms" , iLockUseTimeMs); return PaxosTryCommitRet_Timeout; } else { PLGErr("Try get lock, but too many thread waiting, reject" ); return PaxosTryCommitRet_TooManyThreadWaiting_Reject; } } int iLeftTimeoutMs = -1 ; if (m_iTimeoutMs > 0 ) { iLeftTimeoutMs = m_iTimeoutMs > iLockUseTimeMs ? m_iTimeoutMs - iLockUseTimeMs : 0 ; if (iLeftTimeoutMs < 200 ) { PLGErr("Get lock ok, but lockusetime %dms too long, lefttimeout %dms" , \ iLockUseTimeMs, iLeftTimeoutMs); m_oWaitLock.UnLock(); return PaxosTryCommitRet_Timeout; } } PLGImp("GetLock ok, use time %dms" , iLockUseTimeMs); int iSMID = poSMCtx != nullptr ? poSMCtx->m_iSMID : 0 ; string sPackSMIDValue = sValue; m_poSMFac->PackPaxosValue(sPackSMIDValue, iSMID); m_poCommitCtx->NewCommit(&sPackSMIDValue, poSMCtx, iLeftTimeoutMs); m_poIOLoop->AddNotify(); int ret = m_poCommitCtx->GetResult(llInstanceID); m_oWaitLock.UnLock(); return ret; }
这里很明显了,在接受外界的 Propose 调用时,phxpaxos 将尝试去获取 m_oWaitLock 锁,只有拿到这把锁的线程才能真正的去刷新 Committer ,而拿不到的线程只能老老实实的排队去等待,如果超时则放弃,这就是 phxpaxos 所谓的过载保护机制。
IOLoop 到这里了,还是没有说什么时候才开始真正的 propose 呢?答案就是上图的 IOLoop 类中,这个 IOLoop 类中包含有 2 个消息队列 m_oMessageQueue 和 m_oRetryQueue , proposer , acceptor , leaner 角色产生的所有的消息全部都会扔到这个 m_oMessageQueue 中。IOLoop 会循环调用 OneLoop 接口做消息的处理,有的消息是需要重复去处理的,我们会将它们扔进 m_oRetryQueue 中去。让我们看一下 OneLoop 的代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 void IOLoop :: OneLoop(const int iTimeoutMs) { std ::string * psMessage = nullptr ; m_oMessageQueue.lock(); bool bSucc = m_oMessageQueue.peek(psMessage, iTimeoutMs); if (!bSucc) { m_oMessageQueue.unlock(); } else { m_oMessageQueue.pop(); m_oMessageQueue.unlock(); if (psMessage != nullptr && psMessage->size() > 0 ) { m_iQueueMemSize -= psMessage->size(); m_poInstance->OnReceive(*psMessage); } delete psMessage; } DealWithRetry(); m_poInstance->CheckNewValue(); }
消息处理 我们总算找到了 paxos 消息处理的总入口「OnReceive」,每次我们从 IOLoop 的消息队列中取出一条消息就去调用这个处理接口,这个接口的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 void Instance :: OnReceive(const std ::string & sBuffer) { .... if (iCmd == MsgCmd_PaxosMsg) { .... OnReceivePaxosMsg(oPaxosMsg); } else if (iCmd == MsgCmd_CheckpointMsg) { .... OnReceiveCheckpointMsg(oCheckpointMsg); } }
令人失望的是这同样是个皮包函数,根据消息的 cmd 类型,分为两路处理,由于「checkpoint」机制比较复杂,我们先不讨论,直接看 paxos 消息的处理接口「OnReceivePaxosMsg」
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 int Instance :: OnReceivePaxosMsg(const PaxosMsg & oPaxosMsg, const bool bIsRetry) { .... if (oPaxosMsg.msgtype() == MsgType_PaxosPrepareReply || oPaxosMsg.msgtype() == MsgType_PaxosAcceptReply || oPaxosMsg.msgtype() == MsgType_PaxosProposal_SendNewValue) { .... return ReceiveMsgForProposer(oPaxosMsg); } else if (oPaxosMsg.msgtype() == MsgType_PaxosPrepare || oPaxosMsg.msgtype() == MsgType_PaxosAccept) { .... return ReceiveMsgForAcceptor(oPaxosMsg, bIsRetry); } else if (oPaxosMsg.msgtype() == MsgType_PaxosLearner_AskforLearn || oPaxosMsg.msgtype() == MsgType_PaxosLearner_SendLearnValue || oPaxosMsg.msgtype() == MsgType_PaxosLearner_ProposerSendSuccess || oPaxosMsg.msgtype() == MsgType_PaxosLearner_ComfirmAskforLearn || oPaxosMsg.msgtype() == MsgType_PaxosLearner_SendNowInstanceID || oPaxosMsg.msgtype() == MsgType_PaxosLearner_SendLearnValue_Ack || oPaxosMsg.msgtype() == MsgType_PaxosLearner_AskforCheckpoint) { .... return ReceiveMsgForLearner(oPaxosMsg); } else { PLGErr("Invaid msgtype %d" , oPaxosMsg.msgtype()); } return 0 ; }
哈哈,这个接口可能会令很多人吓一跳,我们认为 paxos 里面最简单的 learner 角色的处理居然是最复杂的,这和 multi-paxos 的工程优化有关系,如果完全不考虑效率的话,我们完全不需要设计的这么复杂。
总结 对于外界的请求,phxpaxos 直接将责任扔给了 Committer 类并做了过载保护,所有角色的动作并不做同步处理,而是全部扔进两个消息队列中做异步处理。我们还发现了消息处理的总入口,并看到了一个有趣的现象,在实际工程的设计中,learner 有着相当复杂的设计,本质原因是工程项目都是以效率为先,而不是单纯地结果论。
下一篇我们直接来分析 proposer 和 acceptor 的处理消息的具体流程,即「multi-paxos」算法本身,敬请期待。
项目链接 我将源码分析工作的注释同步更新到了 github 的项目中,下面是项目链接:
https://github.com/chenneal/phxpaxos-annotated.git
欢迎大家 star 。
文章列表 phxpaxos源码阅读之一:走马观花
phxpaxos源码阅读之二:粮草先行
phxpaxos源码阅读之三:粉墨登场
phxpaxos源码阅读之四:各个击破
phxpaxos源码阅读之五:暗度陈仓
phxpaxos源码阅读之六:完结篇