经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 其他 » 区块链 » 查看文章
Fabric1.4源码解析:Peer节点启动过程
来源:cnblogs  作者:触不可及`  时间:2019/7/8 8:35:55  对本文有异议

看一下Peer节点的启动过程,通常在Fabric网络中,Peer节点的启动方式有两种,通过Docker容器启动,或者是通过执行命令直接启动。
一般情况下,我们都是执行docker-compose -f docker-*.yaml up命令通过容器启动了Peer节点,而如果直接启动Peer节点则是执行了peer node start这条命令。看起来,这两种方式所使用的命令毫无关系,但事实上,在Docker容器中启动Peer节点也是通过执行了peer node start这条命令来启动Peer节点,只不过是Docker替我们执行了,这条命令就在之前通过启动Docker容器的那个文件中写到。所以说,无论是哪种方式启动Peer节点,都是通过peer node start这条命令,接下来,我们就分析一下执行完这条命令后,Peer节点的启动过程。
和之前一样,首先找到切入点,在/fabric/peer/main.go文件中,第46行:

  1. mainCmd.AddCommand(node.Cmd())

这里包含了与对Peer节点进行相关操作的命令集合,其中就有启动Peer节点的命令,我们点进行看一下:

  1. func Cmd() *cobra.Command {
  2. nodeCmd.AddCommand(startCmd())
  3. nodeCmd.AddCommand(statusCmd())
  4. return nodeCmd
  5. }

共有两条命令:启动Peer节点,以及查看节点的状态,我们看一下启动Peer节点这条命令,首先调用了peer/node/start.go文件中的startCmd(),之后转到了nodeStartCmd,以及serve(args)这个方法。其中,serve(args)这个方法就是本文要说明了主要方法,我们就从这里开始分析,在peer/node/start.go文件中第125行:

  1. func serve(args []string) error {
  2. #首先获取MSP的类型,msp指的是成员关系服务提供者,相当于许可证
  3. mspType := mgmt.GetLocalMSP().GetType()
  4. #如果MSP的类型不是FABRIC,返回错误信息
  5. if mspType != msp.FABRIC {
  6. panic("Unsupported msp type " + msp.ProviderTypeToString(mspType))
  7. }
  8. ...
  9. #创建ACL提供者,access control list访问控制列表
  10. aclProvider := aclmgmt.NewACLProvider(
  11. aclmgmt.ResourceGetter(peer.GetStableChannelConfig),
  12. )
  13. #平台注册,可以使用的语言类型,最后一个car不太理解,可能和官方的一个例子有关
  14. pr := platforms.NewRegistry(
  15. &golang.Platform{},
  16. &node.Platform{},
  17. &java.Platform{},
  18. &car.Platform{},
  19. )

定义一个用于部署链码的Provider结构体:

  1. deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}
  2. ==========================DeployedCCInfoProvider==========================
  3. type DeployedChaincodeInfoProvider interface {
  4. Namespaces() []string #命名空间
  5. UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error) #保存更新的链码
  6. ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) #保存链码信息
  7. CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error)
  8. } #保存链码数据信息
  9. ==========================DeployedCCInfoProvider==========================

下面是对Peer节点的一些属性的设置了:

  1. identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer {
  2. #获取通道管理者
  3. return mgmt.GetManagerForChain(chainID)
  4. }
  5. #相当于配置Peer节点的运行环境了,主要就是保存Peer节点的IP地址,端口,证书等相关基本信息
  6. opsSystem := newOperationsSystem()
  7. err := opsSystem.Start()
  8. if err != nil {
  9. return errors.WithMessage(err, "failed to initialize operations subystems")
  10. }
  11. defer opsSystem.Stop()
  12. metricsProvider := opsSystem.Provider
  13. #创建观察者,对Peer节点进行记录
  14. logObserver := floggingmetrics.NewObserver(metricsProvider)
  15. flogging.Global.SetObserver(logObserver)
  16. #创建成员关系信息Provider,简单来说就是保存其他Peer节点的信息,以便通信等等
  17. membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory)
  18. #账本管理器初始化,主要就是之前所定义的一些属性
  19. ledgermgmt.Initialize(
  20. &ledgermgmt.Initializer{
  21. #与Tx处理相关
  22. CustomTxProcessors: peer.ConfigTxProcessors,
  23. #之前定义的所使用的语言
  24. PlatformRegistry: pr,
  25. #与链码相关
  26. DeployedChaincodeInfoProvider: deployedCCInfoProvider,
  27. #与Peer节点交互相关
  28. MembershipInfoProvider: membershipInfoProvider,
  29. #这个不太清楚,与Peer节点的属性相关?
  30. MetricsProvider: metricsProvider,
  31. #健康检查
  32. HealthCheckRegistry: opsSystem,
  33. },
  34. )
  35. #判断是否处于开发模式下
  36. if chaincodeDevMode {
  37. logger.Info("Running in chaincode development mode")
  38. logger.Info("Disable loading validity system chaincode")
  39. viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode)
  40. }
  41. #里面有两个方法,分别是获取本地地址与获取当前Peer节点实例地址,将地址进行缓存
  42. if err := peer.CacheConfiguration(); err != nil {
  43. return err
  44. }
  45. #获取当前Peer节点实例地址,如果没有进行缓存,则会执行上一步的CacheConfiguration()方法
  46. peerEndpoint, err := peer.GetPeerEndpoint()
  47. if err != nil {
  48. err = fmt.Errorf("Failed to get Peer Endpoint: %s", err)
  49. return err
  50. }
  51. #简单的字符串操作,获取Host
  52. peerHost, _, err := net.SplitHostPort(peerEndpoint.Address)
  53. if err != nil {
  54. return fmt.Errorf("peer address is not in the format of host:port: %v", err)
  55. }
  56. #获取监听地址,该属性在opsSystem中定义过
  57. listenAddr := viper.GetString("peer.listenAddress")
  58. #返回当前Peer节点的gRPC服务器配置,该方法主要就是设置TLS与心跳信息,在/core/peer/config.go文件中第128行。
  59. serverConfig, err := peer.GetServerConfig()
  60. if err != nil {
  61. logger.Fatalf("Error loading secure config for peer (%s)", err)
  62. }
  63. #设置gRPC最大并发 grpcMaxConcurrency=2500
  64. throttle := comm.NewThrottle(grpcMaxConcurrency)
  65. #设置日志信息
  66. serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
  67. serverConfig.MetricsProvider = metricsProvider
  68. #设置拦截器,不再细说
  69. serverConfig.UnaryInterceptors = append(
  70. serverConfig.UnaryInterceptors,
  71. grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
  72. grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
  73. throttle.UnaryServerIntercptor,
  74. )
  75. serverConfig.StreamInterceptors = append(
  76. serverConfig.StreamInterceptors,
  77. grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
  78. grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
  79. throttle.StreamServerInterceptor,
  80. )

到这里创建了Peer节点的gRPC服务器,将之前的监听地址与服务器配置传了进去:

  1. peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
  2. if err != nil {
  3. logger.Fatalf("Failed to create peer server (%s)", err)
  4. }

关于权限的一些配置:

  1. #TLS的相关设置
  2. if serverConfig.SecOpts.UseTLS {
  3. logger.Info("Starting peer with TLS enabled")
  4. // set up credential support
  5. cs := comm.GetCredentialSupport()
  6. roots, err := peer.GetServerRootCAs()
  7. if err != nil {
  8. logger.Fatalf("Failed to set TLS server root CAs: %s", err)
  9. }
  10. cs.ServerRootCAs = roots
  11. // set the cert to use if client auth is requested by remote endpoints
  12. clientCert, err := peer.GetClientCertificate()
  13. if err != nil {
  14. logger.Fatalf("Failed to set TLS client certificate: %s", err)
  15. }
  16. comm.GetCredentialSupport().SetClientCertificate(clientCert)
  17. }
  18. mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert
  19. #策略检查Provider,看传入的参数就比较清楚了,Envelope,通道ID,环境变量
  20. policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc {
  21. return func(env *cb.Envelope, channelID string) error {
  22. return aclProvider.CheckACL(resourceName, channelID, env)
  23. }
  24. }

创建了另一个服务器,与上面的权限设置相关,用于交付与过滤区块的事件服务器:

  1. abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
  2. #将之前创建的gRPC服务器与用于交付与过滤区块的事件服务器注册到这里
  3. pb.RegisterDeliverServer(peerServer.Server(), abServer)

接下来是与链码相关的操作:

  1. #启动与链码相关的服务器,看传入的值 Peer节点的主机名,访问控制列表Provider,pr是之前提到与语言相关的,以及之前的运行环境
  2. #主要完成三个操作:1.设置本地链码安装路径,2.创建自签名CA,3,启动链码gRPC监听服务,该方法在本文件中第709行
  3. chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
  4. logger.Debugf("Running peer")
  5. #启动管理员服务,这个不太懂干嘛的
  6. startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
  7. privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error {
  8. #看这个方法是分发私有数据到其他节点
  9. return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt)
  10. }
  11. ========================TxPvtReadWriteSetWithConfigInfo==========================
  12. #看这里,主要是私有的读写集以及配置信息
  13. type TxPvtReadWriteSetWithConfigInfo struct {
  14. EndorsedAt uint64 `protobuf:"varint,1,opt,name=endorsed_at,json=endorsedAt,proto3" json:"endorsed_at,omitempty"`
  15. PvtRwset *rwset.TxPvtReadWriteSet `protobuf:"bytes,2,opt,name=pvt_rwset,json=pvtRwset,proto3" json:"pvt_rwset,omitempty"`
  16. CollectionConfigs map[string]*common.CollectionConfigPackage `protobuf:"bytes,3,rep,name=collection_configs,json=collectionConfigs,proto3" json:"collection_configs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
  17. XXX_NoUnkeyedLiteral struct{} `json:"-"`
  18. XXX_unrecognized []byte `json:"-"`
  19. XXX_sizecache int32 `json:"-"`
  20. }
  21. ============================TxPvtReadWriteSetWithConfigInfo==========================
  22. #获取本地的已签名的身份信息,主要是看当前节点具有的功能,比如背书,验证
  23. signingIdentity := mgmt.GetLocalSigningIdentityOrPanic()
  24. serializedIdentity, err := signingIdentity.Serialize()
  25. if err != nil {
  26. logger.Panicf("Failed serializing self identity: %v", err)
  27. }
  28. #
  29. libConf := library.Config{}
  30. ================================Config=============================
  31. type Config struct {
  32. #权限过滤
  33. AuthFilters []*HandlerConfig `mapstructure:"authFilters" yaml:"authFilters"`
  34. #这个不清楚
  35. Decorators []*HandlerConfig `mapstructure:"decorators" yaml:"decorators"`
  36. #背书
  37. Endorsers PluginMapping `mapstructure:"endorsers" yaml:"endorsers"`
  38. #验证
  39. Validators PluginMapping `mapstructure:"validators" yaml:"validators"`
  40. }
  41. ==================================Config=============================
  42. if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil {
  43. return errors.WithMessage(err, "could not load YAML config")
  44. }
  45. #创建一个Registry实例,将上面的配置注册到这里
  46. reg := library.InitRegistry(libConf)
  47. #这一部分是背书操作的相关设置,不贴出来了
  48. ...
  49. #设置完之后注册背书服务
  50. pb.RegisterEndorserServer(peerServer.Server(), auth)
  51. #创建通道策略管理者,比如哪些节点或用户具有可读,可写,可操作的权限,都是由它管理
  52. policyMgr := peer.NewChannelPolicyManagerGetter()
  53. #创建用于广播的服务,就是区块链中用于向其他节点发送消息的服务
  54. err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)

