Fabric 1.4 Source Code Analysis - The orderer's Raft Implementation

Raft is arguably the first true consensus algorithm in the Fabric 1.x series. The implementation mainly involves three files, chain.go <-> etcdraft/node.go <-> raft/node.go: raft/node.go comes from the open-source etcd package, chain.go is the main type implementing the consensus logic, and etcdraft/node.go is effectively the adapter (in the adapter-pattern sense) connecting the two, shielding chain.go from the concrete Raft implementation.

Let's start with the Chain struct in chain.go (some fields omitted); it holds a reference to the node type defined in etcdraft/node.go. When chain.go#Start is called (partially elided below), it internally invokes etcdraft/node.go#start.

type Chain struct {
    rpc RPC

    raftID    uint64
    channelID string

    lastKnownLeader uint64

    submitC  chan *submit
    applyC   chan apply
    observeC chan<- raft.SoftState // Notifies external observer on leader change (passed in optionally as an argument for tests)
    snapC    chan *raftpb.Snapshot // Signal to catch up with snapshot
    gcC      chan *gc              // Signal to take snapshot

    configInflight bool // this is true when there is config block or ConfChange in flight
    blockInflight  int  // number of in flight blocks

    // needed by snapshotting
    sizeLimit        uint32 // SnapshotIntervalSize in bytes
    accDataSize      uint32 // accumulative data size since last snapshot
    lastSnapBlockNum uint64
    confState        raftpb.ConfState // Etcdraft requires ConfState to be persisted within snapshot

    createPuller CreateBlockPuller // func used to create BlockPuller on demand

    // this is exported so that test can use `Node.Status()` to get raft node status.
    Node *node
}

// Start instructs the orderer to begin serving the chain and keep it current.
func (c *Chain) Start() {
    // isJoin is true when this node is joining an existing channel (ledger height > 1)
    c.Node.start(c.fresh, isJoin)

    // gc reacts to signals on c.gcC: it runs c.Node.takeSnapshot and purges
    // stale entries and snapshots older than a configurable retention count
    go c.gc()
    go c.serveRequest()

    // the eviction suspector periodically checks whether this node has been
    // evicted from the channel (no leader observed for too long)
    es := c.newEvictionSuspector()
    interval := DefaultLeaderlessCheckInterval
    // (construction of c.periodicChecker from es and interval elided; see below)
    c.periodicChecker.Run()
}
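For completeness, here is roughly how the elided part wires the eviction suspector into the periodic checker, based on the 1.4 etcdraft package (treat field names as approximate across 1.4.x point releases):

es := c.newEvictionSuspector()

interval := DefaultLeaderlessCheckInterval
if c.opts.LeaderCheckInterval != 0 {
    interval = c.opts.LeaderCheckInterval
}

c.periodicChecker = &PeriodicCheck{
    Logger:        c.logger,
    Report:        es.confirmSuspicion,
    CheckInterval: interval,
    Condition:     c.suspectEviction,
}
c.periodicChecker.Run()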

type node struct {
    chainID string
    storage *RaftStorage // persistence/staging for raft: WAL, in-memory storage, snapshots
    config  *raft.Config
    rpc     RPC // gRPC communication between nodes; manages the gRPC client/stream per peer
    chain   *Chain

    raft.Node // the raft node implementation from the open-source etcd library
}
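etcdraft/node.go#start then decides between bootstrapping a brand-new raft instance and restarting from persisted state. A simplified sketch of the branching (logging and the initial-campaign heuristic elided; field names approximate; RaftPeers is the package helper building []raft.Peer from the channel's consenter set):

func (n *node) start(fresh, join bool) {
    raftPeers := RaftPeers(n.metadata.ConsenterIds)
    campaign := false // on a fresh channel, a deterministic heuristic picks one node to campaign first (elided)

    if fresh {
        if join {
            // joining an existing channel: membership is learned from the
            // snapshot this node catches up with, so start with no peers
            raftPeers = nil
        }
        n.Node = raft.StartNode(n.config, raftPeers)
    } else {
        // WAL data already exists on disk: restart and replay it
        n.Node = raft.RestartNode(n.config)
    }

    go n.run(campaign)
}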

The main logic of etcdraft/node.go lives in etcdraft/node.go#run (an excerpt is shown below). As you can see, the raft node from the open-source etcd library only manages the raft-specific machinery: propose, commit, election, and so on. Everything else is left to the caller, including persistence, applying committed entries, and inter-node communication.

for {
    select {
    // n embeds raft.Node from the open-source etcd library; Ready() delivers
    // its notifications
    case rd := <-n.Ready():
        // persist to the WAL (and snapshot) before doing anything else
        if err := n.storage.Store(rd.Entries, rd.HardState, rd.Snapshot); err != nil {
            n.logger.Panicf("Failed to persist etcd/raft data: %s", err)
        }

        // a snapshot that lagging or newly joined nodes need to catch up with
        if !raft.IsEmptySnap(rd.Snapshot) {
            n.chain.snapC <- &rd.Snapshot
        }

        // skip empty apply. New messages from the raft node: committed
        // proposals (rd.CommittedEntries) and/or state changes (rd.SoftState,
        // e.g. a new leader, a change in node count, etc.)
        if len(rd.CommittedEntries) != 0 || rd.SoftState != nil {
            n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
        }

        n.Advance()

        // TODO(jay_guo) leader can write to disk in parallel with replicating to the followers and them writing to their disks. Check 10.2.1 in thesis
        // hand the outbound messages to rpc, which sends them via the managed gRPC clients
        n.send(rd.Messages)
    }
}
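This follows the standard usage contract of etcd's raft.Node: drain Ready(), persist entries and hard state before sending messages, apply committed entries, then call Advance() to request the next batch. In generic form (not Fabric code; saveToStorage, send and applyCommitted are placeholders), the driver loop looks like this:

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
    select {
    case <-ticker.C:
        n.Tick() // drives election timeouts and heartbeats
    case rd := <-n.Ready():
        saveToStorage(rd.HardState, rd.Entries, rd.Snapshot) // must be durable first
        send(rd.Messages)                                    // deliver to peers over the network
        applyCommitted(rd.CommittedEntries)                  // apply to the state machine
        n.Advance()                                          // signal readiness for the next Ready
    }
}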

The entry points for orderer messages are still chain.go#Order and chain.go#Configure, both of which ultimately call chain.go#Submit. As its comment says, if the local node is the leader the request is pushed into the submitC channel; otherwise it is forwarded to the leader node via RPC.

// Submit forwards the incoming request to:
// - the local serveRequest goroutine if this is leader
// - the actual leader via the transport mechanism
// The call fails if there's no leader elected yet.
func (c *Chain) Submit(req *orderer.SubmitRequest, sender uint64) error {
    leadC := make(chan uint64, 1)
    select {
    case c.submitC <- &submit{req, leadC}:
        lead := <-leadC
        if lead == raft.None {
            c.Metrics.ProposalFailures.Add(1)
            return errors.Errorf("no Raft leader")
        }

        if lead != c.raftID {
            if err := c.rpc.SendSubmit(lead, req); err != nil {
                c.Metrics.ProposalFailures.Add(1)
                return err
            }
        }

    case <-c.doneC:
        c.Metrics.ProposalFailures.Add(1)
        return errors.Errorf("chain is stopped")
    }

    return nil
}
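For context, Order and Configure are thin wrappers over Submit; in 1.4 they look essentially like this (metrics counters elided):

// Order submits a normal-type message for ordering.
func (c *Chain) Order(env *common.Envelope, configSeq uint64) error {
    return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}

// Configure submits a config-type message for ordering.
func (c *Chain) Configure(env *common.Envelope, configSeq uint64) error {
    return c.Submit(&orderer.SubmitRequest{LastValidationSeq: configSeq, Payload: env, Channel: c.channelID}, 0)
}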

The main body of chain.go runs in chain.go#serveRequest, which is essentially one big select:

for {
    select {
    // From `chain.go#Submit`, i.e. a submitted proposal; only the leader actually proposes.
    // If this node is the leader, it calls `consensus.ConsenterSupport#ProcessConfigMsg/ProcessNormalMsg`,
    // then `support.BlockCutter().Ordered` to cut batches.
    // If messages are still pending after cutting, a timer is started (the `<-timer.C():`
    // branch below), and the remainder is cut when it expires.
    case s := <-submitC:
        // same as the other orderer implementations; see
        // [Fabric 1.4源码分析 - chaincode instantiate(8)orderer的排序过程]
        batches, pending, err := c.ordered(s.req)
        if pending {
            startTimer() // no-op if timer is already started
        } else {
            stopTimer()
        }

        c.propose(propC, bc, batches...)

    // From `etcdraft/node.go#run` above, i.e. notifications from the etcd raft.Node.
    // These may include a leader change: the latest leader is carried in the
    // raft.SoftState Lead field of the apply message, which triggers
    // `propC, cancelProp = becomeLeader()` or `becomeFollower()` accordingly.
    // propC is the channel passed to propose below; becomeLeader consumes it
    // and calls `raft.Node.Propose` for each block.
    case app := <-c.applyC:
        // app.entries is rd.CommittedEntries, i.e. the entries to commit --
        // the blocks the leader proposed earlier:
        // n.chain.applyC <- apply{rd.CommittedEntries, rd.SoftState}
        c.apply(app.entries)

    // the batch timeout expired: cut whatever is pending into a block
    case <-timer.C():

    // From `etcdraft/node.go#run`, where `n.chain.snapC <- &rd.Snapshot` is sent
    // inside the `rd := <-n.Ready():` case: a snapshot from the underlying raft,
    // used by lagging or newly joined nodes to catch up
    case sn := <-c.snapC:
        if err := c.catchUp(sn); err != nil {
            // ...
        }

    case <-c.doneC:
        return
    }
}
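A condensed sketch of c.ordered for the normal-message path (the config-message path, which cuts the pending batch first and turns the config envelope into a batch of its own, is elided):

func (c *Chain) ordered(msg *orderer.SubmitRequest) (batches [][]*common.Envelope, pending bool, err error) {
    // re-validate if the message was validated against an older config sequence
    if msg.LastValidationSeq < c.support.Sequence() {
        if _, err := c.support.ProcessNormalMsg(msg.Payload); err != nil {
            return nil, true, errors.Errorf("bad normal message: %s", err)
        }
    }

    // the block cutter returns zero or more full batches, plus a flag
    // indicating whether messages are still pending in the current batch
    batches, pending = c.support.BlockCutter().Ordered(msg.Payload)
    return batches, pending, nil
}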

func (c *Chain) propose(ch chan<- *common.Block, bc *blockCreator, batches ...[]*common.Envelope) {
    // for each batch cut by c.ordered(s.req), create a new block and push it
    // into propC; the goroutine started in becomeLeader reads from it and
    // calls c.Node.Propose()
    for _, batch := range batches {
        b := bc.createNextBlock(batch)
        select {
        case ch <- b:
        default: // ch is buffered to the in-flight limit (overflow handling elided)
        }

        // if it is config block, then we should wait for the commit of the block
        if utils.IsConfigBlock(b) {
            c.configInflight = true
        }
        c.blockInflight++
    }

    return
}

becomeLeader := func() (chan<- *common.Block, context.CancelFunc) {
    ch := make(chan *common.Block, c.opts.MaxInflightBlocks) // buffer sized to the configured in-flight limit (field name may vary across 1.4.x)

    // Leader should call Propose in go routine, because this method may be blocked
    // if node is leaderless (this can happen when leader steps down in a heavily
    // loaded network). We need to make sure applyC can still be consumed properly.
    ctx, cancel := context.WithCancel(context.Background())
    go func(ctx context.Context, ch <-chan *common.Block) {
        for {
            select {
            case b := <-ch:
                data := utils.MarshalOrPanic(b)
                if err := c.Node.Propose(ctx, data); err != nil {
                    // ... (log and abort; elided)
                }
            case <-ctx.Done():
                return
            }
        }
    }(ctx, ch)

    return ch, cancel
}
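Its counterpart becomeFollower, condensed from the same closure (bookkeeping approximate): it cancels the proposing goroutine via cancelProp, resets the in-flight accounting, discards the pending batch, and re-enables submitC so requests can be forwarded to the new leader:

becomeFollower := func() {
    cancelProp()                      // stop the Propose goroutine started by becomeLeader
    c.blockInflight = 0               // the new leader will re-propose anything still in flight
    _ = c.support.BlockCutter().Cut() // drop the pending batch; it gets resubmitted via the new leader
    stopTimer()
    submitC = c.submitC // resume reading submissions (they are forwarded to the leader)
    bc = nil
}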

func (c *Chain) apply(ents []raftpb.Entry) {
    var position int
    for i := range ents {
        switch ents[i].Type {
        // normal entry: unmarshal the block and write it to the ledger via
        // writeBlock; accDataSize accumulates the raw entry sizes
        case raftpb.EntryNormal:
            position = i
            c.accDataSize += uint32(len(ents[i].Data))

            block := utils.UnmarshalBlockOrPanic(ents[i].Data)
            c.writeBlock(block, ents[i].Index)

        case raftpb.EntryConfChange:
            // ... (membership changes elided)
        }

        if ents[i].Index > c.appliedIndex {
            c.appliedIndex = ents[i].Index
        }
    }

    // once the accumulated data size exceeds the configured sizeLimit
    // (SnapshotIntervalSize), signal c.gcC; `chain.go#gc` then runs
    // c.Node.takeSnapshot
    if c.accDataSize >= c.sizeLimit {
        b := utils.UnmarshalBlockOrPanic(ents[position].Data)

        select {
        case c.gcC <- &gc{index: c.appliedIndex, state: c.confState, data: ents[position].Data}:
            c.accDataSize = 0
            c.lastSnapBlockNum = b.Header.Number
        default: // previous snapshot still being taken; try again later
        }
    }
}
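writeBlock hands the block to the ledger through consensus.ConsenterSupport, stamping the current raft index into the consensus metadata so a restarted node knows where to resume. A condensed sketch (the metadata field is named RaftMetadata in early 1.4 releases and BlockMetadata later; locking elided):

func (c *Chain) writeBlock(block *common.Block, index uint64) {
    if utils.IsConfigBlock(block) {
        // config blocks may change consenter membership; they take a
        // separate path (elided)
        c.writeConfigBlock(block, index)
        return
    }

    // record the raft index of this block so recovery can resume from it
    c.opts.RaftMetadata.RaftIndex = index
    m := utils.MarshalOrPanic(c.opts.RaftMetadata)

    c.support.WriteBlock(block, m)
}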

Finally, `chain.go#gc` simply drains the gcC channel and triggers snapshots:
func (c *Chain) gc() {
    for {
        select {
        case g := <-c.gcC:
            c.Node.takeSnapshot(g.index, g.state, g.data)
        case <-c.doneC:
            return
        }
    }
}
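Under the hood, takeSnapshot goes through etcd's raft.MemoryStorage: CreateSnapshot captures the applied state at the given index, the snapshot is persisted next to the WAL, and Compact drops log entries the snapshot now covers. A minimal sketch against the etcd APIs (rs and keep are illustrative names; Fabric wraps this in RaftStorage.TakeSnapshot):

// rs is a *raft.MemoryStorage; index/cs/data come from the gc signal
snap, err := rs.CreateSnapshot(index, &cs, data)
if err != nil {
    return err
}
// persist snap to disk alongside the WAL (Fabric uses etcd's snap.Snapshotter; elided)

// keep some trailing entries so slightly-lagging followers can still
// catch up from the log instead of needing the full snapshot
if index > keep {
    if err := rs.Compact(index - keep); err != nil {
        return err
    }
}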

That covers the overall architecture and flow. For finer implementation details, see the references below, which walk through them quite thoroughly.

Ref.

Hyperledger-Fabric源码分析(orderer-consensus-etcdraft)
Fabric raft 共识源码浅析