// Group 的数量,之前提过,只是为了并发。 //this groupcount means run paxos group count. //every paxos group is independent, there are //no any communicate between any 2 paxos group. oOptions.iGroupCount = m_iGroupCount;
// 这里有个 typo state 。 //because all group share state machine(kv), so every group have same sate machine. //just for split key to different paxos group, to upgrate performance. for (int iGroupIdx = 0; iGroupIdx < m_iGroupCount; iGroupIdx++) { // 这里的意思是每个 Group 并发执行,但是需要共享状态机, // 这里的状态机指的就是 KV 数据库,那么如何并发 ? 很容易 // 每个 Group 只负责属于它管辖的数据,用 Hash 散一下就行。 GroupSMInfo oSMInfo; oSMInfo.iGroupIdx = iGroupIdx; oSMInfo.vecSMList.push_back(&m_oPhxKVSM); oSMInfo.bIsUseMaster = true;
oOptions.vecGroupSMInfoList.push_back(oSMInfo); }
//set logfunc oOptions.pLogFunc = LOGGER->GetLogFunc(); // 不用讲了,这个 RunNode 负责整个节点的启动工作。 int ret = Node::RunNode(oOptions, m_poPaxosNode); if (ret != 0) { PLErr("run paxos fail, ret %d", ret); return ret; }
// 很多结构的初始化工作都是在这个函数里面完成的 int ret = poRealNode->Init(oOptions, poNetWork); if (ret != 0) { delete poRealNode; return ret; }
// 网络结构体指向上面刚刚new的 PNode 对象,以便正确回调。 // 初始化工作实在上面的 Init 函数里完成的。 //step1 set node to network //very important, let network on recieve callback can work. poNetWork->m_poNode = poRealNode;
// 启动网络服务,这样 phxpaxos 在做算法的各种 rpc 通信时就通过 // 这个网络服务传递消息与 log 。 //step2 run network. //start recieve message from network, so all must init before this step. //must be the last step. poNetWork->RunNetWork();
// 启动了每个 group 的初始化服务,注释写了并行,我想是因为每 // 个线程不需要等到上一个线程启动完毕就开始启动工作了。 //step7 parallel init group for (auto & poGroup : m_vecGroupList) { poGroup->StartInit(); }
// 由于是并行的初始化,需要 join 来等待全部初始化完毕。 for (auto & poGroup : m_vecGroupList) { int initret = poGroup->GetInitRet(); if (initret != 0) { ret = initret; } }
if (ret != 0) { return ret; }
//last step. must init ok, then should start threads.because that stop //threads is slower, if init fail, we need much time to stop many threads. //so we put start threads in the last step. for (auto & poGroup : m_vecGroupList) { //start group's thread first. poGroup->Start(); } // 这个 Options 里有是否启动 master 服务的选项。 RunMaster(oOptions); // 成组提交。 RunProposeBatch();