fedml-4.与fabric通信
- 直接采用“偷梁换柱”的模式,把修改后的代码复制到pip安装的位置
pip install的位置
通过python -m site
命令查找包的安装路径
1 | sys.path = [ |
思考:需要修改哪些代码?
- 修改Aggregator。Aggregator的作用是(1)保存各个节点上传的本地模型;(2)对本地模型进行aggregate操作。
- 分离aggregator的功能,分为本地Aggregator和链上Aggregator,本地Aggregator不保存模型,将收到的模型转发给区块链,聚集操作时先向区块链取模型,再进行聚集操作
- 修改FedMLServerManager
- 在适当位置调用http接口,适当根据逻辑需要修改其它代码
- 修改ClientMasterManager
- 在适当位置调用http接口,适当根据逻辑需要修改其它代码
- 修改message_define,文件中定义了C/S之间相互通信的名称,参数名称
- 对其适当增删,达到C/S间协同的目的
修改FedMLAggregator
model_dict
: 用于保存收到的本地模型,与他相关的sample_num_dict
,保存收到的本地模型的样本个数,flag_client_model_uploaded_dict
用于保存是否收到模型。区块链端的Aggregator
需要维护这三个数组- add_local_trained_result,这个函数通过ml_engine_adapter.
model_params_to_device
获得本地模型,将这个函数注释掉 check_whether_all_receive
,这个函数通过检查flag_client_model_uploaded_dict
是否全是true来判断是否全部收到。这里改成从区块链获取aggregate
,这个函数使用了sample_num_dict和model_dict
,也全都改成从区块链获取- 其他函数未涉及这三个dict,暂不做修改。
修改FedMLServerManager
handle_message_receive_model_from_client
,这个函数处理节点发送来的模型,从消息中读取模型参数和样本个数。不从消息中获取这两个参数(后续修改Client,也不发送这两个参数),但是仍然向Server发送这个消息。收到消息后延迟一定时间向区块链查询是否全部收到(调用本地aggregator的check_whether_all_receive
)- 当查询到全部都收到后,会进行聚集操作,聚集操作后产生更新后的模型,这个模型会发送给client,在发送给client的同时发送一份给区块链存储
修改ClientMasterManager
send_model_to_server
,对应Server的handle_message_receive_model_from_client
。不发送模型和样本数给server,而是发送给区块链
统一发送模型的格式
ml_engine_adapter
中的函数model_params_to_device
用于处理收到的模型,根据ml_engine
的类型对模型进行处理,希望在发送之前在Client端就进行处理
通过测试,输出type(ml_engine_adapter.model_params_to_device(self.args, weights, self.trainer_dist_adapter.device)),得到其类型是
collections.OrderedDict
它也是一种dict, 存储的是model_params(string) to Tensor
对他进行处理,变成python原生的dict,对Tensor处理,变成python原生的list,目的是使其可以变成json串
server收到后,对其dumps, 再将list转换成Tensor
1 | from ...ml.engine import ml_engine_adapter |
编写ChainCode
链上Aggregator实现思路
ChainCode要实现存储模型,并检测每轮模型是否上传完毕,参考fabcar示例中队Car的定义以及对Car的增改查的代码,设计Round结构
1 | type Round struct { |
- 只需要检查Round中len(Model)与ClientNum是否相等即可
ChainCode合约定义
根据上面的分析,需要一下几个函数
- AddModel 接受并存储本地模型 保存: model-key model-weight model-sample-num
- params:
- RoundID string
- ClientID string
- model string(原始json串)
- params:
- QueryWetherAllReceived 检查是否全部收到,检查是否全部收到,返回True or False
- params:
- RoundID string
- params:
- QueryAllReceived 若全部收到,返回模型,模型权重和模型样本数目,否则返回空
- params:
- RoundID string
- params:
- UpdateGlobal 接受Server聚集后的,更新后的模型,对区块链内的内容进行更新
- params:
- ServerID string
- model-weight string
- NewRound 新增一个Round,每轮开始时创建
- params:
- RoundID string
- ClientNum int64
- params:
根据以上分析,需要在fedml节点运行时生成一个id,server的id自然成为server-id,client的id自然成为ClientID。在每轮训练开始前,需要生成RoundID,并且调用NewRound通知区块链。RoundID的前n为与ServerID相同,再拼接随机生成的后n位。
- 通过以下命令进行对ChainCode的测试与Debug
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60# RoundID = "ri"
# ServerID = "si"
# ClientID = "ci"
peer chaincode invoke -o orderer.fedfab.com:7050 --isInit --ordererTLSHostnameOverride orderer.fedfab.com --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/fedfab.com/orderers/orderer.fedfab.com/msp/tlscacerts/tlsca.fedfab.com-cert.pem --channelID channel2 --name fedfab --tls true \
--peerAddresses peer0.org1.fedfab.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer0.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer1.org1.fedfab.com:8053 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer1.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer2.org1.fedfab.com:8055 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer2.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer0.org2.fedfab.com:9051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.fedfab.com/peers/peer0.org2.fedfab.com/tls/ca.crt \
-c '{"Args":["initialize", "Initializing chaincode"]}'
# chaincode 初始化
peer chaincode invoke -o orderer.fedfab.com:7050 --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/fedfab.com/orderers/orderer.fedfab.com/msp/tlscacerts/tlsca.fedfab.com-cert.pem --channelID channel2 --name fedfab --tls true \
--peerAddresses peer0.org1.fedfab.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer0.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer1.org1.fedfab.com:8053 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer1.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer2.org1.fedfab.com:8055 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer2.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer0.org2.fedfab.com:9051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.fedfab.com/peers/peer0.org2.fedfab.com/tls/ca.crt \
-c '{"Args":["NewRound", "r1", "2"]}'
peer chaincode invoke -o orderer.fedfab.com:7050 --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/fedfab.com/orderers/orderer.fedfab.com/msp/tlscacerts/tlsca.fedfab.com-cert.pem --channelID channel2 --name fedfab --tls true \
--peerAddresses peer0.org1.fedfab.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer0.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer1.org1.fedfab.com:8053 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer1.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer2.org1.fedfab.com:8055 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer2.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer0.org2.fedfab.com:9051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.fedfab.com/peers/peer0.org2.fedfab.com/tls/ca.crt \
-c '{"Args":["AddModel", "r1", "c1", "{\"a\":[1,2,3], \"b\":[2,3,4], \"c\":[3,4,5]}"]}'
peer chaincode invoke -o orderer.fedfab.com:7050 --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/fedfab.com/orderers/orderer.fedfab.com/msp/tlscacerts/tlsca.fedfab.com-cert.pem --channelID channel2 --name fedfab --tls true \
--peerAddresses peer0.org1.fedfab.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer0.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer1.org1.fedfab.com:8053 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer1.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer2.org1.fedfab.com:8055 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer2.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer0.org2.fedfab.com:9051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.fedfab.com/peers/peer0.org2.fedfab.com/tls/ca.crt \
-c '{"Args":["QueryWetherAllReceived", "r1"]}'
peer chaincode invoke -o orderer.fedfab.com:7050 --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/fedfab.com/orderers/orderer.fedfab.com/msp/tlscacerts/tlsca.fedfab.com-cert.pem --channelID channel2 --name fedfab --tls true \
--peerAddresses peer0.org1.fedfab.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer0.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer1.org1.fedfab.com:8053 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer1.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer2.org1.fedfab.com:8055 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer2.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer0.org2.fedfab.com:9051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.fedfab.com/peers/peer0.org2.fedfab.com/tls/ca.crt \
-c '{"Args":["AddModel", "r1", "c2", "{\"a\":[1,2,3], \"b\":[2,3,4], \"c\":[3,4,5]}"]}'
peer chaincode invoke -o orderer.fedfab.com:7050 --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/fedfab.com/orderers/orderer.fedfab.com/msp/tlscacerts/tlsca.fedfab.com-cert.pem --channelID channel2 --name fedfab --tls true \
--peerAddresses peer0.org1.fedfab.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer0.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer1.org1.fedfab.com:8053 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer1.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer2.org1.fedfab.com:8055 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer2.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer0.org2.fedfab.com:9051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.fedfab.com/peers/peer0.org2.fedfab.com/tls/ca.crt \
-c '{"Args":["QueryWetherAllReceived", "r1"]}'
peer chaincode invoke -o orderer.fedfab.com:7050 --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/fedfab.com/orderers/orderer.fedfab.com/msp/tlscacerts/tlsca.fedfab.com-cert.pem --channelID channel2 --name fedfab --tls true \
--peerAddresses peer0.org1.fedfab.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer0.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer1.org1.fedfab.com:8053 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer1.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer2.org1.fedfab.com:8055 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer2.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer0.org2.fedfab.com:9051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.fedfab.com/peers/peer0.org2.fedfab.com/tls/ca.crt \
-c '{"Args":["QueryAllReceived", "r1"]}'
peer chaincode invoke -o orderer.fedfab.com:7050 --cafile /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/ordererOrganizations/fedfab.com/orderers/orderer.fedfab.com/msp/tlscacerts/tlsca.fedfab.com-cert.pem --channelID channel2 --name fedfab --tls true \
--peerAddresses peer0.org1.fedfab.com:8051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer0.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer1.org1.fedfab.com:8053 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer1.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer2.org1.fedfab.com:8055 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org1.fedfab.com/peers/peer2.org1.fedfab.com/tls/ca.crt \
--peerAddresses peer0.org2.fedfab.com:9051 --tlsRootCertFiles /opt/gopath/src/github.com/hyperledger/fabric/peer/crypto/peerOrganizations/org2.fedfab.com/peers/peer0.org2.fedfab.com/tls/ca.crt \
-c '{"Args":["UpdateGlobal", "s1", "{\"a\":[1,2,3], \"b\":[2,3,4], \"c\":[3,4,5]}"]}'
fedml拿到返回值后可能要进行两次json解析
把模型变成json后,当作普通字符串作为请求发送,发送请求时有进行了依次json编码
增加了json的转义字符,增加了冗余字符
减少了ChainCode编写的复杂度
Server需要知道Client的id? –不需要,QueryAll后对每个模型求平均就好,不需要发送给Client
更新同名的ChainCode时,label要有变化,命令中的sequence参数要比上一次递增。
编写ts API与HTTP Server
根据上周的ttsacc的api和Http Server,相同的方法实现上面5个功能对应的api和接口
使用postMan进行测试
newRound调用时,client_num使用string
编写并测试Yaml-Requests的yaml配置文件
1 | url: '10.128.196.184' |
fedml中调用api
- 根据之前的分析,在对应位置进行调用,并处理返回值
编写dockerfile
- 创建自定义镜像,修改镜像中的fedml代码
- 使用镜像创建container,在container中运行修改后的代码进行测试
修改过的文件有:1
2
3
4
5
6/home/tt/.local/lib/python3.10/site-packages/fedml/cross_silo/client/fedml_client_master_manager.py
/home/tt/.local/lib/python3.10/site-packages/fedml/cross_silo/server/fedml_aggregator.py
/home/tt/.local/lib/python3.10/site-packages/fedml/cross_silo/server/fedml_server_manager.py
/home/tt/.local/lib/python3.10/site-packages/fedml/cross_silo/server/fedml_server_manager.py
/home/tt/.local/lib/python3.10/site-packages/fedml/cross_silo/client/message_define.py
/home/tt/.local/lib/python3.10/site-packages/fedml/cross_silo/server/message_define.py
docker COPY
COPY 遵循的规则
src
路径必须在构建的上下文中src
是目录- 则复制目录的全部内容,包括文件系统元数据
- 不会复制目录本身,只会复制其内容
src
是任何其他类型的文件- 则将其与其元数据一起单独复制
dest
以斜杠 / 结尾,它将被视为一个目录,并且src
的内容将写入dest
/base(src
)
- 指定了多个
src
资源,或者由于使用了通配符- 则
dest
必须是一个目录,并且必须以斜杠 / 结尾
- 则
dest
不以斜杠结尾- 它将被视为常规文件,并且
src
的内容将写入dest
- 它将被视为常规文件,并且
dest
不存在- 路径中所有缺失的目录都会自动创建
docker 错误记录
1 | Error response from daemon: could not select device driver "" with capabilities: [[gpu]] |
解决方法
1 | curl -s -L https://nvidia.github.io/nvidia-docker/gpgkey | \ |
如果上面出错了,先使用下面的命令
1 | sudo rm -f $(grep -l "nvidia.github.io" /etc/apt/sources.list.d/* | grep -vE "/nvidia-container-toolkit.list\$") |
dockerfile 文件
1 | FROM fedml/fedml:cuda-11.4.0-devel-ubuntu20.04 |
- 编译dockerfile的脚本
1 | # docker image rm fedml:tt |
运行测试
问题: 会随机有几个请求被拒绝连接
减少出块时间到50ms
依然没有解决
修改configtx.yaml
中的一个参数1
BatchTimeout: 50ms
给invoke函数加Mutex互斥锁,让其只能按顺序执行,锁解除后在sleep 65ms
依然没有解决
观察到观察到MVCC_READ_CONFLICT总是在AddModel总是在AddModel, 时出现
在Client发送Model前,sleep(1*client_real_id),排队依次上传,给区块链反应的时间(假设模型训练时间都差不多)
还是没有解决
总结
- 经过多天的debug与调试,最终基本可以正常训练,存储到区块链上,但是最多只训练了44轮,还是在AddModel环节出现了
MVCC_READ_CONFLICT
。 - 很多地方要增加sleep操作,增加了运行时间
- 需要一个协调方式,避免同时AddModel造成BlockChain崩溃。
- 可以尝试用Server调用AddModel,Client的本地Model还是通过MQTT协议传送给Server,由Server统一上链
- 或者可以捕获异常,保证server持续运行,Client发现Add失败则重新Add
- 寻找方法将请求串行化
最新进展
- 解决了
MVCC_READ_CONFLICT
,fedml和fabric可以相互配合完成整个训练过程,并且不发生MVCC_READ_CONFLICT
等问题
新定义三个mqtt消息
1 | ## tt define |
解释:
MSG_TYPE_C2S_ADD_MODEL_READY
, Client to Server,训练完毕,可以向区块链发送AddModel。知会Server,server的Aggregator收集这个信息MSG_TYPE_S2C_START_ADD_MODEL
, Server to Client, 当全部ready时,会向Client开始依次发送这个消息。MSG_TYPE_C2S_ADD_MODEL_SEND
, Client to Server,当一个Client将Model发送到Chain后,发送这个消息到Server,Server收到后向下一个Client发送MSG_TYPE_S2C_START_ADD_MODEL
消息,让下一个Client开始上传模型
方法带来的问题与思考:
由于invoke函数的mutex锁和刻意的延迟,以及上面通过三个信号实现的排队向ChainCode发送模型,导致整个系统运行的速度非常慢
有些节点算力强,训练快,有些算力低训练慢。
只有训练速度相似的节点会竞争AddModel的权利,而现在要让所有的Client都训练完成后才开始上传区块链,这样一定程度上降低了系统的效率
解决方法(设想):
- 维护一个队列ready,一个变量sending,一个字典send
- ready表示目前收到ready的Client
- sending表示当前正在发送但没收到send消息的Client
- dict中存储ClientID to Bool,表示ClientID是否已经发送,即是否为send状态,将字典初始化全为False
- 收到Ci的ready消息后
- 若sending为None,则让Ci开始发送模型,将sending设为Ci
- 若sending不为None,加入ready队列
- 收到Ci的send消息后,将sending设为None,send[Ci]变成True,检测send字典是否全为True
- 若全为True,进行下一轮
- 若不全为True
- 若ready队列不为空,则从队列中取出下一个Client Cj让其发送模型,sending设为Cj
- 若ready队列为空,则继续等待ready消息
- 维护一个队列ready,一个变量sending,一个字典send