到这里,链码的相关配置已经差不多了,到了部署系统链码的地方了:

  1. #这一行代码就是将系统链码部署上去
  2. sccp.DeploySysCCs("", ccp)
  3. logger.Infof("Deployed system chaincodes")
  4. installedCCs := func() ([]ccdef.InstalledChaincode, error) {
  5. #查看已经安装的链码
  6. return packageProvider.ListInstalledChaincodes()
  7. }
  8. #与链码的生命周期相关
  9. lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs))
  10. if err != nil {
  11. logger.Panicf("Failed creating lifecycle: +%v", err)
  12. }
  13. #处理链码的元数据更新,由其他节点广播
  14. onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) {
  15. service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel))
  16. })
  17. #添加监听器监听链码元数据更新
  18. lifecycle.AddListener(onUpdate)

这一部分是与通道的初始化相关的内容:

  1. peer.Initialize(func(cid string) {
  2. logger.Debugf("Deploying system CC, for channel <%s>", cid)
  3. sccp.DeploySysCCs(cid, ccp)
  4. #获取通道的描述信息,就是通道的基本属性
  5. sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) {
  6. #根据通道ID获取账本的查询执行器
  7. return peer.GetLedger(cid).NewQueryExecutor()
  8. }))
  9. if err != nil {
  10. logger.Panicf("Failed subscribing to chaincode lifecycle updates")
  11. }
  12. #为通道注册监听器
  13. cceventmgmt.GetMgr().Register(cid, sub)
  14. }, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName),
  15. pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider)
  16. #当前节点状态改变后是否可以被发现
  17. if viper.GetBool("peer.discovery.enabled") {
  18. registerDiscoveryService(peerServer, policyMgr, lifecycle)
  19. }
  20. #获取Peer节点加入的网络ID
  21. networkID := viper.GetString("peer.networkId")
  22. logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)
  23. #查看是否已经定义了配置文件
  24. profileEnabled := viper.GetBool("peer.profile.enabled")
  25. profileListenAddress := viper.GetString("peer.profile.listenAddress")
  26. #创建进程启动gRPC服务器
  27. serve := make(chan error)
  28. go func() {
  29. var grpcErr error
  30. if grpcErr = peerServer.Start(); grpcErr != nil {
  31. grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr)
  32. } else {
  33. logger.Info("peer server exited")
  34. }
  35. serve <- grpcErr
  36. }()
  37. #如果已经定义了配置文件,则启动监听服务
  38. if profileEnabled {
  39. go func() {
  40. logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress)
  41. if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil {
  42. logger.Errorf("Error starting profiler: %s", profileErr)
  43. }
  44. }()
  45. }
  46. #开始处理接收到的消息了
  47. go handleSignals(addPlatformSignals(map[os.Signal]func(){
  48. syscall.SIGINT: func() { serve <- nil },
  49. syscall.SIGTERM: func() { serve <- nil },
  50. }))
  51. logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address)
  52. #阻塞在这里,除非gRPC服务停止
  53. return <-serve
  54. }

