Fabric 1.4源码分析 - chaincode instantiate(6)application chaincode的初始化

接着继续分析acc的初始化过程,在instantiate(3)里提到endorser.callChaincode有两次的ChaincodeSupport#Execute调用,由代码分析两次不同的ChaincodeMessageMessageType分别为MessageType_TRANSACTION(对应cis)和MessageType_INIT(对应cds),相应调用lscc的Invoke和acc的Init。具体过程可以参考上节分析。

先看lscc的Invoke.参考上节提及的execute在两端chaincode/handler.go(peer端)和shim/handler.go(chaincode容器端)的流程,lscc相当于chaincode容器(尽管是inproccontainer),使用shim/handler.go#handleReadyselect-case,lscc的ChaincodeMessage_TRANSACTION进入handleTransaction方法。这里的主要流程与上节介绍的handleInit一致,不同的是这里调用的是lscc的Invoke方法。这里需要强调一点,在handleTransaction,handleInit这些handleXXX方法里,进入就是启动goroutinne来运行所有的逻辑,这样可以立刻返回并行来处理下一个请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Invoke implements lifecycle functions "deploy", "start", "stop", "upgrade".
// Deploy's arguments - {[]byte("deploy"), []byte(<chainname>), <unmarshalled pb.ChaincodeDeploymentSpec>}
//
// Invoke also implements some query-like functions
// Get chaincode arguments - {[]byte("getid"), []byte(<chainname>), []byte(<chaincodename>)}
func (lscc *lifeCycleSysCC) Invoke(stub shim.ChaincodeStubInterface) pb.Response {

function := string(args[0])
switch function {
case DEPLOY, UPGRADE:
channel := string(args[1])
// 获取channel的设置,这个在create channel时传入配置
ac, exists := lscc.sccprovider.GetApplicationConfig(channel)

// the maximum number of arguments depends on the capability of the channel
// 校验当前传入参数与channel配置(ac)是否冲突,这里校验主要是ac.Capabilities().PrivateChannelData()

depSpec := args[2]
cds, err := utils.GetChaincodeDeploymentSpec(depSpec)

// optional arguments here (they can each be nil and may or may not be present)
// args[3] is a marshalled SignaturePolicyEnvelope representing the endorsement policy
// args[4] is the name of escc, default "escc"(系统自带)
// args[5] is the name of vscc, default "vscc"(系统自带)
// args[6] is a marshalled CollectionConfigPackage struct
var EP []byte
p := cauthdsl.SignedByAnyMember(peer.GetMSPIDs(channel))
EP, err = utils.Marshal(p)

var collectionsConfig []byte
// we proceed with a non-nil collection configuration only if we support the PrivateChannelData capability
// PrivateChannelData, channel内的私有数据
if ac.Capabilities().PrivateChannelData() && len(args) > 6 {
collectionsConfig = args[6]
}

cd, err := lscc.executeDeployOrUpgrade(stub, channel, cds, EP, escc, vscc, collectionsConfig, function)
cdbytes, err := proto.Marshal(cd)
return shim.Success(cdbytes)
}
...
}

进入lscc的Invoke方法。回顾instantiate(2)里cis的数据结构"Args" : ["deploy", "${channelId}", "${chaincodDeploySpec}"],这里的select-case进入DEPLOY,UPGRADE分支。分支里主要是校验以及处理传入参数,包括指定的scc和private data相关,重要的方法是lscc.executeDeployOrUpgrade,传参就是刚获取和校验过的参数。

进入executeDeployOrUpgrade。首先执行lscc.support.GetChaincodeFromLocalStorage(chaincodeName, chaincodeVersion),获取ccprovider.CCPackage,这是安装阶段执行peer chaincode install命令时打包并且上传到该节点的chaincode定义,放在peer节点本地目录的(这个在install里具体分析其结构)。然后执行lscc.executeDeploy

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// executeDeploy implements the "instantiate" Invoke transaction
func (lscc *lifeCycleSysCC) executeDeploy(
stub shim.ChaincodeStubInterface,
chainname string,
cds *pb.ChaincodeDeploymentSpec,
policy []byte,
escc []byte,
vscc []byte,
cdfs *ccprovider.ChaincodeData,
ccpackfs ccprovider.CCPackage,
collectionConfigBytes []byte,
) (*ccprovider.ChaincodeData, error) {
//just test for existence of the chaincode in the LSCC
chaincodeName := cds.ChaincodeSpec.ChaincodeId.Name
_, err := lscc.getCCInstance(stub, chaincodeName)

//retain chaincode specific data and fill channel specific ones
cdfs.Escc = string(escc)
cdfs.Vscc = string(vscc)
cdfs.Policy = policy

// retrieve and evaluate instantiation policy
cdfs.InstantiationPolicy, err = lscc.support.GetInstantiationPolicy(chainname, ccpackfs)

// get the signed instantiation proposal
signedProp, err := stub.GetSignedProposal()
err = lscc.support.CheckInstantiationPolicy(signedProp, chainname, cdfs.InstantiationPolicy)

err = lscc.putChaincodeData(stub, cdfs)

err = lscc.putChaincodeCollectionData(stub, cdfs, collectionConfigBytes)
return cdfs, nil
}

