在instantiate(1)里提到,peer收集完endorsements后创建common.Envelope, 通过grpc发送到/orderer.AtomicBroadcast/Broadcast
. common.Envelope的数据结构如下
1 | // common.Envelope |
全局搜索,orderer端执行protos/orderer/ab.pb.go#_AtomicBroadcast_Broadcast_Handler
-> orderer/common/server/server.go#Broadcast
-> orderer/common/broadcast/broadcast.go#Handle
.
1 | // Handle starts a service thread for a given gRPC connection and services the broadcast connection |
for
循环里srv.Recv()
从grpc stream里接收数据。bh.sm.BroadcastChannelSupport(msg)
检查并返回proposal.header.channelHeader.Type
,这里的HeaderType为HeaderType_ENDORSER_TRANSACTION
(参考instantiate-2),判断为normalMsg。processor.WaitReady()
。这里分三个实现,solo(orderer/consensus/solo/consensus.go), etcdraft(orderer/consensus/etcdraft/chain.go,V1.4.1版本引入)和kafka(orderer/consensus/kafka/chain.go)。solo方法是orderer单节点,不存在数据交换汇集,不会阻塞,不做任何操作直接返回。而etcdraft,kafka实现是多节点orderer,节点间需要通信。这个后面详述。configSeq, err := processor.ProcessNormalMsg(msg)
,返回当前配置的seq(单增),并且策略检查msg。这些策略可以回溯到fabric/orderer/common/multichannel/chainsupport.go#newChainSupport
里创建cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs))
时初始化。策略fliter的规则可以通过名称大致猜测。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18// ProcessNormalMsg will check the validity of a message based on the current configuration. It returns the current configuration sequence number and nil on success, or an error if the message is not valid
func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
configSeq = s.support.Sequence()
err = s.filters.Apply(env)
return
}
// <orderer/common/msgprocessor/standardchannel.go>
// CreateStandardChannelFilters creates the set of filters for a normal (non-system) chain
func CreateStandardChannelFilters(filterSupport channelconfig.Resources) *RuleSet {
ordererConfig, ok := filterSupport.OrdererConfig()
return NewRuleSet([]Rule{
EmptyRejectRule, // 校验envelop的payload不为nil
NewExpirationRejectRule(filterSupport), // 校验payload.Header.SignatureHeade.Creator的x509证书未过期
NewSizeFilter(ordererConfig), // 校验enveloy的size不能超过配置的batch.AbsoluteMaxBytes,对应于configtx.yaml中的Orderer.BatchSize.AbsoluteMaxBytes(10MB)
NewSigFilter(policies.ChannelWriters, filterSupport), // 检查policy当前请求拥有当前channel的写权限
})
}- 这里简述下solo的方式,kafka后面专题详述。solo把message直接放入
ch.sendChan
内, solo的相关代码在orderer/consensus/solo/consensus.go
1
2
3
4
5
6
7
8
9
10
11
12// solo: Order accepts normal messages for ordering
func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {
select {
case ch.sendChan <- &message{
configSeq: configSeq,
normalMsg: env,
}:
return nil
case <-ch.exitChan:
return fmt.Errorf("Exiting")
}
}
在orderer/consensus/solo/consensus.go:main
内处理channle sendChan里的消息
1 | func (ch *chain) main() { |
msg.configSeq < seq
判断配置在写入channel和读出channel过程中是否有更改,如果有,则重新执行ProcessNormalMsg
用新的规则校验消息是否合法。batches, _ := ch.support.BlockCutter().Ordered(msg.normalMsg)
将消息切片,短小消息合并成一个批次,大消息切分多个批次,使其不超过block规定长度。源码注释有详细介绍。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16// messageBatches length: 0, pending: false
// - impossible, as we have just received a message
// messageBatches length: 0, pending: true
// - no batch is cut and there are messages pending.
// - 没有切分block,并且加上当前envelop后继续等待添加
// messageBatches length: 1, pending: false
// - the message count reaches BatchSize.MaxMessageCount。
// - 切分block,因为加上当前envelop后达到数量限制,对应configtx.yaml中的Orderer.BatchSize.MaxMessageCount(10)
// messageBatches length: 1, pending: true
// - the current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
// - 切分block,因为如果加上当前envelop后,所有的batch大小超过配置的configtx.yaml中的Orderer.BatchSize.PreferredMaxBytes(512KB),所以将之前的切分block,而当前的envelop继续等待后续添加(当前envelop大小小于PreferredMaxBytes)
// messageBatches length: 2, pending: false
// - the current message size in bytes exceeds BatchSize.PreferredMaxBytes, therefore isolated in its own batch.
// - 当前envelop大小大于PreferredMaxBytes,将之前的作为第一个block,将当前的作为第二个block(前面已经校验过,envelop大小不会超过AbsoluteMaxBytes(10MB))
// messageBatches length: 2, pending: true
// - impossibleCreateNextBlock
创建下一个区块,可以看出,header的number(深度)+1,并且附带上前一个区块的hash,并且计算当前区块data数据的哈希,这些构成区块的信息。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21{
"Header": ${BlockHeader},
"Data": ${BlockData},
"Metadata": ${BlockMetadata}
}
// BlockHeader
{
"Number": ${lastBlock.Header.Number}+1,
"PreviousHash": #lastBlock.Header.Hash(),
"DataHash": #${BlockData}.Hash()
}
// BlockData
{
"Data": #make([][]byte, len(envelop))
}
// BlockMetadata
{
"Metadata": [][]byte
}WriteBlock
提交区块。这里会加上block的签名,然后提交。BlockWriter.support
主要有三种实现,file,json(一个区块创建一个文件,以深度命名,写入json格式),ram。这里写入orderer本地,后面通过deleiver服务同步到peer节点真正的提交到各自的本地账本db。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31// WriteBlock should be invoked for blocks which contain normal transactions.
// It sets the target block as the pending next block, and returns before it is committed.
// Before returning, it acquires the committing lock, and spawns a go routine which will
// annotate the block with metadata and signatures, and write the block to the ledger
// then release the lock. This allows the calling thread to begin assembling the next block
// before the commit phase is complete.
func (bw *BlockWriter) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {
bw.committingBlock.Lock()
bw.lastBlock = block
go func() {
defer bw.committingBlock.Unlock()
bw.commitBlock(encodedMetadataValue)
}()
}
// commitBlock should only ever be invoked with the bw.committingBlock held
// this ensures that the encoded config sequence numbers stay in sync
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
// Set the orderer-related metadata field
// 在BlockMetadata.Metadata内加上key为cb.BlockMetadataIndex_ORDERER的值
if encodedMetadataValue != nil {
bw.lastBlock.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}
// 在BlockMetadata.Metadata内加上key为cb.BlockMetadataIndex_SIGNATURES的值
bw.addBlockSignature(bw.lastBlock)
// 在BlockMetadata.Metadata内加上key为cb.BlockMetadataIndex_LAST_CONFIG的值
bw.addLastConfigSignature(bw.lastBlock)
err := bw.support.Append(bw.lastBlock)
}
对于configMsg,流程非常相似,processor.ProcessConfigUpdateMsg(msg)
相对而言,除了normalMsg的步骤外,还通过提交过来的config和本地存储的现行config比较,计算出readSet和writeSet,校验通过后,签名生成envelope(utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChainID(), s.support.Signer(), configEnvelope, msgVersion, epoch)
).而processor.Configure(config, configSeq)
则与processor.Order(msg, configSeq)
完全一致了,仅是名称不同。
至此,完成了orderer端的工作,这些提交到本地的区块,后续将通过deliver服务同步到peer。具体后面专题介绍。