Fabric 1.4 Source Code Analysis - The peer's deliver Process (1): Interaction Between peer and orderer

On the peer side, gossip and deliver are tightly coupled. In Fabric's architecture there is a leader peer, responsible for interacting with the orderer: it obtains blocks from the orderer through the deliver service and then disseminates them to the other, ordinary peers via the gossip service, so that the whole network stays in sync. This article covers the deliver service first.

As we saw in the earlier article on the orderer's deliver process, the deliver gRPC service is "/orderer.AtomicBroadcast/Deliver". Searching for it in the source and tracing backwards leads to peer/node/start.go#serve, which runs when the command peer node start is executed. (This article covers the case of an existing peer restarting; the case of a newly created channel is covered later.)

Execution then reaches core/peer/peer.go#Initialize, which calls ledgermgmt.Initialize(ConfigTxProcessors) to recover the historical ledgers. From these ledgers the peer obtains each ledgerId (i.e. channelId). For every channel it loads the ledger, retrieves the latest config block from it, and calls createChain to rebuild the channel state. In core/peer/peer.go#createChain, the configuration is again read from the ledger, yielding the orderer addresses via ordererAddresses := bundle.ChannelConfig().OrdererAddresses(), after which service.GetGossipService().InitializeChannel is invoked.

In gossip/service/gossip_service.go#InitializeChannel, before the deliver service is started, the peer checks whether it is the leader peer, based on peer.gossip.useLeaderElection (the leader peer is chosen dynamically by election) and peer.gossip.orgLeader (the current peer is statically designated as leader); the two options are mutually exclusive. As mentioned above, only the leader interacts with the orderer; a non-leader returns here and does not start the deliver service.

// Parameters:
// - peer.gossip.useLeaderElection
// - peer.gossip.orgLeader
//
// are mutual exclusive, setting both to true is not defined, hence peer will panic and terminate
leaderElection := viper.GetBool("peer.gossip.useLeaderElection")
isStaticOrgLeader := viper.GetBool("peer.gossip.orgLeader")

if leaderElection && isStaticOrgLeader {
    logger.Panic("Setting both orgLeader and useLeaderElection to true isn't supported, aborting execution")
}

if leaderElection {
    logger.Debug("Delivery uses dynamic leader election mechanism, channel", chainID)
    g.leaderElection[chainID] = g.newLeaderElectionComponent(chainID, g.onStatusChangeFactory(chainID, support.Committer))
} else if isStaticOrgLeader {
    logger.Debug("This peer is configured to connect to ordering service for blocks delivery, channel", chainID)
    g.deliveryService[chainID].StartDeliverForChannel(chainID, support.Committer, func() {})
} else {
    logger.Debug("This peer is not configured to connect to ordering service for blocks delivery, channel", chainID)
}