到这里Peer节点已经启动完成了,过程还是很复杂的,这里总结一下整体的过程:

  1. 首先就是读取配置信息,创建Cache结构,以及检测其他Peer节点的信息。
    1. CacheConfiguration(),主要保存其他Peer节点的相关信息。
  2. 创建PeerServer
    1. peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
  3. 创建DeliverEventsServer
    1. abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
    2. pb.RegisterDeliverServer(peerServer.Server(), abServer)
    3. fabric/core/peer/deliverevents.go,该服务主要用于区块的交付与过滤,主要方法:Deliver(),DeliverFiltered()
  4. 启动ChaincodeServer
    1. chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
    2. core/chaincode/chaincode_support.go,返回了ChaincodeSupport:为Peer提供执行链码的接口,主要功能有Launch():启动一个停止运行的链码,Stop():停止链码的运行,HandleChaincodeStream():处理链码流信息,Register():将链码注册到当前Peer节点 ,createCCMessage():创建一个交易,ExecuteLegacyInit():链码的实例化,Execute():执行链码并返回回原始的响应,processChaincodeExecutionResult():处理链码的执行结果,InvokeInit():调用链码的Init方法,Invoke():调用链码,execute():执行一个交易
  5. 启动AdminServer
    1. startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
    2. core/protos/peer/admin.go文件,具有GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()等方法
  6. 创建EndorserServer
    1. pb.RegisterEndorserServer(peerServer.Server(), auth)
    2. core/endorser/endorser.go文件,注册背书服务器,提供了一个很重要的方法:ProcessProposal(),这个方法值得看一下。
  7. 创建GossipService
    1. err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
    2. gossip/service/gossip_service.go,具有InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()等方法
  8. 部署系统链码。
  9. 初始化通道。
  10. 启动gRPC服务。
  11. 如果启用了profile,还会启动监听服务。

流程图:,由于Fabric在不断更新,所以代码和图中还是有一些不同的。
参考:这里

原文链接:http://www.cnblogs.com/cbkj-xd/p/11141717.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号