Fabric 1.4源码分析 - chaincode instantiate(3)endorser的propocessProposal主过程

由peer的grpc请求到/protos.Endorser/ProcessProposal,在endorser里对应着protos/peer/peer.pb.go#_Endorser_ProcessProposal_Handler,进入处理方法core/endorser/endorser.go#ProcessProposal。endorser的处理过程主要分为三个阶段,preProcess, simulate和endorse。其中,preProcess主要做校验工作,包括数据结构的完整性(header),唯一性防止重放攻击,校验签名,对application chaincode(以后简称acc)检查ACL策略等等。这部分校验比较直观和简单,这里不做深入分析。需要注意的是,在这里还获取了TX模拟器txsim, err = e.s.GetTxSimulator(chainID, txid);和历史请求执行器historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// GetTxSimulator returns the transaction simulator for the specified ledger
// a client may obtain more than one such simulator; they are made unique
// by way of the supplied txid
func (s *SupportImpl) GetTxSimulator(ledgername string, txid string) (ledger.TxSimulator, error) {
lgr := s.Peer.GetLedger(ledgername)
return lgr.NewTxSimulator(txid)
}

// NewTxSimulator implements method in interface `txmgmt.TxMgr`
func (txmgr *LockBasedTxMgr) NewTxSimulator(txid string) (ledger.TxSimulator, error) {
s, err := newLockBasedTxSimulator(txmgr, txid)
txmgr.commitRWLock.RLock()
return s, nil
}

这个模拟器和执行器是对应于账本ledger的,也就是channel,入参ledgername就是channelId。chainless的proposals的ledger.TxSimulator和ledger.HistoryQueryExecutor都是nil。模拟器是基于锁的,在生成模拟器时,同时加上了读锁,在下一阶段simulate结束后就立即释放。同时,也生成了历史数据查询器。关于这个锁,代码里有下面一段解释。

txsim acquires a shared lock on the stateDB. As this would impact the block commits (i.e., commit of valid write-sets to the stateDB), we must release the lock as early as possible. Hence, this txsim object is closed in simulateProposal() as soon as the tx is simulated and rwset is collected before gossip dissemination if required for privateData.

接下来主要分析simulate过程。在endorser.SimulateProposal这个方法中,可以看到存在多处对当前chaincode是否System chaincode系统链码的判断。(注意,当前请求channel是lscc)在当前篇幅里主要分析instantiate的过程,其他的判断分支以后再详述。在SimulateProposal里,首先判断当前是否是scc,这里是lscc,进入分支获取util.GetSysCCVersion(),scc的版本固定为string常量‘latest’.
整个流程方法调用顺序为SimulateProposal->endorser.callChaincode->endorser.support.Execute->endorser.support.ChaincodeSupport.Execute->ChaincodeSupport.Invoke->ChaincodeSupport.execute->cs.HandlerRegistry.Handler(cname).Execute
在这里,首先关键方法是e.callChaincode

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/ call specified chaincode (system or user)
func (e *Endorser) callChaincode(ctxt context.Context, chainID string, version string, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal, cis *pb.ChaincodeInvocationSpec, cid *pb.ChaincodeID, txsim ledger.TxSimulator) (*pb.Response, *pb.ChaincodeEvent, error) {
ctxt = context.WithValue(ctxt, chaincode.TXSimulatorKey, txsim)

// is this a system chaincode
scc := e.s.IsSysCC(cid.Name)
res, ccevent, err = e.s.Execute(ctxt, chainID, cid.Name, version, txid, scc, signedProp, prop, cis)

// ----- BEGIN - SECTION THAT MAY NEED TO BE DONE IN LSCC ------
// if this a call to deploy a chaincode, We need a mechanism
// to pass TxSimulator into LSCC. Till that is worked out this
// special code does the actual deploy, upgrade here so as to collect
// all state under one TxSimulator
//
// NOTE that if there's an error all simulation, including the chaincode table changes in lscc will be thrown away
if cid.Name == "lscc" && len(cis.ChaincodeSpec.Input.Args) >= 3 && (string(cis.ChaincodeSpec.Input.Args[0]) == "deploy" || string(cis.ChaincodeSpec.Input.Args[0]) == "upgrade") {
userCDS, err := putils.GetChaincodeDeploymentSpec(cis.ChaincodeSpec.Input.Args[2])

// this should not be a system chaincode

_, _, err = e.s.Execute(ctxt, chainID, cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version, txid, false, signedProp, prop, cds)
}
// ----- END -------

return res, ccevent, err
}