这里先检查该chaincode是否已经部署过lscc.getCCInstance(stub, chaincodeName),实际上是调用stub.GetState(ccname),构建&pb.ChaincodeMessage{Type: pb.ChaincodeMessage_PUT_STATE, Payload: payloadBytes, Txid: txid, ChannelId: channelId}发送到对端chaincode/handler。这里会执行到handler.callPeerWithChaincodeMsg方法。其实里面就是hanlder本身维护着一个名叫responseChannel的map[string]chan pb.ChaincodeMessage, 先创建一个channel放入map中,key是由channelID和txid构造而成,即关联当前请求,在退出方法时使用defer从map中删除这个channle。而在这个过程中,在handler.sendReceive里使用handler.serialSendAsync发送ChaincodeMessage,然后select-case一直等待从channel里读取消息。

相应的,当对端回复消息pb.ChaincodeMessage_RESPONSEshim/handler会进入fabric/core/chaincode/shim/handler.go:handleReady->handler.sendChannel(msg),这里从responseChannel这个map里拿出相应的channel,放入消息。实际上,通过这个实现了异步通信,结合上面提到的每次进入handlerXXX方法都使用goroutine处理而立刻返回,实现了其并发通信。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// callPeerWithChaincodeMsg sends a chaincode message (for e.g., GetState along with the key) to the peer for a given txid and receives the response.
func (handler *Handler) callPeerWithChaincodeMsg(msg *pb.ChaincodeMessage, channelID, txid string) (pb.ChaincodeMessage, error) {
// Create the channel on which to communicate the response from the peer
var respChan chan pb.ChaincodeMessage
var err error
if respChan, err = handler.createChannel(channelID, txid); err != nil {
return pb.ChaincodeMessage{}, err
}

defer handler.deleteChannel(channelID, txid)

return handler.sendReceive(msg, respChan)
}

//sends a message and selects
func (handler *Handler) sendReceive(msg *pb.ChaincodeMessage, c chan pb.ChaincodeMessage) (pb.ChaincodeMessage, error) {
errc := make(chan error, 1)

for {
select {
case outmsg, val := <-c:
return outmsg, nil
}
}
}

func (handler *Handler) sendChannel(msg *pb.ChaincodeMessage) error {
handler.Lock()
defer handler.Unlock()
txCtxID := handler.getTxCtxId(msg.ChannelId, msg.Txid)
handler.responseChannel[txCtxID] <- *msg
return nil
}

