Fabric 1.4 Source Code Analysis - chaincode instantiate (8): the ordering process on the orderer

As mentioned in instantiate(1), once the peer has collected the endorsements it builds a common.Envelope and sends it over gRPC to /orderer.AtomicBroadcast/Broadcast. The structure of common.Envelope is as follows:

// common.Envelope
{
"Payload": ${common.Payload}.byte,
"Signature": #signer.Sign(${common.Payload}.byte)
}

// common.Payload
{
"Header": ${proposal.Header}, // proposal参考instantiate(2)
"Data": ${peer.Transaction}.byte
}

// peer.Transaction
{
"Actions": taas := make([]*peer.TransactionAction, 1), taas[0] = ${peer.TransactionAction}
}

// peer.TransactionAction
{
"Header": ${proposal.Header.SignatureHeader},
"Payload": ${peer.ChaincodeActionPayload}
}

// peer.ChaincodeActionPayload
{
"ChaincodeProposalPayload": ${peer.ChaincodeProposalPayload}.byte,
"Action": ${peer.ChaincodeEndorsedAction}
}

// peer.ChaincodeProposalPayload
{
"Input": ${proposal.Payload.Input},
"TransientMap": nil
}

// peer.ChaincodeEndorsedAction; for the response structure see instantiate(7)
{
"ProposalResponsePayload": ${resps[0].Payload}, // 所有response的payload都应该是一致的,在前面已经校验过
"Endorsements": #endorsements := make([]*peer.Endorsement, len(resps)) // 所有的endorsement
}
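
For context, the sketch below shows roughly how a client pushes this signed envelope into the orderer's Broadcast stream. It is a minimal illustration, not the peer CLI's actual broadcast client; broadcastEnvelope and the assumption of a ready *grpc.ClientConn are mine.

package example

import (
    "context"
    "fmt"

    cb "github.com/hyperledger/fabric/protos/common"
    ab "github.com/hyperledger/fabric/protos/orderer"
    "google.golang.org/grpc"
)

// broadcastEnvelope opens an AtomicBroadcast/Broadcast stream, sends one signed
// envelope and waits for the orderer's BroadcastResponse. (Illustrative sketch;
// the real peer CLI wraps this call in its own broadcast client.)
func broadcastEnvelope(conn *grpc.ClientConn, env *cb.Envelope) error {
    stream, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
    if err != nil {
        return err
    }
    if err := stream.Send(env); err != nil {
        return err
    }
    resp, err := stream.Recv()
    if err != nil {
        return err
    }
    if resp.Status != cb.Status_SUCCESS {
        return fmt.Errorf("broadcast rejected: %s - %s", resp.Status, resp.Info)
    }
    return nil
}

Either way, the wire interaction is one Send of the envelope followed by one Recv of the status.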

A global search shows that the orderer side handles this in protos/orderer/ab.pb.go#_AtomicBroadcast_Broadcast_Handler -> orderer/common/server/server.go#Broadcast -> orderer/common/broadcast/broadcast.go#Handle.

// Handle starts a service thread for a given gRPC connection and services the broadcast connection
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
    for {
        msg, err := srv.Recv()
        // (io.EOF and receive-error handling elided from this excerpt)

        chdr, isConfig, processor, err := bh.sm.BroadcastChannelSupport(msg)
        if err != nil {
            return srv.Send(&ab.BroadcastResponse{Status: cb.Status_BAD_REQUEST, Info: err.Error()})
        }
        // chdr (the channel header) is only used for logging in the full source

        if err = processor.WaitReady(); err != nil {
            return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
        }

        if !isConfig {
            configSeq, err := processor.ProcessNormalMsg(msg)
            if err != nil {
                return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()})
            }
            err = processor.Order(msg, configSeq)
        } else { // isConfig
            config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
            if err != nil {
                return srv.Send(&ab.BroadcastResponse{Status: ClassifyError(err), Info: err.Error()})
            }
            err = processor.Configure(config, configSeq)
        }
        // (Order/Configure errors are checked in the full source; elided here)

        err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
    }
}
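
A key step in this loop is bh.sm.BroadcastChannelSupport(msg), which reads the ChannelHeader and decides whether the envelope is a config update. Below is a simplified sketch of that classification, assuming only utils.GetPayload and utils.UnmarshalChannelHeader; it is not the verbatim registrar code (chain lookup and error details are omitted), and classify is a name of mine.

package example

import (
    "fmt"

    cb "github.com/hyperledger/fabric/protos/common"
    "github.com/hyperledger/fabric/protos/utils"
)

// classify extracts the ChannelHeader from an envelope and reports whether the
// message is a config update. An instantiate transaction carries
// HeaderType_ENDORSER_TRANSACTION, so isConfig is false and Handle takes the
// ProcessNormalMsg/Order branch.
func classify(env *cb.Envelope) (*cb.ChannelHeader, bool, error) {
    payload, err := utils.GetPayload(env) // unmarshal env.Payload
    if err != nil {
        return nil, false, err
    }
    if payload.Header == nil {
        return nil, false, fmt.Errorf("envelope payload has no header")
    }
    chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
    if err != nil {
        return nil, false, err
    }
    isConfig := chdr.Type == int32(cb.HeaderType_CONFIG_UPDATE)
    return chdr, isConfig, nil
}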
  1. Inside the for loop, srv.Recv() receives data from the gRPC stream.
  2. bh.sm.BroadcastChannelSupport(msg) validates the message and returns proposal.header.channelHeader.Type; here the HeaderType is HeaderType_ENDORSER_TRANSACTION (see instantiate-2), so the message is classified as a normal message (compare the classification sketch above).
  3. processor.WaitReady() has three implementations: solo (orderer/consensus/solo/consensus.go), etcdraft (orderer/consensus/etcdraft/chain.go, introduced in v1.4.1) and kafka (orderer/consensus/kafka/chain.go). solo is a single-node orderer, so there is no data exchange or aggregation between nodes; it never blocks and returns immediately without doing anything. etcdraft and kafka are multi-node orderers whose nodes need to communicate; they are covered in detail later.
  4. configSeq, err := processor.ProcessNormalMsg(msg) returns the current configuration sequence number (monotonically increasing) and checks msg against the channel's policies. These filters are initialized in fabric/orderer/common/multichannel/chainsupport.go#newChainSupport when cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs)) is created. The purpose of each filter rule can largely be guessed from its name; a sketch of how the RuleSet applies them follows after this list.
    // ProcessNormalMsg will check the validity of a message based on the current configuration.  It returns the current configuration sequence number and nil on success, or an error if the message is not valid
    func (s *StandardChannel) ProcessNormalMsg(env *cb.Envelope) (configSeq uint64, err error) {
        configSeq = s.support.Sequence()
        err = s.filters.Apply(env)
        return
    }

    // <orderer/common/msgprocessor/standardchannel.go>
    // CreateStandardChannelFilters creates the set of filters for a normal (non-system) chain
    func CreateStandardChannelFilters(filterSupport channelconfig.Resources) *RuleSet {
        ordererConfig, ok := filterSupport.OrdererConfig() // (the !ok panic check is elided here)
        return NewRuleSet([]Rule{
            EmptyRejectRule, // rejects envelopes whose payload is nil
            NewExpirationRejectRule(filterSupport), // rejects requests whose x509 certificate in payload.Header.SignatureHeader.Creator has expired
            NewSizeFilter(ordererConfig), // rejects envelopes larger than batch.AbsoluteMaxBytes, i.e. Orderer.BatchSize.AbsoluteMaxBytes in configtx.yaml (10MB)
            NewSigFilter(policies.ChannelWriters, filterSupport), // checks that the requester has write permission on this channel
        })
    }
  5. Here is a brief look at the solo mode; kafka is covered in a dedicated article later. solo simply puts the message into ch.sendChan. The relevant code is in orderer/consensus/solo/consensus.go:
    // solo: Order accepts normal messages for ordering
    func (ch *chain) Order(env *cb.Envelope, configSeq uint64) error {
        select {
        case ch.sendChan <- &message{
            configSeq: configSeq,
            normalMsg: env,
        }:
            return nil
        case <-ch.exitChan:
            return fmt.Errorf("Exiting")
        }
    }
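
Step 4's s.filters.Apply(env) simply runs the rules created by CreateStandardChannelFilters in order. The following is a minimal sketch of the Rule/RuleSet pattern; the names follow orderer/common/msgprocessor, but this is a simplified rendering rather than the verbatim source.

package example

import (
    cb "github.com/hyperledger/fabric/protos/common"
)

// Rule accepts or rejects a single envelope.
type Rule interface {
    Apply(message *cb.Envelope) error
}

// RuleSet applies an ordered list of rules.
type RuleSet struct {
    rules []Rule
}

// NewRuleSet creates a RuleSet from the given rules.
func NewRuleSet(rules []Rule) *RuleSet {
    return &RuleSet{rules: rules}
}

// Apply returns nil only if every rule accepts the message;
// the first rejecting rule's error aborts the chain.
func (rs *RuleSet) Apply(message *cb.Envelope) error {
    for _, rule := range rs.rules {
        if err := rule.Apply(message); err != nil {
            return err
        }
    }
    return nil
}

So ProcessNormalMsg either returns the current configSeq with a nil error, or the error of the first filter that rejected the envelope.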

orderer/consensus/solo/consensus.go: main() consumes the messages from the sendChan channel.

func (ch *chain) main() {
    var timer <-chan time.Time // batch timeout timer; its handling is elided from this excerpt
    var err error

    for {
        seq := ch.support.Sequence()
        err = nil
        select {
        case msg := <-ch.sendChan:
            if msg.configMsg == nil {
                // NormalMsg
                if msg.configSeq < seq {
                    _, err = ch.support.ProcessNormalMsg(msg.normalMsg)
                    // (in the full source the message is discarded if this re-validation fails)
                }

                batches, _ := ch.support.BlockCutter().Ordered(msg.normalMsg)

                for _, batch := range batches {
                    block := ch.support.CreateNextBlock(batch)
                    ch.support.WriteBlock(block, nil)
                }
            } else {
                // ConfigMsg
                ...
            }
        }
    }
}

  1. msg.configSeq < seq checks whether the configuration changed between the moment the message was written into the channel and the moment it was read out; if it did, ProcessNormalMsg is run again so the message is re-validated against the new rules.
  2. batches, _ := ch.support.BlockCutter().Ordered(msg.normalMsg) cuts messages into batches: small messages accumulate into a single batch, a message that would push the pending batch over the limit causes a cut, and an oversized message is isolated into its own batch, so that no batch exceeds the configured block size. The source comments describe this in detail; a simplified sketch of the cutting policy follows after this list.
    // messageBatches length: 0, pending: false
    // - impossible, as we have just received a message
    // messageBatches length: 0, pending: true
    // - no batch is cut and there are messages pending.
    // - no block is cut; the current envelope is appended to the pending batch and waits for more messages
    // messageBatches length: 1, pending: false
    // - the message count reaches BatchSize.MaxMessageCount.
    // - a block is cut because adding the current envelope reaches the count limit, Orderer.BatchSize.MaxMessageCount in configtx.yaml (10)
    // messageBatches length: 1, pending: true
    // - the current message will cause the pending batch size in bytes to exceed BatchSize.PreferredMaxBytes.
    // - a block is cut because adding the current envelope would push the pending batch past Orderer.BatchSize.PreferredMaxBytes in configtx.yaml (512KB); the earlier messages are cut into a block and the current envelope stays pending (its own size is below PreferredMaxBytes)
    // messageBatches length: 2, pending: false
    // - the current message size in bytes exceeds BatchSize.PreferredMaxBytes, therefore isolated in its own batch.
    // - the current envelope is itself larger than PreferredMaxBytes: the previously pending messages become the first block and the current envelope becomes the second (it was verified earlier that an envelope never exceeds AbsoluteMaxBytes (10MB))
    // messageBatches length: 2, pending: true
    // - impossible
  3. CreateNextBlock builds the next block: the header's Number (height) is the previous block's Number + 1, it carries the hash of the previous block's header, and the hash of the current block's data is computed; together these make up the block.

    {
    "Header": ${BlockHeader},
    "Data": ${BlockData},
    "Metadata": ${BlockMetadata}
    }

    // BlockHeader
    {
    "Number": ${lastBlock.Header.Number}+1,
    "PreviousHash": #lastBlock.Header.Hash(),
    "DataHash": #${BlockData}.Hash()
    }
    // BlockData
    {
    "Data": #make([][]byte, len(envelop))
    }

    // BlockMetadata
    {
    "Metadata": [][]byte
    }
  4. WriteBlock commits the block: the block is signed and then appended. BlockWriter.support has three main storage implementations: file, json (one file per block, named after the block number, written as JSON) and ram. Here the block is only written to the orderer's local ledger; later it is synchronized to the peers through the deliver service, where it is finally committed to each peer's own ledger DB.

    // WriteBlock should be invoked for blocks which contain normal transactions.
    // It sets the target block as the pending next block, and returns before it is committed.
    // Before returning, it acquires the committing lock, and spawns a go routine which will
    // annotate the block with metadata and signatures, and write the block to the ledger
    // then release the lock. This allows the calling thread to begin assembling the next block
    // before the commit phase is complete.
    func (bw *BlockWriter) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {
        bw.committingBlock.Lock()
        bw.lastBlock = block

        go func() {
            defer bw.committingBlock.Unlock()
            bw.commitBlock(encodedMetadataValue)
        }()
    }

    // commitBlock should only ever be invoked with the bw.committingBlock held
    // this ensures that the encoded config sequence numbers stay in sync
    func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
        // Set the orderer-related metadata field
        // adds the cb.BlockMetadataIndex_ORDERER entry to BlockMetadata.Metadata
        if encodedMetadataValue != nil {
            bw.lastBlock.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
        }
        // adds the cb.BlockMetadataIndex_SIGNATURES entry to BlockMetadata.Metadata
        bw.addBlockSignature(bw.lastBlock)
        // adds the cb.BlockMetadataIndex_LAST_CONFIG entry to BlockMetadata.Metadata
        bw.addLastConfigSignature(bw.lastBlock)

        err := bw.support.Append(bw.lastBlock)
        if err != nil {
            logger.Panicf("Could not append block: %s", err)
        }
    }
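
To make the cutting policy above concrete, here is a simplified, self-contained sketch of the decision logic. It is not the fabric blockcutter source: the receiver type and field names are mine, and message size is approximated with proto.Size rather than the blockcutter's own size accounting.

package example

import (
    "github.com/golang/protobuf/proto"
    cb "github.com/hyperledger/fabric/protos/common"
)

// receiver is a stand-in for the blockcutter; the two limits mirror
// Orderer.BatchSize.MaxMessageCount and Orderer.BatchSize.PreferredMaxBytes in configtx.yaml.
type receiver struct {
    maxMessageCount   uint32
    preferredMaxBytes uint32
    pendingBatch      []*cb.Envelope
    pendingBatchBytes uint32
}

// ordered follows the contract documented above: it returns zero, one or two
// cut batches and reports whether messages are still pending.
func (r *receiver) ordered(msg *cb.Envelope) (batches [][]*cb.Envelope, pending bool) {
    msgBytes := uint32(proto.Size(msg)) // approximation of the envelope size

    // an oversized message cuts the pending batch (if any) and is isolated in its own batch
    if msgBytes > r.preferredMaxBytes {
        if len(r.pendingBatch) > 0 {
            batches = append(batches, r.cut())
        }
        return append(batches, []*cb.Envelope{msg}), false
    }

    // cut first if appending this message would exceed the preferred batch size
    if r.pendingBatchBytes+msgBytes > r.preferredMaxBytes {
        batches = append(batches, r.cut())
    }

    r.pendingBatch = append(r.pendingBatch, msg)
    r.pendingBatchBytes += msgBytes

    // cut as soon as the message count limit is reached
    if uint32(len(r.pendingBatch)) >= r.maxMessageCount {
        batches = append(batches, r.cut())
    }

    return batches, len(r.pendingBatch) > 0
}

// cut returns the pending batch and resets the accumulator.
func (r *receiver) cut() []*cb.Envelope {
    batch := r.pendingBatch
    r.pendingBatch = nil
    r.pendingBatchBytes = 0
    return batch
}

Each cut batch then becomes one block via CreateNextBlock/WriteBlock in the solo main() loop shown earlier.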

For a configMsg the flow is very similar. Compared with a normalMsg, processor.ProcessConfigUpdateMsg(msg) additionally compares the submitted config against the locally stored current config, computes the readSet and writeSet, and after validation signs a new envelope (utils.CreateSignedEnvelope(cb.HeaderType_CONFIG, s.support.ChainID(), s.support.Signer(), configEnvelope, msgVersion, epoch)). processor.Configure(config, configSeq) is then exactly the same as processor.Order(msg, configSeq); only the name differs.

This completes the work on the orderer side. The blocks committed locally here will later be synchronized to the peers through the deliver service, which will be covered in a dedicated article.