此处将模拟器txsim加入context。此处存在两次调用endorser.support.Execute.第一次调用的是外层的ChaincodeInvokeSpeccis。在endorser.support.Execute里先构建ccprovider.NewCCContext(cccid),这里的cccid.canonicalName设置为name + ":" + version,即lscc:latest。然后获取系统定义的decorator,这个是在node启动时peer/node.go#Serve里初始化的,从配置文件core.yaml里读取配置。配置文件里的定义:”append or mutate the chaincode input passed to the chaincode”. library: /opt/lib/decorator.so。这里将decorator作用于此cis。下一步就是调用ChaincodeSupport.Execute
ChaincodeSupport.Execute先执行ChaincodeSupport.Invoke。在invoke方法里,先执行cs.Launch(ctxt, cccid, spec)。这里执行的chaincode是lscc,系统链码已经在启动阶段launch状态为running了,这个在后面会说到,因此直接返回到ChaincodeSupport.Invoke。然后用chaincodeSpec.Input(这里其实是["deploy", "${channelId}", "${chaincodDeploySpec}"])构建pb.ChaincodeMessage,进入到Chaincode.execute方法执行cs.execute(ctxt, cccid, ccMsg)。这里的pb.ChaincodeMessage类型为pb.ChaincodeMessage_TRANSACTION(cis).

Launch starts executing chaincode if it is not already running. This method blocks until the peer side handler gets into ready state or encounters a fatal error. If the chaincode is already running, it simply returns.

下面是三个入参的结构

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
ctx = context.WithValue(ctx, chaincode.HistoryQueryExecutorKey, historyQueryExecutor)
ctxt = context.WithValue(ctxt, chaincode.TXSimulatorKey, txsim)

cccid := &CCContext{
ChainID: cname, // channelID
Name: name, // ChaincodeID.name (lscc)
Version: version, // metadata.Version (latest)
TxID: txid,
Syscc: syscc, // is this a system chaincode (true)
SignedProposal: signedProp,
Proposal: prop,
canonicalName: name + ":" + version,
ProposalDecorations: nil,
}

ccmsg := &pb.ChaincodeMessage{
Type: messageType, // pb.ChaincodeMessage_TRANSACTION
Payload: payload, // chaincodeSpec.Input (cis.cs.Input)
Txid: txid,
ChannelId: cid,
}

ChaincodeSupport.execute先判断处理当前chaincode(根据canonicalName区分,lscc:latest)的handler是否已经注册,这个handler注册的过程后面介绍。然后获取这个handler执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
func (h *Handler) Execute(ctxt context.Context, cccid *ccprovider.CCContext, msg *pb.ChaincodeMessage, timeout time.Duration) (*pb.ChaincodeMessage, error) {
txctx, err := h.TXContexts.Create(ctxt, msg.ChannelId, msg.Txid, cccid.SignedProposal, cccid.Proposal)
defer h.TXContexts.Delete(msg.ChannelId, msg.Txid)

h.setChaincodeProposal(cccid.SignedProposal, cccid.Proposal, msg)

h.serialSendAsync(msg, true)

var ccresp *pb.ChaincodeMessage
select {
case ccresp = <-txctx.ResponseNotifier:
// response is sent to user or calling chaincode. ChaincodeMessage_ERROR are typically treated as error
case <-time.After(timeout):
err = errors.New("timeout expired while executing transaction")
}

return ccresp, err
}

