Fabric 1.4源码分析 - orderer的kafka实现

在instantiate(8)里提到,orderer总共有solo和kafka,etcdraft三种方式,并且介绍了单orderer节点的solo方式。这里继续补充介绍多orderer节点下采用的kafka方式。kafka实现的代码放在orderer/consensus/kafka/chain.go里,与solo一样实现了orderer/common/broadcast/broadcast.go#Consenter这个接口。在orderer的处理消息时orderer/common/broadcast/broadcast.go#Handle,调用了其Order方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
return chain.order(env, configSeq, int64(0))
}

func (chain *chainImpl) order(env *cb.Envelope, configSeq uint64, originalOffset int64) error {
marshaledEnv, err := utils.Marshal(env)
if !chain.enqueue(newNormalMessage(marshaledEnv, configSeq, originalOffset)) {
return fmt.Errorf("cannot enqueue")
}
return nil
}

// enqueue accepts a message and returns true on acceptance, or false otheriwse.
func (chain *chainImpl) enqueue(kafkaMsg *ab.KafkaMessage) bool {
select {
case <-chain.startChan: // The Start phase has completed
select {
case <-chain.haltChan: // The chain has been halted, stop here
return false
default: // The post path
payload, err := utils.Marshal(kafkaMsg)

message := newProducerMessage(chain.channel, payload)
if _, _, err = chain.producer.SendMessage(message); err != nil {
return false
}
return true
}
default: // Not ready yet
return false
}
}

func newProducerMessage(channel channel, pld []byte) *sarama.ProducerMessage {
return &sarama.ProducerMessage{
Topic: channel.topic(),
Key: sarama.StringEncoder(strconv.Itoa(int(channel.partition()))),
Value: sarama.ByteEncoder(pld),
}
}

首先构建了消息实体orderer/consensus/kafka/chain.go:KafkaMessage,入参为传递的消息cb.Envelope,当前配置序列号configSeq,固定的originalOffset值为0(这个值的含义后面会提到,表示的是是否当前消息是重新发送重排,值为0则说明当前消息是新消息)。然后构建传递kafka消息sarama.ProducerMessage,这里kafka相关的使用第三方库Shopify/sarama。消息里指定了topic,由partition构建key,然后就是将刚才构建的消息作为payload。后面可以看到topic和partition都是channel的配置参数(初始化或者后面的更新)。这里可以看出,所有的kafka消息的key都是相同的,意味这所有消息都发送到同一个partition内,按照key的算法也就是channel.partition()。这里保证了单个orderer发出的envelop的有序性,但同时从全局来说,只使用一个partition,并没有充分利用kafka多partition的带来的高性能。然后使用sarama.SyncProducer发送消息chain.producer.SendMessage

接下来分析sarama.SyncProducer的初始化,以及相应的kafka consumer消费过程。kafka的初始化方法是orderer/consensus/kafka/chain.go#startThread。从这个一直回溯到chainsupport.go#start,总共有两个地方调用。一是registar.go#NewRegistrar,这里从ledgerFactory里的记录找出existingChains重建;二是registar.go#newChain,接收到消息的payload.Header.ChannelHeader.Type为cb.HeaderType_ORDERER_TRANSACTION时新建。这里以新建chain(channel)为例。newChain方法里调用了newChainSupport,里面初始化chainsupport的Chain参数cs.Chain, err = consenter.HandleChain(cs, metadata)。这里的consenter是consensus.go#Consenter接口,这里的实现类是orderer/consensus/kafka/consenter.go:consenterImpl.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
func newChainSupport(
registrar *Registrar,
ledgerResources *ledgerResources,
consenters map[string]consensus.Consenter,
signer crypto.LocalSigner,
) *ChainSupport {
// Read in the last block and metadata for the channel
lastBlock := blockledger.GetBlock(ledgerResources, ledgerResources.Height()-1)

metadata, err := utils.GetMetadataFromBlock(lastBlock, cb.BlockMetadataIndex_ORDERER)

// Construct limited support needed as a parameter for additional support
cs := &ChainSupport{
ledgerResources: ledgerResources,
LocalSigner: signer,
cutter: blockcutter.NewReceiverImpl(ledgerResources),
}

// Set up the msgprocessor
cs.Processor = msgprocessor.NewStandardChannel(cs, msgprocessor.CreateStandardChannelFilters(cs))

// Set up the block writer
cs.BlockWriter = newBlockWriter(lastBlock, registrar, cs)

// Set up the consenter
cs.Chain, err = consenter.HandleChain(cs, metadata)
return cs
}