Suppose the current node is statically designated as leader; it then enters g.deliveryService[chainID].StartDeliverForChannel to create the deliver service. This method does two main things.

  1. Call client := d.newClient(chainID, ledgerInfo) to construct the client.

    func (d *deliverServiceImpl) newClient(chainID string, ledgerInfoProvider blocksprovider.LedgerInfo) *broadcastClient {
        requester := &blocksRequester{
            tls:     viper.GetBool("peer.tls.enabled"),
            chainID: chainID,
        }
        broadcastSetup := func(bd blocksprovider.BlocksDeliverer) error {
            return requester.RequestBlocks(ledgerInfoProvider)
        }

        connProd := comm.NewConnectionProducer(d.conf.ConnFactory(chainID), d.conf.Endpoints)
        bClient := NewBroadcastClient(connProd, d.conf.ABCFactory, broadcastSetup, backoffPolicy)
        requester.client = bClient
        return bClient
    }

    func NewBroadcastClient(prod comm.ConnectionProducer, clFactory clientFactory, onConnect broadcastSetup, bos retryPolicy) *broadcastClient {
        return &broadcastClient{prod: prod, onConnect: onConnect, shouldRetry: bos, createClient: clFactory, stopChan: make(chan struct{}, 1)}
    }
  2. Construct a blocksProviderImpl and store it keyed by channel: d.blockProviders[chainID] = blocksprovider.NewBlocksProvider. Then go d.launchBlockProvider(chainID, finalizer) is called, which in turn invokes core/deliverservice/blocksprovider/blocksprovider.go#DeliverBlocks. There, core/deliverservice/client.go#Recv is called, which eventually works its way down to client.go#connect.

    func (bc *broadcastClient) doAction(action func() (interface{}, error), actionOnNewConnection func()) (interface{}, error) {
        if bc.conn == nil {
            err := bc.connect()
            if err != nil {
                return nil, err
            }
            actionOnNewConnection()
        }
        resp, err := action()
        if err != nil {
            bc.Disconnect(false)
            return nil, err
        }
        return resp, nil
    }

    func (bc *broadcastClient) connect() error {
        // ...
        abc, err := bc.createClient(conn).Deliver(ctx)
        // ...
        err = bc.afterConnect(conn, abc, cf, endpoint)
        // ...
    }

    func (bc *broadcastClient) afterConnect(conn *grpc.ClientConn, abc orderer.AtomicBroadcast_DeliverClient, cf context.CancelFunc, endpoint string) error {
        // ...
        bc.BlocksDeliverer = abc
        err := bc.onConnect(bc)
        // ...
    }

  • First, the gRPC client is built with abc, err := bc.createClient(conn).Deliver(ctx). Here createClient is the d.conf.ABCFactory parameter passed in when constructing the broadcastClient above. It can be traced back to the construction of the Service in gossip/service/gossip_service.go (called from the InitializeChannel mentioned earlier), which specifies ABCFactory: deliverclient.DefaultABCFactory. The corresponding gRPC method is therefore /orderer.AtomicBroadcast/Deliver.

    func DefaultABCFactory(conn *grpc.ClientConn) orderer.AtomicBroadcastClient {
        return orderer.NewAtomicBroadcastClient(conn)
    }

    func NewAtomicBroadcastClient(cc *grpc.ClientConn) AtomicBroadcastClient {
        return &atomicBroadcastClient{cc}
    }

    func (c *atomicBroadcastClient) Deliver(ctx context.Context, opts ...grpc.CallOption) (AtomicBroadcast_DeliverClient, error) {
        stream, err := grpc.NewClientStream(ctx, &_AtomicBroadcast_serviceDesc.Streams[1], c.cc, "/orderer.AtomicBroadcast/Deliver", opts...)
        if err != nil {
            return nil, err
        }
        x := &atomicBroadcastDeliverClient{stream}
        return x, nil
    }
  • Then, in the connect method, bc.BlocksDeliverer = abc is set and err = bc.afterConnect(conn, abc, cf, endpoint) is called, which in turn runs err := bc.onConnect(bc). In the broadcastClient construction above, the onConnect argument is broadcastSetup, so this executes requester.RequestBlocks(ledgerInfoProvider). This is the actual block request to the orderer, with header type common.HeaderType_DELIVER_SEEK_INFO. Two cases are distinguished: a new channel starts syncing from oldest, while a restarted channel starts from a specified block (ledgerInfoProvider.LedgerHeight()). That value is the start position; stop is math.MaxUint64 and Behavior is orderer.SeekInfo_BLOCK_UNTIL_READY. Recalling the previous article, this means the peer keeps syncing indefinitely. The Envelope sent here is exactly what the orderer receives.

    func (b *blocksRequester) RequestBlocks(ledgerInfoProvider blocksprovider.LedgerInfo) error {
        height, err := ledgerInfoProvider.LedgerHeight()
        if err != nil {
            return err
        }
        if height > 0 {
            if err := b.seekLatestFromCommitter(height); err != nil {
                return err
            }
        } else {
            if err := b.seekOldest(); err != nil {
                return err
            }
        }
        return nil
    }

    func (b *blocksRequester) seekOldest() error {
        seekInfo := &orderer.SeekInfo{
            Start:    &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}},
            Stop:     &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
            Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
        }

        // TODO: epoch and msgVersion may need to be obtained; for now following usage in orderer/configupdate/configupdate.go
        msgVersion := int32(0)
        epoch := uint64(0)
        tlsCertHash := b.getTLSCertHash()
        env, err := utils.CreateSignedEnvelopeWithTLSBinding(common.HeaderType_DELIVER_SEEK_INFO, b.chainID, localmsp.NewSigner(), seekInfo, msgVersion, epoch, tlsCertHash)
        if err != nil {
            return err
        }
        return b.client.Send(env)
    }

    func (b *blocksRequester) seekLatestFromCommitter(height uint64) error {
        seekInfo := &orderer.SeekInfo{
            Start:    &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: height}}},
            Stop:     &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}},
            Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY,
        }

        // ...
    }
Now turn to message reception. The DeliverResponse is actually handled in blocksprovider.go#DeliverBlocks: a for loop checks !b.isDone() and receives messages with msg, err := b.client.Recv(). Inside core/deliverservice/client.go#Recv, a try method implements the retry mechanism: on failure, as long as the retry threshold (the backoffPolicy passed in when the client was constructed, as described earlier) has not been reached, the operation is re-executed. Each attempt first checks whether the connection is nil and, if so, calls the connect method analyzed above, then invokes the action, i.e. the function passed into Recv, which executes bc.BlocksDeliverer.Recv(). Note that the afterConnect method analyzed earlier has already set bc.BlocksDeliverer = abc (the orderer.AtomicBroadcast_DeliverClient), so this reads from the orderer's deliver gRPC stream, and the result propagates back up to blocksprovider.go#DeliverBlocks. How the received blocks are processed is tied to gossip and will be analyzed later together with it.