// serialSendAsync serves the same purpose as serialSend (serialize msgs so gRPC will be happy). In addition, it is also asynchronous so send-remoterecv--localrecv loop can be nonblocking. Only errors need to be handled and these are handled by communication on supplied error channel. A typical use will be a non-blocking or nil channel
func (h *Handler) serialSendAsync(msg *pb.ChaincodeMessage, sendErr bool) {
go func() {
if err := h.serialSend(msg); err != nil {
if sendErr {
// provide an error response to the caller
h.Notify(resp)
}
}
}()
}

func (h *Handler) Notify(msg *pb.ChaincodeMessage) {
tctx := h.TXContexts.Get(msg.ChannelId, msg.Txid)
tctx.ResponseNotifier <- msg
tctx.CloseQueryIterators()
}

// serialSend serializes msgs so gRPC will be happy
func (h *Handler) serialSend(msg *pb.ChaincodeMessage) error {
h.serialLock.Lock()
defer h.serialLock.Unlock()

if err := h.chatStream.Send(msg); err != nil {}
}

  1. 构造TransactionContext,其中有个参数为ResponseNotifier: make(chan *pb.ChaincodeMessage, 1),用以异步执行时接收结果。构造前先判断channelID和txId的组合是否已存在,构造过程加锁,避免重复处理。
  2. core/chaincode/handler.go#serialSendAsync异步处理,实际上即启用go routine调用handler#serialSend,使用handler.chatStream发送,此处也是grpc调用。错误以及结果都发送到txctx.ResponseNotifie这个chan内(rpc结果通过调用handler#Notify)。由于是异步调用,函数立即返回,处理结果在外层handler.Execute等待从这个chan读取数据,这里也加上了超时的机制。这里的lscc的调用后面再分析。

如果这里调用一切正常,流程将退回到callChaincode继续往下走,然后满足“lscc”,参数为“deploy”的条件,所以从cis里的Input参数获取cds并反序列化成ChaincodeDeploymentSpec对象,重复刚才的步骤。这里稍微有些不同的是,这里的pb.ChaincodeMessage类型为pb.ChaincodeMessage_INIT(cds),并且这个用户自定义的application chaincode需要launch。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
// Launch starts executing chaincode if it is not already running. This method
// blocks until the peer side handler gets into ready state or encounters a fatal
// error. If the chaincode is already running, it simply returns.
func (cs *ChaincodeSupport) Launch(ctx context.Context, cccid *ccprovider.CCContext, spec ccprovider.ChaincodeSpecGetter) error {
cname := cccid.GetCanonicalName()
if cs.HandlerRegistry.Handler(cname) != nil {
return nil
}

// UserRunsCC值一直回溯到 node.start#registerChaincodeSupport 里的 userRunsCC := chaincode.IsDevMode()
// 这个值实际上是配置文件 core.yaml 里的 peer.mode : net,配置文件的描述如下
// # In dev mode, user runs the chaincode after starting peer from command line on local machine.
// # In net mode, peer will run chaincode in a docker container.

if cs.UserRunsCC && !cccid.Syscc {
chaincodeLogger.Error(
"You are attempting to perform an action other than Deploy on Chaincode that is not ready and you are in developer mode.",
)
}

// The only user of this context value is the in-process controller used to support system chaincode.
// context加入“CCHANDLER” : ChaincodeSupport
ctx = context.WithValue(ctx, ccintf.GetCCHandlerKey(), cs)

return cs.Launcher.Launch(ctx, cccid, spec)
}


