As mentioned in instantiate(8), the orderer supports three consensus modes in total: solo, kafka and etcdraft, and that post covered the single-orderer solo mode. Here we continue with the kafka mode used when there are multiple orderer nodes. The Kafka implementation lives in orderer/consensus/kafka/chain.go and, like solo, implements the orderer/common/broadcast/broadcast.go#Consenter interface. When the orderer handles messages in orderer/common/broadcast/broadcast.go#Handle, it calls the chain's Order method.
func (chain *chainImpl) Order(env *cb.Envelope, configSeq uint64) error {
Order first builds the message entity orderer/consensus/kafka/chain.go:KafkaMessage from the incoming cb.Envelope, the current config sequence number configSeq, and a fixed originalOffset of 0 (the meaning of this field is discussed later: it indicates whether the message is being resubmitted for re-ordering, and 0 means it is a brand-new message). It then wraps this into a sarama.ProducerMessage; all Kafka interaction goes through the third-party library Shopify/sarama. The producer message specifies the topic, builds its key from the partition number, and carries the just-built message as the payload. As shown later, both topic and partition are configuration parameters of the channel (set at initialization or by later updates). Note that every Kafka message uses the same key, which means all messages end up in the same partition according to the key-based partitioning algorithm, namely channel.partition(). This guarantees the ordering of the envelopes emitted by a single orderer, but globally only one partition is ever used, so the throughput that Kafka's multiple partitions could provide is not exploited. Finally, the message is sent with a sarama.SyncProducer via chain.producer.SendMessage.
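To make this concrete, below is a minimal, self-contained sketch (not the Fabric code itself) of how an opaque payload can be wrapped into a sarama.ProducerMessage keyed by a fixed partition and sent through a sarama.SyncProducer; the broker address, topic name and payload are placeholders.

package main

import (
	"fmt"
	"strconv"

	"github.com/Shopify/sarama"
)

func main() {
	const topic = "mychannel"  // in Fabric the topic is the channel name
	const partition int32 = 0  // defaultPartition: every envelope goes to partition 0

	cfg := sarama.NewConfig()
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Return.Successes = true // required by SyncProducer

	producer, err := sarama.NewSyncProducer([]string{"127.0.0.1:9092"}, cfg) // placeholder broker
	if err != nil {
		panic(err)
	}
	defer producer.Close()

	payload := []byte("a marshaled KafkaMessage would go here") // placeholder payload

	msg := &sarama.ProducerMessage{
		Topic: topic,
		// A constant key means hash partitioning always picks the same partition,
		// preserving a single total order of envelopes for the channel.
		Key:   sarama.StringEncoder(strconv.Itoa(int(partition))),
		Value: sarama.ByteEncoder(payload),
	}

	p, offset, err := producer.SendMessage(msg)
	if err != nil {
		panic(err)
	}
	fmt.Printf("delivered to partition %d at offset %d\n", p, offset)
}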
Next, let us look at how the sarama.SyncProducer is initialized and how the corresponding Kafka consumer consumes messages. Kafka initialization happens in orderer/consensus/kafka/chain.go#startThread. Tracing back from there to chainsupport.go#start, it is invoked from two places: first, registrar.go#NewRegistrar, which rebuilds the existingChains recorded in the ledgerFactory; second, registrar.go#newChain, which creates a new chain when a received message has payload.Header.ChannelHeader.Type equal to cb.HeaderType_ORDERER_TRANSACTION. We take creating a new chain (channel) as the example. newChain calls newChainSupport, which initializes the Chain field of the chain support via cs.Chain, err = consenter.HandleChain(cs, metadata). Here consenter is the consensus.go#Consenter interface, whose implementation in this case is orderer/consensus/kafka/consenter.go:consenterImpl.
func newChainSupport(
- From the previous block, blockledger.GetBlock(ledgerResources, ledgerResources.Height()-1), the orderer reads the consensus metadata, i.e. the value stored in BlockMetadata.Metadata under the key cb.BlockMetadataIndex_ORDERER (the other metadata indices were covered in instantiate(8)). As getOffsets shows, if the channel already exists and has consumed data before, for example when recovering after a restart, the values recorded in that metadata are used. For a newly created channel the defaults apply: LastOffsetPersisted, LastOriginalOffsetProcessed and LastResubmittedConfigOffset are set to sarama.OffsetOldest - 1, 0 and 0 respectively; the meaning of these parameters is analyzed later, and a sketch of this logic follows the channel.go snippet below.
- It then calls chain.go#newChain, which mainly constructs the orderer/consensus/kafka/chain.go:chainImpl entity. The field to note is chainImpl.channel, initialized as newChannel(support.ChainID(), defaultPartition): the Kafka topic is support.ChainID() (i.e. the channel name) and the partition is the constant 0. These are exactly the channel topic and partition mentioned earlier.
// channel.go
const defaultPartition = 0
// Returns a new channel for a given topic name and partition number.
func newChannel(topic string, partition int32) channel {
return &channelImpl{
tpc: fmt.Sprintf("%s", topic),
prt: partition,
}
}
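Picking up the first bullet above, here is a runnable sketch of the getOffsets decision. The kafkaMetadata struct below is a local stand-in for the ab.KafkaMetadata protobuf message (which the real code unmarshals from the last block's orderer metadata), introduced only to keep the example self-contained.

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

// kafkaMetadata is a local stand-in for the relevant fields of ab.KafkaMetadata.
type kafkaMetadata struct {
	LastOffsetPersisted         int64
	LastOriginalOffsetProcessed int64
	LastResubmittedConfigOffset int64
}

// getOffsets mirrors the decision described above: reuse the offsets recorded in
// the ledger if the channel has consumed data before, otherwise fall back to the
// defaults for a brand-new channel.
func getOffsets(md *kafkaMetadata) (persisted, processed, resubmitted int64) {
	if md != nil {
		// Existing channel (e.g. recovery after an orderer restart).
		return md.LastOffsetPersisted, md.LastOriginalOffsetProcessed, md.LastResubmittedConfigOffset
	}
	// New channel: nothing has been consumed yet.
	return sarama.OffsetOldest - 1, 0, 0
}

func main() {
	p, o, r := getOffsets(nil)
	fmt.Println(p, o, r) // -3 0 0 (sarama.OffsetOldest is -2)
}

The default sarama.OffsetOldest - 1 is chosen so that the consumer, which later seeks to lastOffsetPersisted + 1, starts at sarama.OffsetOldest, i.e. the very beginning of the partition.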
Now let's walk through chain.go#startThread in more detail.
// Called by Start().
func startThread(chain *chainImpl) {
- setupProducerForChannel tries to create the Kafka producer (a sarama.SyncProducer) with the configured parameters (taken from orderer.yaml), with a retry-on-failure mechanism.
- sendConnectMessage uses the freshly created producer to post an ab.KafkaMessage_Connect message whose payload is nil, simply to verify that the configuration is correct and messages can actually be sent. ("Post a CONNECT message to the channel using the given retry options. This prevents the panicking that would occur if we were to set up a consumer and seek on a partition that hadn't been written to yet.") This step also retries on failure.
- setupParentConsumerForChannel and setupChannelConsumerForChannel construct a sarama.PartitionConsumer that consumes only the specified partition, namely the channel.partition() set during initialization, which is in fact const defaultPartition = 0. Again with retry on failure. A sketch of this consumer setup follows the list.
- processMessagesToBlocks is where the sarama.PartitionConsumer actually consumes and processes data. The method contains a fairly large select-case. The first case handled is case kafkaErr := <-chain.channelConsumer.Errors():, which distinguishes two kinds of errors. A sarama.ErrOffsetOutOfRange error cannot be recovered by automatic retries, so a new connect message is sent to advance the offset: go sendConnectMessage(chain.consenter.retryOptions(), chain.haltChan, chain.producer, chain.channel). Other errors can be recovered by sarama's built-in retry mechanism; two auxiliary channels and case branches are involved: 1) case <-topicPartitionSubscriptionResumed:, a listener registered when the error occurs, which logs once the subscription is successfully resumed; 2) case <-deliverSessionTimedOut:, which fires if reconnection times out and triggers another go sendConnectMessage(...). The code handling these two channels is interleaved throughout this select-case.
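For reference, a minimal, self-contained sketch of the consumer side: a parent sarama.Consumer plus a sarama.PartitionConsumer pinned to the single partition, seeking to the offset right after the last one persisted. Broker address, topic and offsets are placeholders, and the retry logic is omitted.

package main

import (
	"fmt"

	"github.com/Shopify/sarama"
)

func main() {
	const (
		topic               = "mychannel" // channel name
		partition           = int32(0)    // defaultPartition
		lastOffsetPersisted = int64(-3)   // sarama.OffsetOldest - 1 for a new channel
	)

	cfg := sarama.NewConfig()
	cfg.Consumer.Return.Errors = true // so the Errors() channel delivers consumer errors

	// "Parent" consumer: owns the broker connections.
	parentConsumer, err := sarama.NewConsumer([]string{"127.0.0.1:9092"}, cfg) // placeholder broker
	if err != nil {
		panic(err)
	}
	defer parentConsumer.Close()

	// "Channel" consumer: a PartitionConsumer pinned to the single partition,
	// seeking to the offset right after the last one persisted in the ledger.
	channelConsumer, err := parentConsumer.ConsumePartition(topic, partition, lastOffsetPersisted+1)
	if err != nil {
		panic(err)
	}
	defer channelConsumer.Close()

	// processMessagesToBlocks-style loop (heavily simplified).
	for {
		select {
		case kafkaErr := <-channelConsumer.Errors():
			fmt.Println("consumer error:", kafkaErr) // e.g. sarama.ErrOffsetOutOfRange
		case in := <-channelConsumer.Messages():
			fmt.Printf("offset %d: %d bytes\n", in.Offset, len(in.Value))
		}
	}
}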
Next come the two important cases. The first, case <-chain.timer:, fires when the timer set for this channel expires and sends an &ab.KafkaMessage_TimeToCut message to Kafka to notify all orderers; its parameter is the number of the next block (chain.lastCutBlockNumber+1). The second, case in, ok := <-chain.channelConsumer.Messages():, handles the messages received from Kafka, which come in three types in total; a sketch of the three wrapper messages follows.
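The three wrappers are protobuf messages from the ab (protos/orderer) package. Below is a hedged sketch of how each one is constructed, using the type and field names referenced in this post; the actual helper functions in chain.go may differ in detail.

package kafkasketch

import (
	ab "github.com/hyperledger/fabric/protos/orderer"
)

// newConnectMessage builds the CONNECT wrapper posted at startup and on reconnects.
func newConnectMessage() *ab.KafkaMessage {
	return &ab.KafkaMessage{
		Type: &ab.KafkaMessage_Connect{
			Connect: &ab.KafkaMessageConnect{Payload: nil}, // nil payload: only advances the offset
		},
	}
}

// newTimeToCutMessage builds the TIMETOCUT wrapper sent when the batch timer expires.
func newTimeToCutMessage(blockNumber uint64) *ab.KafkaMessage {
	return &ab.KafkaMessage{
		Type: &ab.KafkaMessage_TimeToCut{
			TimeToCut: &ab.KafkaMessageTimeToCut{BlockNumber: blockNumber}, // chain.lastCutBlockNumber + 1
		},
	}
}

// newNormalMessage builds the REGULAR wrapper that carries an ordinary envelope.
func newNormalMessage(payload []byte, configSeq uint64, originalOffset int64) *ab.KafkaMessage {
	return &ab.KafkaMessage{
		Type: &ab.KafkaMessage_Regular{
			Regular: &ab.KafkaMessageRegular{
				Payload:        payload,        // marshaled cb.Envelope
				ConfigSeq:      configSeq,      // config sequence the envelope was validated against
				OriginalOffset: originalOffset, // 0 on first pass, the original Kafka offset when re-ordered
				Class:          ab.KafkaMessageRegular_NORMAL,
			},
		},
	}
}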
for { // processMessagesToBlocks: main message-processing loop (abridged)
case *ab.KafkaMessage_Connect: as mentioned above, connect messages are sent when startThread starts up and whenever reconnection is attempted after an error, purely to advance the offset. No processing is needed here; they are only logged.
case *ab.KafkaMessage_TimeToCut: the handler checks whether the block number requested to be cut is the next one after the node's local block data. If so, the currently accumulated batch is cut and, just like the solo flow in instantiate(8), written out as a block. The difference is that the Kafka metadata is committed into the block as well: it is written to bw.lastBlock.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER], which is exactly the value read back from BlockMetadata.Metadata under the key cb.BlockMetadataIndex_ORDERER when the chain is restarted, as described above, to recover the Kafka consumption position. By contrast, in the solo mode the second argument passed to chain.WriteBlock() is nil.
func (chain *chainImpl) processTimeToCut(ttcMessage *ab.KafkaMessageTimeToCut, receivedOffset int64) error {
ttcNumber := ttcMessage.GetBlockNumber()
if ttcNumber == chain.lastCutBlockNumber+1 {
chain.timer = nil
batch := chain.BlockCutter().Cut()
block := chain.CreateNextBlock(batch)
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
LastOffsetPersisted: receivedOffset,
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
})
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
return nil
} else if ttcNumber > chain.lastCutBlockNumber+1 {
return fmt.Errorf(...)
}
return nil
}
func (bw *BlockWriter) WriteBlock(block *cb.Block, encodedMetadataValue []byte) {
bw.committingBlock.Lock()
bw.lastBlock = block
go func() {
defer bw.committingBlock.Unlock()
bw.commitBlock(encodedMetadataValue)
}()
}
func (bw *BlockWriter) commitBlock(encodedMetadataValue []byte) {
// Set the orderer-related metadata field
if encodedMetadataValue != nil {
bw.lastBlock.Metadata.Metadata[cb.BlockMetadataIndex_ORDERER] = utils.MarshalOrPanic(&cb.Metadata{Value: encodedMetadataValue})
}
// omitted ...
}
case *ab.KafkaMessage_Regular: processes the actual message (envelope), as shown in processRegular below.
func (chain *chainImpl) processRegular(regularMessage *ab.KafkaMessageRegular, receivedOffset int64) error {
// When committing a normal message, we also update `lastOriginalOffsetProcessed` with `newOffset`. It is caller's responsibility to deduce correct value of `newOffset` based on following rules:
// - if Resubmission is switched off, it should always be zero
// - if the message is committed on first pass, meaning it's not re-validated and re-ordered, this value should be the same as current `lastOriginalOffsetProcessed`
// - if the message is re-validated and re-ordered, this value should be the `OriginalOffset` of that Kafka message, so that `lastOriginalOffsetProcessed` is advanced
// In essence, this helper does three things:
// 1) if no block needs to be cut and chain.timer is not set, (re)start the timer (which counts down to sending *ab.KafkaMessage_TimeToCut);
// 2) cut the block and update KafkaMetadata.LastOffsetPersisted to the offset of the last envelope in that block;
// 3) update chain.lastOriginalOffsetProcessed and record it as KafkaMetadata.LastOriginalOffsetProcessed.
commitNormalMsg := func(message *cb.Envelope, newOffset int64) {
batches, pending := chain.BlockCutter().Ordered(message)
if len(batches) == 0 {
// If no block is cut, we update the `lastOriginalOffsetProcessed`, start the timer if necessary and return
chain.lastOriginalOffsetProcessed = newOffset
if chain.timer == nil {
// BatchTimeout comes from configtx.yaml (Orderer: &OrdererDefaults -> BatchTimeout)
chain.timer = time.After(chain.SharedConfig().BatchTimeout())
}
return
}
chain.timer = nil
offset := receivedOffset
if pending || len(batches) == 2 {
offset--
} else {
chain.lastOriginalOffsetProcessed = newOffset
}
// Commit the first block
block := chain.CreateNextBlock(batches[0])
metadata := utils.MarshalOrPanic(&ab.KafkaMetadata{
// LastOffsetPersisted records the offset of the last envelope in this block, i.e. the last consumed offset.
LastOffsetPersisted: offset,
// LastOriginalOffsetProcessed: as of this block, every message with an originalOffset up to this value has already been ordered; it is the most recently processed originalOffset.
LastOriginalOffsetProcessed: chain.lastOriginalOffsetProcessed,
// LastResubmittedConfigOffset records the offset of the most recently resubmitted (re-validated and re-ordered) config message.
LastResubmittedConfigOffset: chain.lastResubmittedConfigOffset,
})
chain.WriteBlock(block, metadata)
chain.lastCutBlockNumber++
// Commit the second block if exists
if len(batches) == 2 {...}
}
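// Worked example of the offset bookkeeping above (illustrative numbers): suppose
// receivedOffset = 10 and the new envelope causes the cutter to emit one batch while
// keeping the new envelope pending (pending == true). The cut block then contains
// envelopes only up to Kafka offset 9, so its LastOffsetPersisted is receivedOffset-1 = 9;
// offset 10 is persisted by a later block. If instead the envelope is included in the
// single cut batch, LastOffsetPersisted is receivedOffset = 10 and
// lastOriginalOffsetProcessed is advanced to newOffset.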
seq := chain.Sequence()
env := &cb.Envelope{}
// This branch mainly keeps compatibility with pre-v1.1 versions, which do not support re-submission.
// The capability is set in configtx.yaml: with V1_1: true, `chain.SharedConfig().Capabilities().Resubmission()` returns true. The analysis below assumes v1.1+ behavior.
//Capabilities:
// Orderer: &OrdererCapabilities
// V1_1: true
if regularMessage.Class == ab.KafkaMessageRegular_UNKNOWN || !chain.SharedConfig().Capabilities().Resubmission() {...}
switch regularMessage.Class {
case ab.KafkaMessageRegular_NORMAL:
// This is a message that is re-validated and re-ordered
// A normal message carries OriginalOffset 0; it is non-zero only when the message has been re-validated and re-ordered because the configSeq changed.
if regularMessage.OriginalOffset != 0 {
// chain.lastOriginalOffsetProcessed holds the offset of the most recently processed re-ordered message; anything with an OriginalOffset up to that value has already been handled, so return without processing it again.
// As the `regularMessage.ConfigSeq < seq` check below suggests, duplicates arise because every orderer node resubmits the message to Kafka, so several copies of the same message may exist (same OriginalOffset, different offsets).
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
return nil
}
// A re-ordered message that has not been processed yet continues through the normal flow.
}
// The config sequence has advanced: the current configuration has been updated, so the message must be re-validated and resubmitted to Kafka for re-ordering.
if regularMessage.ConfigSeq < seq {
configSeq, err := chain.ProcessNormalMsg(env)
if err != nil {
return fmt.Errorf("discarding bad normal message because = %s", err)
}
// Re-validation under the new configuration succeeded.
// For both messages that are ordered for the first time or re-ordered, we set original offset to current received offset and re-order it.
if err := chain.order(env, configSeq, receivedOffset); err != nil {...}
return nil
}
// commitNormalMsg (above) updates chain.lastOriginalOffsetProcessed with this offset,
// so offset either keeps the current chain.lastOriginalOffsetProcessed (this message is not a re-ordered one) or takes the message's OriginalOffset (this message is a re-ordered one).
offset := regularMessage.OriginalOffset
if offset == 0 {
offset = chain.lastOriginalOffsetProcessed
}
commitNormalMsg(env, offset)
case ab.KafkaMessageRegular_CONFIG:
// This is a message that is re-validated and re-ordered, as above.
if regularMessage.OriginalOffset != 0 {
// Same as for a normal message above.
if regularMessage.OriginalOffset <= chain.lastOriginalOffsetProcessed {
return nil
}
// lastResubmittedConfigOffset records the offset of the most recently resubmitted (re-validated and re-ordered) config message. If this message's OriginalOffset equals that value and its ConfigSeq equals the current sequence, the local configuration has already been updated to the latest one, so the doneReprocessingMsgInFlight channel can be closed and ingress can resume (details below).
if regularMessage.OriginalOffset == chain.lastResubmittedConfigOffset && // This is very last resubmitted config message
regularMessage.ConfigSeq == seq { // AND we don't need to resubmit it again
close(chain.doneReprocessingMsgInFlight) // Therefore, we could finally close the channel to unblock broadcast
}
// Somebody resubmitted message at offset X, whereas we didn't. This is due to non-determinism where
// that message was considered invalid by us during revalidation, however somebody else deemed it to
// be valid, and resubmitted it. We need to advance lastResubmittedConfigOffset in this case in order
// to enforce consistency across the network.
if chain.lastResubmittedConfigOffset < regularMessage.OriginalOffset {
chain.lastResubmittedConfigOffset = regularMessage.OriginalOffset
}
}
// The config sequence has advanced
if regularMessage.ConfigSeq < seq {
// ProcessConfigUpdateMsg will attempt to apply the config impetus msg to the current configuration, and if successful
// return the resulting config message and the configSeq the config was computed from. If the config impetus message
// is invalid, an error is returned.
// This call attempts to apply the configuration carried in the envelope locally.
configEnv, configSeq, err := chain.ProcessConfigMsg(env)
// Same as for a normal message: for messages ordered for the first time as well as re-ordered ones, we set the original offset to the currently received offset and re-order.
if err := chain.configure(configEnv, configSeq, receivedOffset); err != nil {
return fmt.Errorf("error re-submitting config message because = %s", err)
}
// Update lastResubmittedConfigOffset to the offset of the config message that was just resubmitted.
chain.lastResubmittedConfigOffset = receivedOffset // Keep track of last resubmitted message offset
chain.doneReprocessingMsgInFlight = make(chan struct{}) // Create the channel to block ingress messages
return nil
}
// Same as for a normal message above.
offset := regularMessage.OriginalOffset
if offset == 0 {
offset = chain.lastOriginalOffsetProcessed
}
// Analogous to commitNormalMsg above.
commitConfigMsg(env, offset)
return nil
}
Recall orderer/common/broadcast/broadcast.go#Handle from instantiate(8): the orderer's broadcast service calls processor.WaitReady(), which selects on case <-chain.doneReprocessingMsgInFlight:. In other words, while a config message is being re-ordered the orderer stops accepting and processing new incoming data, until the latest configuration has been applied and the doneReprocessingMsgInFlight channel is closed.
// Handle starts a service thread for a given gRPC connection and services the broadcast connection
func (bh *handlerImpl) Handle(srv ab.AtomicBroadcast_BroadcastServer) error {
for {
msg, err := srv.Recv()
// ...
if err = processor.WaitReady(); err != nil {
return srv.Send(&ab.BroadcastResponse{Status: cb.Status_SERVICE_UNAVAILABLE, Info: err.Error()})
}
if !isConfig {
configSeq, err := processor.ProcessNormalMsg(msg)
err = processor.Order(msg, configSeq)
} else { // isConfig
config, configSeq, err := processor.ProcessConfigUpdateMsg(msg)
err = processor.Configure(config, configSeq)
}
err = srv.Send(&ab.BroadcastResponse{Status: cb.Status_SUCCESS})
}
}
func (chain *chainImpl) WaitReady() error {
select {
case <-chain.startChan: // The Start phase has completed
select {
case <-chain.haltChan: // The chain has been halted, stop here
return fmt.Errorf("consenter for this channel has been halted")
// Block waiting for all re-submitted messages to be reprocessed
case <-chain.doneReprocessingMsgInFlight:
return nil
}
}
}
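The gating relies on a plain Go channel-closing idiom. Here is a toy, self-contained illustration of the doneReprocessingMsgInFlight pattern (not Fabric code): callers block on the channel until it is closed, at which point all of them are released at once.

package main

import (
	"fmt"
	"time"
)

func main() {
	doneReprocessingMsgInFlight := make(chan struct{})

	// waitReady mimics WaitReady: block until reprocessing of the resubmitted
	// config message has finished.
	waitReady := func(id int) {
		<-doneReprocessingMsgInFlight
		fmt.Printf("broadcast request %d unblocked\n", id)
	}

	for i := 1; i <= 3; i++ {
		go waitReady(i)
	}

	time.Sleep(100 * time.Millisecond) // config message being re-validated / re-ordered
	close(doneReprocessingMsgInFlight) // latest config applied: unblock ingress
	time.Sleep(100 * time.Millisecond) // let the goroutines print
}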
To sum up, Fabric's consensus has to solve two problems: the ordering of transaction messages, and the Byzantine problem of malicious nodes. The two mechanisms analyzed here are the single-node solo mode and the multi-node Kafka mode. In Fabric's use of Kafka a single partition is always used, which gives up the throughput that Kafka's multiple partitions could provide. The rationale is to guarantee the ordering of transactions and their final execution as strictly as possible. The re-validate and re-order features introduced after v1.1 relax this strict ordering to some extent, but what matters most for consensus in Fabric is global consistency: the key point is that blocks are ordered and identical across the network, not what the particular order is. Most importantly, however, Kafka's own consensus cannot solve the Byzantine problem and cannot tolerate malicious nodes in the network. Fabric's admission control, signatures and policies mitigate this to some degree. For Byzantine fault tolerance the relatively mature algorithm is PBFT: Fabric v0.6 shipped a PBFT consensus mechanism, but since the orderer was split out in v1.0 no PBFT implementation has been provided yet. This is worth keeping an eye on.
Hyperledger Fabric Ordering Service
HyperLeger Fabric - Bringing up a Kafka-based Ordering Service
HyperLeger Fabric - Introduction - Consensus
Hyperledger Fabric Model - Consensus
A Kafka-based Ordering Service for Fabric: a detailed account of how the design of Fabric's Kafka-based ordering service evolved