链码容器启动
peer节点背书提案的入口是 endorser.go 中的 ProcessProposal() 方法:
func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) {
// start time for computing elapsed time metric for successfully endorsed proposals
// 获取peer节点处理提案的开始时间
startTime := time.Now()
// Peer节点收到提案的数量+1
e.Metrics.ProposalsReceived.Add(1)
// 从上下文中获取发起提案的地址
addr := util.ExtractRemoteAddress(ctx)
endorserLogger.Debug("Entering: request from", addr)
提案预处理
对提案进行模拟执行
endorser.go
的SimulateProposal()
方法
// SimulateProposal simulates the proposal by invoking the chaincode identified
// by cid and collecting the resulting simulation results.
//
// It returns, in order: the chaincode definition read from the ledger (left nil
// for system chaincodes), the chaincode response, the serialized public
// simulation results (left nil when txParams.TXSimulator is nil), and the
// chaincode event. On any failure all four results are nil and the error is set.
func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) {
	endorserLogger.Debugf("[%s][%s] Entry chaincode: %s", txParams.ChannelID, shorttxid(txParams.TxID), cid)
	defer endorserLogger.Debugf("[%s][%s] Exit", txParams.ChannelID, shorttxid(txParams.TxID))

	// we do expect the payload to be a ChaincodeInvocationSpec
	// if we are supporting other payloads in future, this be glaringly point
	// as something that should change
	cis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal)
	if err != nil {
		return nil, nil, nil, nil, err
	}

	var cdLedger ccprovider.ChaincodeDefinition
	var version string

	if !e.s.IsSysCC(cid.Name) {
		// User chaincode: its definition must be readable from the ledger and
		// must pass the instantiation policy check before it is invoked.
		cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator)
		if err != nil {
			return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name))
		}
		version = cdLedger.CCVersion()

		err = e.s.CheckInstantiationPolicy(cid.Name, version, cdLedger)
		if err != nil {
			return nil, nil, nil, nil, err
		}
	} else {
		version = util.GetSysCCVersion()
	}

	// ---3. execute the proposal and get simulation results
	var simResult *ledger.TxSimulationResults
	var pubSimResBytes []byte
	var res *pb.Response
	var ccevent *pb.ChaincodeEvent
	res, ccevent, err = e.callChaincode(txParams, version, cis.ChaincodeSpec.Input, cid)
	if err != nil {
		endorserLogger.Errorf("[%s][%s] failed to invoke chaincode %s, error: %+v", txParams.ChannelID, shorttxid(txParams.TxID), cid, err)
		return nil, nil, nil, nil, err
	}

	if txParams.TXSimulator != nil {
		if simResult, err = txParams.TXSimulator.GetTxSimulationResults(); err != nil {
			txParams.TXSimulator.Done()
			return nil, nil, nil, nil, err
		}

		if simResult.PvtSimulationResults != nil {
			if cid.Name == "lscc" {
				// TODO: remove once we can store collection configuration outside of LSCC
				txParams.TXSimulator.Done()
				return nil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate")
			}
			pvtDataWithConfig, err := e.AssemblePvtRWSet(simResult.PvtSimulationResults, txParams.TXSimulator)
			// To read collection config need to read collection updates before
			// releasing the lock, hence txParams.TXSimulator.Done() moved down here
			txParams.TXSimulator.Done()

			if err != nil {
				return nil, nil, nil, nil, errors.WithMessage(err, "failed to obtain collections config")
			}
			endorsedAt, err := e.s.GetLedgerHeight(txParams.ChannelID)
			if err != nil {
				// Sprintf with an explicit separator: fmt.Sprint inserts no space
				// between two string operands, which glued the channel ID onto
				// the end of the message.
				return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("failed to obtain ledger height for channel %s", txParams.ChannelID))
			}
			// Add ledger height at which transaction was endorsed,
			// `endorsedAt` is obtained from the block storage and at times this could be 'endorsement Height + 1'.
			// However, since we use this height only to select the configuration (3rd parameter in distributePrivateData) and
			// manage transient store purge for orphaned private writesets (4th parameter in distributePrivateData), this works for now.
			// Ideally, ledger should add support in the simulator as a first class function `GetHeight()`.
			pvtDataWithConfig.EndorsedAt = endorsedAt
			if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil {
				return nil, nil, nil, nil, err
			}
		}

		txParams.TXSimulator.Done()
		if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil {
			return nil, nil, nil, nil, err
		}
	}
	return cdLedger, res, pubSimResBytes, ccevent, nil
}
callChaincode()
方法模拟提案执行
// callChaincode executes the chaincode named by cid with the given input and
// returns its response and event.
//
// A response whose status is at or above shim.ERRORTHRESHOLD is returned as-is
// with a nil event and a nil error. When the call is an "lscc" deploy or
// upgrade, the deployment spec carried in the third argument is sanitized and
// the target chaincode's legacy init is executed as part of the same call.
func (e *Endorser) callChaincode(txParams *ccprovider.TransactionParams, version string, input *pb.ChaincodeInput, cid *pb.ChaincodeID) (*pb.Response, *pb.ChaincodeEvent, error) {
	endorserLogger.Infof("[%s][%s] Entry chaincode: %s", txParams.ChannelID, shorttxid(txParams.TxID), cid)

	began := time.Now()
	defer func() {
		exitLogger := endorserLogger.WithOptions(zap.AddCallerSkip(1))
		tookMs := time.Since(began).Round(time.Millisecond) / time.Millisecond
		exitLogger.Infof("[%s][%s] Exit chaincode: %s (%dms)", txParams.ChannelID, shorttxid(txParams.TxID), cid, tookMs)
	}()

	resp, event, err := e.s.Execute(txParams, txParams.ChannelID, cid.Name, version, txParams.TxID, txParams.SignedProp, txParams.Proposal, input)
	if err != nil {
		return nil, nil, err
	}

	// per doc anything < 400 can be sent as TX.
	// fabric errors will always be >= 400 (ie, unambiguous errors )
	// "lscc" will respond with status 200 or 500 (ie, unambiguous OK or ERROR)
	if resp.Status >= shim.ERRORTHRESHOLD {
		return resp, nil, nil
	}

	// ----- BEGIN - SECTION THAT MAY NEED TO BE DONE IN LSCC ------
	// A deploy/upgrade call needs a mechanism to pass the TxSimulator into
	// LSCC. Until that exists, the actual deploy/upgrade is performed here so
	// that all state is collected under one TxSimulator.
	//
	// NOTE that if there's an error all simulation, including the chaincode
	// table changes in lscc will be thrown away

	// Anything that is not an lscc call with at least three arguments is done.
	if cid.Name != "lscc" || len(input.Args) < 3 {
		return resp, event, nil
	}
	// Only the deploy and upgrade functions need the extra init step.
	if op := string(input.Args[0]); op != "deploy" && op != "upgrade" {
		return resp, event, nil
	}

	rawCDS, err := putils.GetChaincodeDeploymentSpec(input.Args[2], e.PlatformRegistry)
	if err != nil {
		return nil, nil, err
	}

	cds, err := e.SanitizeUserCDS(rawCDS)
	if err != nil {
		return nil, nil, err
	}

	target := cds.ChaincodeSpec.ChaincodeId

	// this should not be a system chaincode
	if e.s.IsSysCC(target.Name) {
		return nil, nil, errors.Errorf("attempting to deploy a system chaincode %s/%s", target.Name, txParams.ChannelID)
	}

	if _, _, err = e.s.ExecuteLegacyInit(txParams, txParams.ChannelID, target.Name, target.Version, txParams.TxID, txParams.SignedProp, txParams.Proposal, cds); err != nil {
		// count the failed instantiation/upgrade against this channel and chaincode
		e.Metrics.InitFailed.With(
			"channel", txParams.ChannelID,
			"chaincode", target.Name+":"+target.Version,
		).Add(1)
		return nil, nil, err
	}
	// ----- END -------

	return resp, event, nil
}
其中e.s.Execute
实际调用了support.go
中SupportImpl
的Execute()
方法
// Execute decorates the chaincode input with the registered decorators,
// records the resulting decorations on txParams, and hands the call over to
// ChaincodeSupport.Execute for the chaincode identified by name and version.
func (s *SupportImpl) Execute(txParams *ccprovider.TransactionParams, cid, name, version, txid string, signedProp *pb.SignedProposal, prop *pb.Proposal, input *pb.ChaincodeInput) (*pb.Response, *pb.ChaincodeEvent, error) {
	// look up the decorators and apply them to a fresh decoration map
	registry := library.InitRegistry(library.Config{})
	decorators := registry.Lookup(library.Decoration).([]decoration.Decorator)

	input.Decorations = make(map[string][]byte)
	decorated := decoration.Apply(prop, input, decorators...)
	txParams.ProposalDecorations = decorated.Decorations

	ccContext := &ccprovider.CCContext{Name: name, Version: version}
	return s.ChaincodeSupport.Execute(txParams, ccContext, decorated)
}
链码容器启动
链码容器的启动发生在 ChaincodeSupport.Invoke 方法中(s.ChaincodeSupport.Execute 内部会调用该方法):
func (cs *ChaincodeSupport) Invoke(txParams *ccprovider.TransactionParams, cccid *ccprovider.CCContext, input *pb.ChaincodeInput) (*pb.ChaincodeMessage, error) {
# 这一句启动链码容器
h, err := cs.Launch(txParams.ChannelID, cccid.Name, cccid.Version, txParams.TXSimulator)
if err != nil {
return nil, err
}
// TODO add Init exactly once semantics here once new lifecycle
// is available. Enforced if the target channel is using the new lifecycle
//
// First, the function name of the chaincode to invoke should be checked. If it is
// "init", then consider this invocation to be of type pb.ChaincodeMessage_INIT,
// otherwise consider it to be of type pb.ChaincodeMessage_TRANSACTION,
//
// Secondly, A check should be made whether the chaincode has been
// inited, then, if true, only allow cctyp pb.ChaincodeMessage_TRANSACTION,
// otherwise, only allow cctype pb.ChaincodeMessage_INIT,
cctype := pb.ChaincodeMessage_TRANSACTION
return cs.execute(cctype, txParams, cccid, input, h)
}
最终跟到文件 dockercontroller.go,在此文件中完成 docker 容器的创建
链码镜像生成以及容器生成
在dockercontroller.go
中
// 容器创建-启动的主方法
func (vm *DockerVM) Start(ccid ccintf.CCID, args, env []string, filesToUpload map[string][]byte, builder container.Builder) error {
imageName, err := vm.GetVMNameForDocker(ccid)
if err != nil {
return err
}
attachStdout := viper.GetBool("vm.docker.attachStdout")
containerName := vm.GetVMName(ccid)
logger := dockerLogger.With("imageName", imageName, "containerName", containerName)
client, err := vm.getClientFnc()
if err != nil {
logger.Debugf("failed to get docker client", "error", err)
return err
}
vm.stopInternal(client, containerName, 0, false, false)
// 创建容器
err = vm.createContainer(client, imageName, containerName, args, env, attachStdout)
if err == docker.ErrNoSuchImage {
// 链码实例化一般来说都会进入此块,因为不存在镜像
// 此方法生成的reader就是DockerFile的文件流 很重要
reader, err := builder.Build()
if err != nil {
return errors.Wrapf(err, "failed to generate Dockerfile to build %s", containerName)
}
err = vm.deployImage(client, ccid, reader)
if err != nil {
return err
}
其中 builder.Build() 就是生成 Dockerfile 的地方,builder 的实现是 PlatformBuilder
// Build asks the platform registry to generate the docker build input for this
// builder's chaincode type, path, name, version and code package, and returns
// the registry's reader and error unchanged.
func (b *PlatformBuilder) Build() (io.Reader, error) {
	return b.PlatformRegistry.GenerateDockerBuild(b.Type, b.Path, b.Name, b.Version, b.CodePackage)
}
进入PlatformRegistry
的方法GenerateDockerBuild
// GenerateDockerBuild returns a reader that yields a gzip-compressed tar
// stream to be used as a docker build context. The stream contains the
// generated Dockerfile plus whatever StreamDockerBuild adds for the given
// chaincode type, path and code package.
//
// The stream is produced by a background goroutine writing into a pipe. Any
// error from streaming, or from closing the tar/gzip writers, is delivered to
// the reader through the pipe instead of being returned here. An error is
// returned directly only when the Dockerfile cannot be generated.
func (r *Registry) GenerateDockerBuild(ccType, path, name, version string, codePackage []byte) (io.Reader, error) {
	inputFiles := make(map[string][]byte)

	// Generate the Dockerfile specific to our context
	dockerFile, err := r.GenerateDockerfile(ccType, name, version)
	if err != nil {
		return nil, fmt.Errorf("Failed to generate a Dockerfile: %s", err)
	}

	inputFiles["Dockerfile"] = []byte(dockerFile)

	// ----------------------------------------------------------------------------------------------------
	// Finally, launch an asynchronous process to stream all of the above into a docker build context
	// ----------------------------------------------------------------------------------------------------
	input, output := io.Pipe()

	go func() {
		gw := gzip.NewWriter(output)
		tw := tar.NewWriter(gw)

		err := r.StreamDockerBuild(ccType, path, codePackage, inputFiles, tw)
		if err != nil {
			logger.Error(err)
		}

		// Closing flushes the tar trailer and the gzip footer. If either
		// fails the stream is truncated, so the reader must see an error
		// rather than a clean EOF. A streaming error takes precedence.
		if closeErr := tw.Close(); closeErr != nil {
			logger.Error(closeErr)
			if err == nil {
				err = closeErr
			}
		}
		if closeErr := gw.Close(); closeErr != nil {
			logger.Error(closeErr)
			if err == nil {
				err = closeErr
			}
		}

		// A nil err closes the pipe normally (the reader sees io.EOF).
		output.CloseWithError(err)
	}()

	return input, nil
}
进入方法GenerateDockerfile
func (r *Registry) GenerateDockerfile(ccType, name, version string) (string, error) {
platform, ok := r.Platforms[ccType]
if !ok {
return "", fmt.Errorf("Unknown chaincodeType: %s", ccType)
}
var buf []string
// ----------------------------------------------------------------------------------------------------
// Let the platform define the base Dockerfile
// ----------------------------------------------------------------------------------------------------
// 根据不同的链码语言生成dockerfile
base, err := platform.GenerateDockerfile()
if err != nil {
return "", fmt.Errorf("Failed to generate platform-specific Dockerfile: %s", err)
}
buf = append(buf, base)
// ----------------------------------------------------------------------------------------------------
// Add some handy labels
// ----------------------------------------------------------------------------------------------------
buf = append(buf, fmt.Sprintf(`LABEL %s.chaincode.id.name="%s" \`, metadata.BaseDockerLabel, name))
buf = append(buf, fmt.Sprintf(` %s.chaincode.id.version="%s" \`, metadata.BaseDockerLabel, version))
buf = append(buf, fmt.Sprintf(` %s.chaincode.type="%s" \`, metadata.BaseDockerLabel, ccType))
buf = append(buf, fmt.Sprintf(` %s.version="%s" \`, metadata.BaseDockerLabel, metadata.Version))
buf = append(buf, fmt.Sprintf(` %s.base.version="%s"`, metadata.BaseDockerLabel, metadata.BaseVersion))
// ----------------------------------------------------------------------------------------------------
// Then augment it with any general options
// ----------------------------------------------------------------------------------------------------
//append version so chaincode build version can be compared against peer build version
buf = append(buf, fmt.Sprintf("ENV CORE_CHAINCODE_BUILDLEVEL=%s", metadata.Version))
// ----------------------------------------------------------------------------------------------------
// Finalize it
// ----------------------------------------------------------------------------------------------------
contents := strings.Join(buf, "\n")
logger.Debugf("\n%s", contents)
return contents, nil
我们主要看下JAVA链码平台core/chaincode/platforms/java/platform.go
的GenerateDockerfile
方法
// GenerateDockerfile returns the base Dockerfile for Java chaincode: a FROM
// line whose image comes from the "chaincode.java.runtime" configuration key,
// and an ADD line that places binpackage.tar under
// /root/chaincode-java/chaincode. The returned error is always nil.
func (javaPlatform *Platform) GenerateDockerfile() (string, error) {
	lines := []string{
		"FROM " + cutil.GetDockerfileFromConfig("chaincode.java.runtime"),
		"ADD binpackage.tar /root/chaincode-java/chaincode",
	}
	return strings.Join(lines, "\n"), nil
}
从上代码可以看到,Java链码容器依赖的镜像是从配置chaincode.java.runtime
中读取的,而这个配置在integration/nwo/core_template.go
中(以下只是部分配置)
chaincode:
  builder: $(DOCKER_NS)/fabric-ccenv:$(ARCH)-$(PROJECT_VERSION)
  pull: false
  golang:
    runtime: $(BASE_DOCKER_NS)/fabric-baseos:$(ARCH)-$(BASE_VERSION)
    dynamicLink: false
  car:
    runtime: $(BASE_DOCKER_NS)/fabric-baseos:$(ARCH)-$(BASE_VERSION)
  java:
    runtime: $(DOCKER_NS)/fabric-javaenv:$(ARCH)-$(PROJECT_VERSION)
可以看到,JAVA链码容器依赖镜像 hyperledger/fabric-javaenv,这也是为什么我们在很多教程中可以看到搭建初始化网络时会让大家先 pull 这个镜像。