Fabric 1.4源码分析 - orderer的deliver过程

在orderer端,deliver和broadcast是对等的,最初回溯到protos/orderer/ab.pb.go里的grpc定义,对应的grpc方法是/orderer.AtomicBroadcast/Deliver。对应源码orderer/common/server/server.go#Deliver -> common/deliver/deliver.go#Handle -> deliver.go#deliverBlocks。在deliverBlocks这个方法里,先反序列化cb.Envelope,获取并且校验Header,然后校验access control,判断client的identity证书是否过期,以及策略校验是否拥有读权限(policies.ChannelReaders,代码为orderer/common/server/server.go#Deliver : sf := msgprocessor.NewSigFilter(policies.ChannelReaders, chain))。校验通过后,将envelope反序列化为ab.SeekInfo结构。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
// SeekInfo specifies the range of requested blocks to return.
// If the start position is not found, an error is immediately returned.
// Otherwise, blocks are returned until a missing block is encountered, and then
// the behavior is dictated by the specified SeekBehavior:
//   - BLOCK_UNTIL_READY: the reply blocks until the requested blocks are available.
//   - FAIL_IF_NOT_READY: the reply returns an error indicating that the block is not found.
//
// To request that all blocks be returned indefinitely as they are created, set
// Behavior to BLOCK_UNTIL_READY and set Stop to a specified position with a
// number of MAX_UINT64.
type SeekInfo struct {
// Start is the position of the first requested block.
Start *SeekPosition
// Stop is the position of the last requested block.
Stop *SeekPosition
// Behavior selects what happens when a requested block is not available yet.
Behavior SeekInfo_SeekBehavior
}

// deliverBlocks streams the blocks selected by the SeekInfo carried in envelope
// to srv, one block per response, and then finishes the Deliver RPC with a
// status response.
//
// It returns a non-nil error when ctx ends before a block could be read or
// when sending on srv fails; every other outcome is reported to the client as
// a status response.
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) error {

	// ... validation elided. The omitted part is expected to define chain,
	// seekInfo, accessControl and erroredChan, which are used below.

	// Resolve seekInfo.Stop into the number of the last block to deliver.
	cursor, number := chain.Reader().Iterator(seekInfo.Start)
	defer cursor.Close()
	var stopNum uint64
	switch stop := seekInfo.Stop.Type.(type) {
	case *ab.SeekPosition_Oldest:
		stopNum = number
	case *ab.SeekPosition_Newest:
		stopNum = chain.Reader().Height() - 1
	case *ab.SeekPosition_Specified:
		stopNum = stop.Specified.Number
		// A stop position before the start position can never be matched by
		// the termination check at the end of the loop, so reject it up front.
		if stopNum < number {
			return srv.SendStatusResponse(cb.Status_BAD_REQUEST)
		}
	}

	for {
		// Fail fast when the next block lies beyond the current chain height.
		if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
			if number > chain.Reader().Height()-1 {
				return srv.SendStatusResponse(cb.Status_NOT_FOUND)
			}
		}

		var block *cb.Block
		var status cb.Status

		// Read the next block in a separate goroutine so that the wait can be
		// abandoned when the context ends or the chain reports an error. The
		// deferred cursor.Close() runs when this function returns.
		iterCh := make(chan struct{})
		go func() {
			block, status = cursor.Next()
			close(iterCh)
		}()

		select {
		case <-ctx.Done():
			return errors.Wrapf(ctx.Err(), "context finished before block retrieved")
		case <-erroredChan:
			// Abort the deliver request because of a background error.
			return srv.SendStatusResponse(cb.Status_SERVICE_UNAVAILABLE)
		case <-iterCh:
			// The iterator has set the block and status vars.
		}

		// A failed read leaves block nil; report the status instead of
		// dereferencing the block further down.
		if status != cb.Status_SUCCESS {
			return srv.SendStatusResponse(status)
		}

		// Increment the block number to support FAIL_IF_NOT_READY deliver behavior.
		number++

		// Re-evaluate access control before every block is sent.
		if err := accessControl.Evaluate(); err != nil {
			return srv.SendStatusResponse(cb.Status_FORBIDDEN)
		}

		// Blocks are sent to the client one at a time.
		if err := srv.SendBlockResponse(block); err != nil {
			return err
		}

		if stopNum == block.Header.Number {
			break
		}
	}

	// Once every requested block has been sent, answer the Deliver RPC itself.
	return srv.SendStatusResponse(cb.Status_SUCCESS)
}