// HandleChain creates/returns a reference to a consensus.Chain object for the given set of support resources. Implements the consensus.Consenter interface.
// Called by consensus.newChainSupport(), which is itself called by multichannel.NewManagerImpl() when ranging over the ledgerFactory's existingChains.
func (consenter *consenterImpl) HandleChain(support consensus.ConsenterSupport, metadata *cb.Metadata) (consensus.Chain, error) {
lastOffsetPersisted, lastOriginalOffsetProcessed, lastResubmittedConfigOffset := getOffsets(metadata.Value, support.ChainID())
return newChain(consenter, support, lastOffsetPersisted, lastOriginalOffsetProcessed, lastResubmittedConfigOffset)
}

func getOffsets(metadataValue []byte, chainID string) (persisted int64, processed int64, resubmitted int64) {
if metadataValue != nil {
// Extract orderer-related metadata from the tip of the ledger first
kafkaMetadata := &ab.KafkaMetadata{}
proto.Unmarshal(metadataValue, kafkaMetadata);
return kafkaMetadata.LastOffsetPersisted,
kafkaMetadata.LastOriginalOffsetProcessed,
kafkaMetadata.LastResubmittedConfigOffset
}
return sarama.OffsetOldest - 1, int64(0), int64(0) // default
}

  1. 从上一个区块blockledger.GetBlock(ledgerResources, ledgerResources.Height()-1)获取元数据,也就是BlockMetadata.Metadata内key为cb.BlockMetadataIndex_SIGNATURES的值,在instantiate(8)里提及其他三种。从getOffsets里看到,如果说当前channel已经存在也消费过数据,例如重启回复的情况下,则使用的是metadata里记录的值。如果是新建channel,则使用了默认LastOffsetPersisted,LastOriginalOffsetProcessed,LastResubmittedConfigOffset这三个值分别取sarama.OffsetOldest - 1,0,0。具体参数的含义后面分析。
  2. 接着调用chain.go#newChain,里面主要是构建了orderer/consensus/kafka/chain.go:chainImpl这个实体。其中需要关注的是chainImpl.channel这个参数初始化为newChannel(support.ChainID(), defaultPartition).即kafka的topic为support.ChainID()(即channel名),partition为固定值0,这也就是前面提到的channel的topic和partition。
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // channel.go
    const defaultPartition = 0

    // Returns a new channel for a given topic name and partition number.
    func newChannel(topic string, partition int32) channel {
    return &channelImpl{
    tpc: fmt.Sprintf("%s", topic),
    prt: partition,
    }
    }

下面再具体分析chain.go#startThread

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
// Called by Start().
func startThread(chain *chainImpl) {
// Set up the producer
chain.producer, err = setupProducerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)

// Have the producer post the CONNECT message
if err = sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel); err != nil {}

// Set up the parent consumer
chain.parentConsumer, err = setupParentConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.SharedConfig().KafkaBrokers(), chain.consenter.brokerConfig(), chain.channel)

// Set up the channel consumer
chain.channelConsumer, err = setupChannelConsumerForChannel(chain.consenter.retryOptions(), chain.haltChan, chain.parentConsumer, chain.channel, chain.lastOffsetPersisted+1)

chain.doneProcessingMessagesToBlocks = make(chan struct{})

close(chain.startChan) // Broadcast requests will now go through
chain.errorChan = make(chan struct{}) // Deliver requests will also go through

chain.processMessagesToBlocks() // Keep up to date with the channel
}

  1. setupProducerForChannel尝试用配置好的参数创建kafka的producer(sarama.SyncProducer),并且加上了失败重试机制。配置在orderer.yaml文件里。
  2. sendConnectMessage使用刚创建的kafka producer发送ab.KafkaMessage_Connect消息,payload为nil,只是为了保证配置正确能正常返送。(Post a CONNECT message to the channel using the given retry options. This prevents the panicking that would occur if we were to set up a consumer and seek on a partition that hadn’t been written to yet. )这里也加上失败重连机制。
  3. setupParentConsumerForChannelsetupChannelConsumerForChannel构造sarama.PartitionConsumer,只消费指定partition的数据,这里消费的就是初始化时设置的channle.Partition(),实际上是const defaultPartition = 0。失败重试。
  4. processMessagesToBlocks真正的sarama.PartitionConsumer消费数据,处理数据。这个方法里有比较多的select-case,首先是处理case kafkaErr := <-chain.channelConsumer.Errors():,这里错误分两种,一种是sarama.ErrOffsetOutOfRange:,这类错误无法通过自动重试回复,则重新发送连接消息推进offset,go sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel);另一种错误则是可以通过自带的重试机制回复,这里有两个变量和case分支。1)case <-topicPartitionSubscriptionResumed:错误时添加监听器,成功重连后打印日志;2)case <-deliverSessionTimedOut:重连超时,则再次发送连接消息go sendConnectMessage(...)。这个select-case里交织着处理这两个变量分支的代码。

