submitC chan *submit applyC chan apply observeC chan<- raft.SoftState // Notifies external observer on leader change (passed in optionally as an argument for tests) snapC chan *raftpb.Snapshot // Signal to catch up with snapshot gcC chan *gc // Signal to take snapshot
configInflight bool// this is true when there is config block or ConfChange in flight blockInflight int// number of in flight blocks
// needed by snapshotting sizeLimit uint32// SnapshotIntervalSize in bytes accDataSize uint32// accumulative data size since last snapshot lastSnapBlockNum uint64 confState raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot
createPuller CreateBlockPuller // func used to create BlockPuller on demand
// this is exported so that test can use `Node.Status()` to get raft node status. Node *node }
// Start instructs the orderer to begin serving the chain and keep it current. func(c *Chain)Start() { c.Node.start(c.fresh, isJoin)
// 响应c.gcC channel的信号,进行c.Node.takeSnapshot操作,并且将过期的可配置个数前的消息和snapshot清空 go c.gc() go c.serveRequest()
// es := c.newEvictionSuspector() interval := DefaultLeaderlessCheckInterval c.periodicChecker.Run() }
// TODO(jay_guo) leader can write to disk in parallel with replicating to the followers and them writing to their disks. Check 10.2.1 in thesis // 调用rpc RPC,交由管理的grpc client发送 n.send(rd.Messages) }
// Submit forwards the incoming request to: // - the local serveRequest goroutine if this is leader // - the actual leader via the transport mechanism // The call fails if there's no leader elected yet. func(c *Chain)Submit(req *orderer.SubmitRequest, sender uint64)error { leadC := make(chanuint64, 1) select { case c.submitC <- &submit{req, leadC}: lead := <-leadC if lead != c.raftID { if err := c.rpc.SendSubmit(lead, req); err != nil { c.Metrics.ProposalFailures.Add(1) return err } } } }
select { // 来自于`chain.go#Submit`,也就是提交的proposal,这里主要是判断当前是leader才会进行propose。 // 如果当前节点是leader,则调用`consensus.ConsenterSupport#ProcessConfigMsg/ProcessNormalMsg`,然后调用`support.BlockCutter().Ordered`切割batch。 // 对切割后还有pending的消息启动timer,也就是下面的`<-timer.C():`分支,到期后在进行切割。 case s := <-submitC: // 与orderer的其他实现一致,可参考[Fabric 1.4源码分析 - chaincode instantiate(8)orderer的排序过程] batches, pending, err := c.ordered(s.req) if pending { startTimer() // no-op if timer is already started } else { stopTimer() }
c.propose(propC, bc, batches...) // 来自上文提到的`etcdraft/node.go#run`,也就是开源库etcd的raft节点raft.Node的通知消息 // 这部分消息可能包含leader的切换,最新的记录在消息的`chain.go/apply/raft.SoftState/Lead`字段,进而相应的`propC, cancelProp = becomeLeader()`或者`becomeFollower()`。 // propC即上面的propose方法的参数,在becomeLeader内处理,即调用`raft.go#Node.Propose`进行propose case app := <-c.applyC: // 这里的entries是CommittedEntries,即需要commit的entry,也就是leader当初propose的block // n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState} c.apply(app.entries) case <-timer.C(): // snapC chan *raftpb.Snapshot // Signal to catch up with snapshot // 来自于`etcdraft/node.go#run`, select-case的`rd := <-n.Ready():`内`n.chain.snapC <- &rd.Snapshot`,即底层raft的需要同步的snapshot。 // 用于落后或者新加入的节点追上当前的消息状态 case sn := <-c.snapC: c.catchUp(sn); err != nil case <-c.doneC:
}
func(c *Chain)propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) { // 如果前面调用的c.ordered(s.req)切割出来的batches非空,则创建一个新block,并且在becomeLeader里的propC里调用c.Node.Propose() for _, batch := range batches { b := bc.createNextBlock(batch) select { case ch <- b: default: } // if it is config block, then we should wait for the commit of the block if utils.IsConfigBlock(b) { c.configInflight = true } c.blockInflight++ } return }
becomeLeader := func()(chan<- *common.Block, // Leader should call Propose in go routine, because this method may be blocked // if node is leaderless (this can happen when leader steps down in a heavily // loaded network). WeneedtomakesureapplyCcanstillbeconsumedproperly. ctx, cancel := context.WithCancel(context.Background()) gofunc(ctx context.Context, ch <-chan *common.Block) { for { select { case b := <-ch: data := utils.MarshalOrPanic(b) if err := c.Node.Propose(ctx, data); err != ni {...} } } }(ctx, ch)
return ch, cancel }
func(c *Chain)apply(ents []raftpb.Entry) { var position int for i := range ents { switch ents[i].Type { // 调用writeBlock,将commitEntry里的block写入账本 // accDataSize是累计的block数据大小 case raftpb.EntryNormal: position = i c.accDataSize += uint32(len(ents[i].Data)) block := utils.UnmarshalBlockOrPanic(ents[i].Data) c.writeBlock(block, ents[i].Index) case raftpb.EntryConfChange: ... if ents[i].Index > c.appliedIndex { c.appliedIndex = ents[i].Index } }
// accDataSize是累计的block数据大小,大于配置的sizeLimit后,写入c.gcC channel,在`chain.go#gc`内处理c.Node.takeSnapshot if c.accDataSize >= c.sizeLimit { b := utils.UnmarshalBlockOrPanic(ents[position].Data)