peer端的gossip和deliver是紧密结合的。在fabric的结构中,存在leader peer节点,负责与orderer交互,通过deliver服务从orderer得到block。然后通过gossip服务扩散到其他的普通peer,从而达到全网同步。这里先介绍deliver服务。
从前文orderer的deliver过程可知,deliver的grpc服务是”/orderer.AtomicBroadcast/Deliver”,在源码中搜索并且一路回溯到方法peer/node/start.go#serve
,这个方法是执行命令peer node start
时的执行。(这里先介绍的当前peer重启的情况,新建立channel的情况后面介绍)。然后执行到core/peer/peer.go:Initialize
,这里面执行ledgermgmt.Initialize(ConfigTxProcessors)
恢复历史账本。然后从这些历史账本信息可以获取到ledgerId(即channelId)。然后对每个channel加载账本,并且从账本中获取最新的ConfigBlock,然后调用createChain
重新构造channel信息。在core/peer/peer.go:createChain
这个方法里,也是从ledger里获取到配置信息,从而得到orderer地址ordererAddresses := bundle.ChannelConfig().OrdererAddresses()
,然后调用service.GetGossipService().InitializeChannel
。在gossip/service/gossip_service.go:InitializeChannel
里,启动deliver服务需要经过判断是否节点是leader peer,即peer.gossip.useLeaderElection
(通过选举的方式动态决定leader peer)和peer.gossip.orgLeader
(静态指定当前peer为leader peer),这两种方式是互斥的。前面提到过,只有leader才与orderer进行交互,如果不是,则返回,无需继续往下启动deliver服务。
1 | // Parameters: |
假设当前节点是通过静态指定为leader,则进入创建deliver服务g.deliveryService[chainID].StartDeliverForChannel
.在这个方法里主要分两部。
调用
client := d.newClient(chainID, ledgerInfo)
构造client。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
requester := &blocksRequester{
tls: viper.GetBool("peer.tls.enabled"),
chainID: chainID,
}
broadcastSetup := func(bd blocksprovider.BlocksDeliverer) error {
return requester.RequestBlocks(ledgerInfoProvider)
}
connProd := comm.NewConnectionProducer(d.conf.ConnFactory(chainID), d.conf.Endpoints)
bClient := NewBroadcastClient(connProd, d.conf.ABCFactory, broadcastSetup, backoffPolicy)
requester.client = bClient
return bClient
}
func NewBroadcastClient(prod comm.ConnectionProducer, clFactory clientFactory, onConnect broadcastSetup, bos retryPolicy) *broadcastClient {
return &broadcastClient{prod: prod, onConnect: onConnect, shouldRetry: bos, createClient: clFactory, stopChan: make(chan struct{}, 1)}
}构建
blocksProviderImpl
,并与channel对应存储d.blockProviders[chainID] = blocksprovider.NewBlocksProvider
.然后调用go d.launchBlockProvider(chainID, finalizer)
,进而调用了core/deliverservice/blocksprovider/blocksprovider.go#DeliverBlocks
。在这里调用core/deliverservice/client.go#Recv
方法,一路调用直到调用client.go#connect
。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27func (bc *broadcastClient) doAction(action func() (interface{}, error), actionOnNewConnection func()) (interface{}, error) {
if bc.conn == nil {
err := bc.connect()
actionOnNewConnection()
}
resp, err := action()
if err != nil {
bc.Disconnect(false)
return nil, err
}
return resp, nil
}
func (bc *broadcastClient) connect() error {
// ...
abc, err := bc.createClient(conn).Deliver(ctx)
err = bc.afterConnect(conn, abc, cf, endpoint)
// ...
}
func (bc *broadcastClient) afterConnect(conn *grpc.ClientConn, abc orderer.AtomicBroadcast_DeliverClient, cf context.CancelFunc, endpoint string) error {
// ...
bc.BlocksDeliverer = abc
err := bc.onConnect(bc)
// ...
}
这里先构建grpc client
abc, err := bc.createClient(conn).Deliver(ctx)
.这里的createClient
即上面构造broadcastClient
时传入的参数d.conf.ABCFactory
。这个参数可以回溯到gossip/service/gossip_service.go#Service
构造时(这个方法在前面提及的InitializeChannel
里被调用),指定ABCFactory: deliverclient.DefaultABCFactory
。可见,这里对应的grpc方法即/orderer.AtomicBroadcast/Deliver
.1
2
3
4
5
6
7
8
9
10
11
12
13
14func DefaultABCFactory(conn *grpc.ClientConn) orderer.AtomicBroadcastClient {
return orderer.NewAtomicBroadcastClient(conn)
}
func NewAtomicBroadcastClient(cc *grpc.ClientConn) AtomicBroadcastClient {
return &atomicBroadcastClient{cc}
}
func (c *atomicBroadcastClient) Deliver(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_DeliverClient, error) {
stream, err := grpc.NewClientStream(ctx, &_AtomicBroadcast_serviceDesc.Streams[1], c.cc, "/orderer.AtomicBroadcast/Deliver", opts...)
x := &atomicBroadcastDeliverClient{stream}
return x, nil
}然后在
connect
方法里设置bc.BlocksDeliverer = abc
,并且调用err = bc.afterConnect(conn, abc, cf, endpoint)
,进而调用了err := bc.onConnect(bc)
.在上面的broadcastClient
构造时传入参数onConnect
为broadcastSetup
,即执行requester.RequestBlocks(ledgerInfoProvider)
。这里实际上是向orderer请求block,header为common.HeaderType_DELIVER_SEEK_INFO
,同时分为两种情况,新的channel从oldest开始消费同步,而重启的channel则从指定的block开始(ledgerInfoProvider.LedgerHeight()
)。以上为start
值,stop
选择math.MaxUint64,Behavior
选择orderer.SeekInfo_BLOCK_UNTIL_READY
.回顾前文,这里表明是一直同步。这里发出的Envelope也就是orderer接收。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34func (b *blocksRequester) RequestBlocks(ledgerInfoProvider blocksprovider.LedgerInfo) error {
height, err := ledgerInfoProvider.LedgerHeight()
if height > 0 {
if err := b.seekLatestFromCommitter(height); err != nil{ return err }
} else {
if err := b.seekOldest(); err != nil { return err}
}
return nil
}
func (b *blocksRequester) seekOldest() error {
seekInfo := &orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}
//TODO- epoch and msgVersion may need to be obtained for nowfollowing usage in orderer/configupdate/configupdate.go
msgVersion := int32(0)
epoch := uint64(0)
tlsCertHash := b.getTLSCertHash()
env, err := utils.CreateSignedEnvelopeWithTLSBinding(common.HeaderType_DELIVER_SEEK_INFO, b.chainID, localmsp.NewSigner(), seekInfo, msgVersion, epoch, tlsCertHash)
return b.client.Send(env)
}
func (b *blocksRequester) seekLatestFromCommitter(height uint64) error {
seekInfo := &orderer.SeekInfo{
Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
}
// ...
}
- 接下来转向消息的接收。真正处理
DeliverResponse
的地方在blocksprovider.go#DeliverBlocks
,这里for循环判断!b.isDone()
则使用msg, err := b.client.Recv()
接收信息。core/deliverservice/client.go#Recv
这个方法里,使用try
方法,实际上就是重试机制,当失败时,并且没有达到重试阈值时(前文构造client时传入的参数backoffPolicy
)则重新执行,这里包括先判断connection是否为nil,如果是则调用前面分析过的connect
方法,然后调用action
,也就是b.client.Recv()
传入的方法参数,实际上是执行bc.BlocksDeliverer.Recv()
。这里注意,在前面分析的afterConnect
方法里,已经设置bc.BlocksDeliverer=abc
(即orderer.AtomicBroadcast_DeliverClient
),所以这里就是从orderer的deliver grpc stream里读取,然后逐级返回到blocksprovider.go#DeliverBlocks
。处理接收到的block这部分与gossip相关,留待后续再一起分析。