接下来是重要的两个case。第一个case <-chain.timer:这里是在当前channel设置的timer到期后,发送&ab.KafkaMessage_TimeToCut消息到kafka通知所有orderer,这里的参数为下一个block的number(chain.lastCutBlockNumber+1)。第二个case in, ok := <-chain.channelConsumer.Messages():里kafka收到的消息类型总共有三种。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
for {
select {
case in, ok := <-chain.channelConsumer.Messages():
switch msg.Type.(type) {
case *ab.KafkaMessage_Connect:
_ = chain.processConnect(chain.ChainID())
case *ab.KafkaMessage_TimeToCut:
if err := chain.processTimeToCut(msg.GetTimeToCut(), in.Offset); err != nil {...}
case *ab.KafkaMessage_Regular:
if err := chain.processRegular(msg.GetRegular(), in.Offset); err != nil {...}
}
case <-chain.timer:
if err := sendTimeToCut(chain.producer, chain.channel, chain.lastCutBlockNumber+1, &chain.timer); err != nil {...}
}
  1. case *ab.KafkaMessage_Connect:前面提到的在startThread启动时发送连接消息和遇到错误时尝试重联都回发送此类消息以推进offset。这里对这类消息不需要处理,仅仅作为记录。
  2. case *ab.KafkaMessage_TimeToCut:判断消息里要求cut的blocknum是否是当前节点本地block数据的下一个block,如果是的话,直接将当前汇集的batch切割,剩下的跟instantiate(8)里solo的流程一致,写入block。这里有所区别的是这里要提交kafka的metadata到block里,而这部分metadata写入了bw.lastBlock.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER里,也就是上面提到在重启chain时候从block的BlockMetadata.Metadata内取key为cb.BlockMetadataIndex_SIGNATURES的值来获取kafka消费消息。回顾对比solo方式,在调用chain.WriteBlock()时,传入的第二个参数为nil。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
     func (chain *chainImpl) processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, receivedOffset int64) error {
    ttcNumber := ttcMessage.GetBlockNumber()
    if ttcNumber == chain.lastCutBlockNumber+1 {
    chain.timer = nil
    batch := chain.BlockCutter().Cut()
    block := chain.CreateNextBlock(batch)
    metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
    LastOffsetPersisted: receivedOffset,
    LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
    })
    chain.WriteBlock(block, metadata)
    chain.lastCutBlockNumber++
    return nil
    } else if ttcNumber > chain.lastCutBlockNumber+1 {
    return fmt.Errorf(...)
    }
    return nil
    }

    func (bw *BlockWriter) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {
    bw.committingBlock.Lock()
    bw.lastBlock = block
    go func() {
    defer bw.committingBlock.Unlock()
    bw.commitBlock(encodedMetadataValue)
    }()
    }

    func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
    // Set the orderer-related metadata field
    if encodedMetadataValue != nil {
    bw.lastBlock.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
    }
    // 省略...
    }
  1. case *ab.KafkaMessage_Regular:处理消息(envelop)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {

    // When committing a normal message, we also update `lastOriginalOffsetProcessed` with `newOffset`. It is caller's responsibility to deduce correct value of `newOffset` based on following rules:
    // - if Resubmission is switched off, it should always be zero
    // - if the message is committed on first pass, meaning it's not re-validated and re-ordered, this value should be the same as current `lastOriginalOffsetProcessed`
    // - if the message is re-validated and re-ordered, this value should be the `OriginalOffset` of that Kafka message, so that `lastOriginalOffsetProcessed` is advanced
    // 其实,这个方法就是完成3件事情。
    // 1)如果不需要切割,并且如果chain.timer没有设置则重设timer(用以倒计时发送*ab.KafkaMessage_TimeToCut);
    // 2)切割,更新KafkaMetadata.LastOffsetPersisted为这个block最后envelope的offset;
    // 3)更新chain.lastOriginalOffsetProcessed,并且用作KafkaMetadata.LastOriginalOffsetProcessed。
    commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
    batches, pending := chain.BlockCutter().Ordered(message)

    if len(batches) == 0 {
    // If no block is cut, we update the `lastOriginalOffsetProcessed`, start the timer if necessary and return
    chain.lastOriginalOffsetProcessed = newOffset
    if chain.timer == nil {
    // configtx.yaml里的Orderer: &OrdererDefaults.BatchTimeout
    chain.timer = time.After(chain.SharedConfig().BatchTimeout())
    }
    return
    }

    chain.timer = nil

    offset := receivedOffset
    if pending || len(batches) == 2 {
    offset--
    } else {
    chain.lastOriginalOffsetProcessed = newOffset
    }

    // Commit the first block
    block := chain.CreateNextBlock(batches[0])
    metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
    // LastOffsetPersisted记录的是这个block的最后envelope的offset,也就是消费的最后offset
    LastOffsetPersisted: offset,
    // LastOriginalOffsetProcessed记录指的是截止该block,originalOffset小于这个值的所有message都已经被排序,即最新处理的originalOffset
    LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
    // LastResubmittedConfigOffset这个记录的是最近提交的重新校验排序的configMsg的offset
    LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
    })
    chain.WriteBlock(block, metadata)
    chain.lastCutBlockNumber++

    // Commit the second block if exists
    if len(batches) == 2 {...}
    }

    seq := chain.Sequence()
    env := &cb.Envelope{}

    // 这部分主要是为了兼容v1.1前的版本,前面的版本不支持re-submission
    // 这里的配置在configtx.yaml里,V1_1:true则`chain.SharedConfig().Capabilities().Resubmission()`返回为true。这里分析默认都使用V1.1后的版本
    //Capabilities:
    // Orderer: &OrdererCapabilities
    // V1_1: true
    if regularMessage.Class == ab.KafkaMessageRegular_UNKNOWN || !chain.SharedConfig().Capabilities().Resubmission() {...}

    switch regularMessage.Class {
    case ab.KafkaMessageRegular_NORMAL:
    // This is a message that is re-validated and re-ordered
    // 普通消息的OriginalOffset为0,因为configSeq改变被重新验证和加入重排序
    if regularMessage.OriginalOffset != 0 {
    // chain.lastOriginalOffsetProcessed记录着最近的处理的重排消息的offset,意味着小于这个的消息都已经处理过,故返回不再重复处理。
    // 从下面的`regularMessage.ConfigSeq < seq`可知,这是因为有多个orderer节点,每个节点都会往kafka重发消息,故可能会存在多条同样的消息(OriginalOffset相同但offset不同)
    if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
    return nil
    }
    // 未处理的重排消息按照正常流程往下处理
    }

    // The config sequence has advanced,当前的配置已经更新,因此需要重新验证,重发kafka进行re-order
    if regularMessage.ConfigSeq < seq {
    configSeq, err := chain.ProcessNormalMsg(env)
    if err != nil {
    return fmt.Errorf("discarding bad normal message because = %s", err)
    }

    // 新配置下重新校验通过
    // For both messages that are ordered for the first time or re-ordered, we set original offset to current received offset and re-order it.
    if err := chain.order(env, configSeq, receivedOffset); err != nil {...}
    return nil
    }

    // 下面的commitNormalMsg方法里可以看到用offset更新chain.lastOriginalOffsetProcessed,
    // 因此,offset或者保持原值chain.lastOriginalOffsetProcessed(当前消息不是re-order消息),或者采用当前消息的OriginalOffset(当前消息是re-order消息)
    offset := regularMessage.OriginalOffset
    if offset == 0 {
    offset = chain.lastOriginalOffsetProcessed
    }

    commitNormalMsg(env, offset)

    case ab.KafkaMessageRegular_CONFIG:
    // This is a message that is re-validated and re-ordered,同上
    if regularMessage.OriginalOffset != 0 {
    // 同上, normalMsg
    if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
    return nil
    }

    // lastResubmittedConfigOffset这个记录的是最近提交的被重新校验排序的configMsg的offset,如果消息的OriginalOffset等于该值,并且configSeq也相等,则说明已经本地已更新了配置,并且是最新的配置,可以关闭doneReprocessingMsgInFlight这个channel,继续消费(后面详述)
    if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset && // This is very last resubmitted config message
    regularMessage.ConfigSeq == seq { // AND we don't need to resubmit it again
    close(chain.doneReprocessingMsgInFlight) // Therefore, we could finally close the channel to unblock broadcast
    }

    // Somebody resubmitted message at offset X, whereas we didn't. This is due to non-determinism where
    // that message was considered invalid by us during revalidation, however somebody else deemed it to
    // be valid, and resubmitted it. We need to advance lastResubmittedConfigOffset in this case in order
    // to enforce consistency across the network.
    if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
    chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
    }
    }

    // The config sequence has advanced
    if regularMessage.ConfigSeq < seq {
    // ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
    // return the resulting config message and the configSeq the config was computed from. If the config impetus message
    // is invalid, an error is returned.
    // 在这个方法里,将envelop里指定的配置尝试作用于本地
    configEnv, configSeq, err := chain.ProcessConfigMsg(env)

    // 同上, normalMsg。For both messages that are ordered for the first time or re-ordered, we set original offset to current received offset and re-order it.
    if err := chain.configure(configEnv, configSeq, receivedOffset); err != nil {
    return fmt.Errorf("error re-submitting config message because = %s", err)
    }

    // 更新lastResubmittedConfigOffset,最新的重新提交configMsg的offset
    chain.lastResubmittedConfigOffset = receivedOffset // Keep track of last resubmitted message offset
    chain.doneReprocessingMsgInFlight = make(chan struct{}) // Create the channel to block ingress messages

    return nil
    }

    // 同上,normalMsg
    offset := regularMessage.OriginalOffset
    if offset == 0 {
    offset = chain.lastOriginalOffsetProcessed
    }

    // 同上,commitNormalMsg
    commitConfigMsg(env, offset)
    return nil
    }

    回顾instantiate(8)里orderer/common/broadcast/broadcast.go#Handle提到的,orderer的broacast服务调用processor.WaitReady(), 这里select-case case <-chain.doneReprocessingMsgInFlight:,也就是说,在configMsg进行re-order时,不再对外提供服务接收处理新数据,直到最新的配置已更新,关闭doneReprocessingMsgInFlight这个channel。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    // Handle starts a service thread for a given gRPC connection and services the broadcast connection
    func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
    for {
    msg, err := srv.Recv()
    // ...
    if err = processor.WaitReady(); err != nil {
    return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
    }

    if !isConfig {
    configSeq, err := processor.ProcessNormalMsg(msg)
    err = processor.Order(msg, configSeq)
    } else { // isConfig
    config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
    err = processor.Configure(config, configSeq)
    }
    err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
    }
    }

    func (chain *chainImpl) WaitReady() error {
    select {
    case <-chain.startChan: // The Start phase has completed
    select {
    case <-chain.haltChan: // The chain has been halted, stop here
    return error
    // Block waiting for all re-submitted messages to be reprocessed
    case <-chain.doneReprocessingMsgInFlight:
    return nil
    }
    }
    }