chaincode/handler对该类型请求在hanldeMessageHandleGetState一路判断最后执行res, err = txContext.TXSimulator.GetState(chaincodeName, getState.Key),从本地账本db获取namespace为lscc,key为${chaincodeName}的数据(最终执行queryHelper.txmgr.db.GetState(ns, key)。在executeDeploy稍后可以看到,执行lscc.putChaincodeData(stub, cdfs)操作时执行相对应的put state操作,即deploy之后会把该chaincode写入peer本地db的lscc的namespace里。因此,通过校验get回来的数据为空来判断该chaincode没有deploy过。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/helper.go:40
func (h *queryHelper) getState(ns string, key string) ([]byte, error) {
if err := h.checkDone(); err != nil {
return nil, err
}
versionedValue, err := h.txmgr.db.GetState(ns, key)
val, ver := decomposeVersionedValue(versionedValue)
if h.rwsetBuilder != nil {
h.rwsetBuilder.AddToReadSet(ns, key, ver)
}
return val, nil
}

// core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/helper.go
func decomposeVersionedValue(versionedValue *statedb.VersionedValue) ([]byte, *version.Height) {
var value []byte
var ver *version.Height
if versionedValue != nil {
value = versionedValue.Value
ver = versionedValue.Version
}
return value, ver
}

// core/ledger/kvledger/txmgmt/rwsetutil/rwset_builder.go
// AddToReadSet adds a key and corresponding version to the read-set
func (b *RWSetBuilder) AddToReadSet(ns string, key string, version *version.Height) {
nsPubRwBuilder := b.getOrCreateNsPubRwBuilder(ns)
nsPubRwBuilder.readMap[key] = NewKVRead(key, version)
}

从上面看出,账本的value是statedb.VersionedValue结构,包含value和version(结构为*version.Height{BlockNum uint64, TxNum uint64})。获取的结果加入readSet,其中参数(namesapce:lscc,key:${chaincodeName}, ver: 版本)

然后获取并检查部署策略lscc.support.CheckInstantiationPolicy。然后执行lscc.putChaincodeData(stub, cdfs),调用的是stub.PutState(cd.Name, cdbytes)。入参是chaincode.Name和ccprovider.ChaincodeData序列化字节,这个是通过从本地存储的codepackage和刚刚传入的escc,vscc等参数构建成的。向对端chaincode/handler发送pb.ChaincodeMessage_PUT_STATE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
//-------- ChaincodeData is stored on the LSCC -------

// ChaincodeData defines the datastructure for chaincodes to be serialized by proto
// Type provides an additional check by directing to use a specific package after instantiation
// Data is Type specifc (see CDSPackage and SignedCDSPackage)
type ChaincodeData struct {
// Name of the chaincode
Name string `protobuf:"bytes,1,opt,name=name"`
Version string `protobuf:"bytes,2,opt,name=version"`
Escc string `protobuf:"bytes,3,opt,name=escc"`
Vscc string `protobuf:"bytes,4,opt,name=vscc"`
// Policy endorsement policy for the chaincode instance
Policy []byte `protobuf:"bytes,5,opt,name=policy,proto3"`
// Data data specific to the package
Data []byte `protobuf:"bytes,6,opt,name=data,proto3"`
// Id of the chaincode that's the unique fingerprint for the CC This is not currently used anywhere but serves as a good eyecatcher
Id []byte `protobuf:"bytes,7,opt,name=id,proto3"`
// InstantiationPolicy for the chaincode
InstantiationPolicy []byte `protobuf:"bytes,8,opt,name=instantiation_policy,proto3"`
}

数据分为普通channle公开信息和private消息,这里只讨论前者。相应的,对端chaincode/handlerhanldeMessageHandlePutState一路判断最后执行res, err = txContext.TXSimulator.SetState(chaincodeName, putState.Key, putState.Value).跟上面区别的是,这里在ockbased_tx_simulator.go文件里,清楚指明这是模拟交易。同时,只把(namespace:lscc, key:${chaincode.Name}, value:${ChaincodeData}序列化字节)的组合加入到writeSet内,然后就返回结果ChaincodeMessage_RESPONSE或者错误消息 ChaincodeMessage_ERROR,并没有真正作用于账本db。同时,没有指定version,这个是在orderer排序打包时添加的。关于这里的read-write set,可以参考官网概念Read-Write set semantics

1
2
3
4
5
6
7
8
// core/ledger/kvledger/txmgmt/txmgr/lockbasedtxmgr/lockbased_tx_simulator.go
func (s *lockBasedTxSimulator) SetState(ns string, key string, value []byte) error {
err := s.helper.checkDone()
err := s.checkBeforeWrite()
err := s.helper.txmgr.db.ValidateKeyValue(key, value)
s.rwsetBuilder.AddToWriteSet(ns, key, value)
return nil
}

最后列举下Invoke lscc后从shim/handler返回到chaincode/handler的数据结构pb.ChaincodeMessage

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// pb.ChaincodeMessage
{
"Type": pb.ChaincodeMessage_COMPLETED,
"Payload": ${pb.Response}.byte[],
"Txid": ${msg.Txid},
"ChaincodeEvent": ${pb.chaincodeEvent}, // nil
"ChannelId": ${stub.ChannelId}


// pb.Response
{
"Status": 200, // ok
"Payload": ${ChaincodeData}.byte[]
}

// pb.ChaincodeData
{
"Name": ${application chaincode.name}, // CDSPackage.depSpec.ChaincodeSpec.ChaincodeId.Name, 同“Data”
"Version": {application chaincode.version}, // CDSPackage.depSpec.ChaincodeSpec.ChaincodeId.Version, 同“Data”
"Escc": "escc",
"Vscc": "vscc",
"Policy": ${cb.SignaturePolicyEnvelope}.byte[], // endorsement policy, 来源于lscc.go#Invoke, cauthdsl.SignedByAnyMember(peer.GetMSPIDs(channel))
"Data": , // CDSPackage.datab, 执行lscc.support.GetChaincodeFromLocalStorage(chaincodeName, chaincodeVersion).GetChaincodeData(),来源于install chaincode打包构建的fabric/core/common/ccprovider/ccprovider.go:GetCCPackage
"Id": , // CDSPackage.id, 同“Data”
"InstantiationPolicy": ${InstantiationPolicy}.byte[] // 来源于lscc.go#executeDeploy, lscc.support.GetInstantiationPolicy(chainname, ccpackfs)
}


这里继续直接看acc的Init,之前的步骤与lscc完全一致,可以参考上节。Tutorial里的例子的chaincode代码在github.com/hyperledger/fabric/examples/chaincode/go/example02/chaincode.go,里面的Init方法主要是读入参数,然后调用stub.PutState后返回success。细节如上,不再复述。