// core/chaincode/runtime_launcher.go
// Launch chaincode with the appropriate runtime.
func (r *RuntimeLauncher) Launch(ctx context.Context, cccid *ccprovider.CCContext, spec ccprovider.ChaincodeSpecGetter) error {
chaincodeID := spec.GetChaincodeSpec().ChaincodeId
cds, _ := spec.(*pb.ChaincodeDeploymentSpec)
if cds == nil {
cds, err = r.getDeploymentSpec(ctx, cccid, chaincodeID)
}

if cds.CodePackage == nil && cds.ExecEnv != pb.ChaincodeDeploymentSpec_SYSTEM {
ccpack, err := r.PackageProvider.GetChaincode(chaincodeID.Name, chaincodeID.Version)
cds = ccpack.GetDepSpec()
}

err := r.start(ctx, cccid, cds)
return nil
}

这里需要部署chaincode需要代码code,在instantiate的cds里参数CodePackage为nil,同时ExecEnv=pb.ChaincodeDeploymentSpec_DOCKER。这里是通过r.PackageProvider.GetChaincode(chaincodeID.Name, chaincodeID.Version)获取具体的代码,实际上是core/common/ccprovider/ccprovider.go#GetChaincodeFromPath从本地路径获取,而这些code是在peer chaincode install时由lscc放到本地目录下,在instantiate时按照chaincodeName和chaincodeVersion取出。这里的path为chaincodeInstallPath,这个是peer启动时从配置文件core.yaml里读取的,配置项为peer.fileSystemPath.

然后执行r.start(ctx, cccid, cds)用这部分代码启动。HandlerRegistry.Launching(cname)先注册launching状态,防止被重复部署。接着启动go routine执行r.Runtime.Start(ctx, cccid, cds)。这里首先获取容器启动参数设置,环境变量等配置,这些配置将作用于后续启动的容器。这里需要留意的是,设置了lc.Args = []string{"chaincode", fmt.Sprintf("-peer.address=%s", c.PeerAddress)},这里是chaincode容器启动的执行命令。然后根据cds的ExecEnv设置选择容器类型,这里共有两种类型,SYSTEM和DOCKER,用户定义的chaincode是启动在docker容器里的。然后开始启动容器的步骤c.Processor.Process(ctxt, vmtype, scr);。先对容器名加锁vmc.lockContainer(ccid.GetName())防止同时执行创建容器操作,然后执行container.StartContainerReqDo方法。这里的dockercontroller.Start方法是真正创建容器的地方。

  1. 构造镜像名(需要保证唯一性)和容器名,停止重名的容器(如果存在,stop,kill,remove),然后使用该镜像创建容器。创建过程遇到err == docker.ErrNoSuchImage,事实上这是必须的,因为要保证是第一次部署,当前环境没有该镜像。入参提供了container.Builder,此处尝试重新构建和部署镜像,然后重新创建容器。如果还是失败,则排除了镜像不存在的错误原因,直接返回错误结果。
  2. 这里,存在attachStdout := viper.GetBool("vm.docker.attachStdout")这个配置,将容器的输出直接到控制台,方便开发调试。
  3. 将需要上传的文件传到容器里,这里的文件是各种配置,例如TLS的key和cert。
  4. 使用第三方库fsouza/go-dockerclient来启动docker容器err = client.StartContainer(containerName, nil).
  5. 由于启动容器是异步操作,马上返回到runtime_launcher.start,这里是select-cast,等待启动完成的chan信号,或者异常,或者超时。如果启动失败,则停止容器,取消注册RuntimeLauncher.Registry里到handler以及launchState状态。

core/container/dockercontroller/dockercontroller.go:preFormatImageName定义了docker镜像和容器名,其中,镜像名为${NetworkID}-${PeerId}-${ChaincodeName}-${Version}-${Hash},容器名为${NetworkID}-${PeerId}-${ChaincodeName}-${Version},其中NetworkID从配置文件core.yml里的peer.NetworkId配置项。

这部分docker相关比较繁琐,暂时没有时间精力深入学习和分析源码,可以参考fabric源码解析20——ACC的部署