总结来说,fabric的共识算法需要解决两个问题,一是交易消息的有序性,二是恶意节点的拜占庭问题。当前提供的两种共识机制单节点solo和多节点kafka。kafka在fabric的应用中,始终使用了单个partition,这样削弱了kafka本身提供的多分区带来的高性能。这样的考虑处于要最大程度保证交易排序和最后执行的有序性,虽然在v1.1版本后提供的可以re-validate和re-order特性在一定程度上违背了这种强有序性,但是在fabric里共识更重要的是全局一致性,即关键的是block是有序的,而且是一致的,而不在乎顺序是怎样的。但是最重要的是,kafka本身的共识算法并不能解决拜占庭问题,无法容忍网络里恶意节点的存在。这个通过fabric本身的准入审核,签名和策略等机制可以一定程度上预防这个问题。对于拜占庭问题,相对成熟的算法是pbft。在fabric的v0.6版本提供pbft共识机制,v1.0后采用了分割出orderer后,目前仍未提供pbft的相关实现。这个值得后续持续关注。

Hyperledger Fabric Ordering Service
HyperLeger Fabric - Bringing up a Kafka-based Ordering Service
HyperLeger Fabric - Introduction - Consensus
Hyperledger Fabric Model - Consensus
A Kafka-based Ordering Service for Fabric : 详细介绍fabri orderer应用kafka的设计思想演进过程