在配置文件orderer.yaml里指定了General.LedgerType: file(可选有file、json、ram三种模式),这里以file为例分析orderer的本地账本,实现在common/ledger/blockledger/file/impl.go。file/impl.go#Next方法里调用的i.commonIterator.Next(),如注释里所述,将会阻塞(block)直到可以获取到下一个block(底层使用标准库的sync.Cond实现等待与唤醒)。因此只要client的请求里seekInfo.Stop为最大值MAX_UINT64,SeekInfo_SeekBehavior选择BLOCK_UNTIL_READY,则新产生的block会源源不断地发送到client。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// Iterator returns an iterator positioned at startPosition together with the
// number of the first block it will yield.
//
// A blockledger.NotFoundErrorIterator and block number 0 are returned when the
// position type is not recognised, when a specified block number is greater
// than the ledger height, or when the block store cannot be queried.
func (fl *FileLedger) Iterator(startPosition *ab.SeekPosition) (blockledger.Iterator, uint64) {
	var startingBlockNumber uint64
	switch start := startPosition.Type.(type) {
	case *ab.SeekPosition_Oldest:
		// When Stop is also Oldest, deliverBlocks sets stopNum to this start
		// number, so only block 0 is delivered.
		startingBlockNumber = 0
	case *ab.SeekPosition_Newest:
		// When Stop is also Newest, deliverBlocks sets stopNum to Height()-1,
		// so only the newest block is delivered.
		info, err := fl.blockStore.GetBlockchainInfo()
		if err != nil {
			// Without the chain info the newest block number is unknown.
			return &blockledger.NotFoundErrorIterator{}, 0
		}
		startingBlockNumber = info.Height - 1
	case *ab.SeekPosition_Specified:
		startingBlockNumber = start.Specified.Number
		// A start equal to the height is allowed: it addresses the next block
		// that has not been written yet.
		if startingBlockNumber > fl.Height() {
			return &blockledger.NotFoundErrorIterator{}, 0
		}
	default:
		return &blockledger.NotFoundErrorIterator{}, 0
	}

	iterator, err := fl.blockStore.RetrieveBlocks(startingBlockNumber)
	if err != nil {
		return &blockledger.NotFoundErrorIterator{}, 0
	}

	return &fileLedgerIterator{ledger: fl, blockNumber: startingBlockNumber, commonIterator: iterator}, startingBlockNumber
}

// Next returns the next block of the ledger together with a status code.
// The call waits inside commonIterator.Next until a new block is available or
// until Close is called. Any failure, including the nil result seen when
// another goroutine closes the iterator, is reported as
// cb.Status_SERVICE_UNAVAILABLE with a nil block.
func (i *fileLedgerIterator) Next() (*cb.Block, cb.Status) {
	next, err := i.commonIterator.Next()
	switch {
	case err != nil:
		logger.Error(err)
		return nil, cb.Status_SERVICE_UNAVAILABLE
	case next == nil:
		// Another thread called Close on the iterator, so there is no block to return.
		return nil, cb.Status_SERVICE_UNAVAILABLE
	}
	return next.(*cb.Block), cb.Status_SUCCESS
}

具体跟踪一下这个等待新block的机制:
impl.go#Next : result, err := i.commonIterator.Next()
->fsblkstorage/blocks_itr.go#Next() : itr.waitForBlock(itr.blockNumToRetrieve)
->fsblkstorage/blocks_itr.go#waitForBlock(),这里执行了itr.mgr.cpInfoCond.Wait(),就是等待新block。

而相应的在添加block时,以solo方式为例,
orderer/consensus/solo/consensus.go#main : ch.support.WriteBlock(block, nil)
->orderer/common/multichannel/blockwriter.go#WriteBlock : bw.commitBlock(encodedMetadataValue)
->blockwriter.go#commitBlock : bw.support.Append(bw.lastBlock)
->common/ledger/blockledger/file/impl.go#Append : fl.blockStore.AddBlock(block)
->fsblkstorage/fs_blockstore.go#AddBlock
->fsblkstorage/blockfile_mgr.go#addBlock : mgr.updateCheckpoint(newCPInfo)
->fsblkstorage/blockfile_mgr.go#updateCheckpoint里,执行了mgr.cpInfoCond.Broadcast(),也就是当写入block后,通知到前面的Wait,从而实现了每产生一个新block就立刻deliver该block的逻辑。