graph engine #31226
Conversation
Thanks for your contribution!
✅ This PR's description meets the template requirements!
@@ -56,4 +56,4 @@ For example: no sample code; The sample code is not helpful; The sample code not
For example: Chinese API in this doc is inconsistent with English API, including params, description, sample code, formula, etc.

#### Other
For example: The doc link is broken; The doc page is missing; Dead link in docs.
Why update this?
I ran `pre-commit --all-files`, so the code style of some files changed.
#include "paddle/fluid/string/string_helper.h"
namespace paddle {
namespace distributed {
int GraphPsService_Stub::thread_num = 3;
Hard-coded?
auto promise = std::make_shared<std::promise<int32_t>>();
closure->add_promise(promise);
std::future<int> fut = promise->get_future();
;
Delete this stray `;`.
@@ -0,0 +1,169 @@
#!/bin/bash
This file largely duplicates paddle/scripts/submit_local.sh.in.
Why create a new one? @seemingwang
OK, this file was added by mistake; it will be deleted.
Thanks for pointing that out.
Also, the files under paddle/scripts/docker/root are all personal configuration and should be deleted.
I am not the creator of the files under root; I only fixed the style and added some comments when I ran pre-commit earlier.
PR types
New features
PR changes
Others
Describe
Project description:
1. Distributed node storage: all neighbor information of a node is kept on the same server. At initialization, server-num and shard-num are configured. For a node with id = key, the index of the shard storing the node is key % shard-num, and the server storing it is (key % shard_num) / sparse_local_shard_num(shard_num, server_num), where sparse_local_shard_num is defined as

static int32_t sparse_local_shard_num(uint32_t shard_num,
                                      uint32_t server_num) {
  if (shard_num % server_num == 0) {
    return shard_num / server_num;
  }
  size_t local_shard_num = shard_num / server_num + 1;
  return local_shard_num;
}

In short: if shard-num is an integer multiple of server-num, each server is assigned shard-num/server-num shards; otherwise the first server-num - 1 servers each hold shard-num/server-num + 1 shards, and whatever shards remain all go on the last server.
For example, with 30 shards and 3 servers, each server holds 10 shards; the shard index range on each server is
0: [0-9]
1: [10-19]
2: [20-29]
With 28 shards and 3 servers, the first 2 servers hold 10 shards each and the last one holds 8; the ranges are
0: [0-9]
1: [10-19]
2: [20-27]
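The mapping above can be sketched in Python. This is an illustrative re-implementation of the scheme as described, not the actual C++ code; the helper names simply mirror the description:

```python
def sparse_local_shard_num(shard_num, server_num):
    # Shards per server, rounding up when shard_num is not evenly divisible.
    if shard_num % server_num == 0:
        return shard_num // server_num
    return shard_num // server_num + 1

def shard_of(key, shard_num):
    # A node with id = key lives in shard key % shard_num.
    return key % shard_num

def server_of(key, shard_num, server_num):
    # The owning server is (key % shard_num) / sparse_local_shard_num(...).
    return shard_of(key, shard_num) // sparse_local_shard_num(shard_num, server_num)

# 30 shards on 3 servers: 10 shards per server.
assert server_of(9, 30, 3) == 0
assert server_of(10, 30, 3) == 1
assert server_of(29, 30, 3) == 2

# 28 shards on 3 servers: servers 0 and 1 hold 10 shards, server 2 holds the remaining 8.
assert server_of(19, 28, 3) == 1
assert server_of(20, 28, 3) == 2
```

Note that with 28 shards and 3 servers the last server ends up with fewer shards, matching the ranges listed above.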
2. Every node has a node id, and nodes are stored in tables; each table has a string name, and the client specifies which table to query by name. To optimize queries, two kinds of table are used in practice: node-tables and edge-tables. A node-table stores node features, and requests for node features are sent to it; an edge-table stores edges, and neighbor-sampling requests are sent to it. A node-table and its edge-table cover the same set of node ids; they differ only in the extra information they store.
3. Load api (load_node_file, load_edge_file): the client implements both load_node_file and load_edge_file; the argument is the path of a file containing node or edge information. The operation can be initiated by any client, which then tells all servers to load graph data from that path.
4. Neighbor-sampling api (batch_sample_neighbor): the client issues a batch_sample_neighbor command with 3 arguments: a table name, an array of node ids, and a sample-size. The client computes which server each node belongs to and sends a sampling request to that server; the server looks up each node's address in its map and calls that node's sample function to return a list of sampled neighbor nodes.
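The client-side routing step can be sketched as follows. This is a hypothetical helper, not the actual client code; it only shows how node ids would be grouped by owning server under the key % shard_num scheme from item 1:

```python
from collections import defaultdict

def route_nodes(node_ids, shard_num, server_num):
    """Group node ids by the server that owns them (illustrative sketch)."""
    # Shards per server, rounded up when shard_num is not evenly divisible.
    per_server = shard_num // server_num + (shard_num % server_num != 0)
    groups = defaultdict(list)
    for nid in node_ids:
        groups[(nid % shard_num) // per_server].append(nid)
    return dict(groups)

# With 28 shards on 3 servers, ids 96 and 45 land on server 1 and id 59 on server 0.
assert route_nodes([96, 45, 59], 28, 3) == {1: [96, 45], 0: [59]}
```

Each group would then become one sampling request to the corresponding server.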
5. Segmented-query api (pull_graph_list): the client issues a pull_graph_list request with several important arguments: a server index, start (start position), size (number of nodes to fetch), and step (stride). The server locates the node list stored on that machine and, using multiple threads, returns size nodes beginning at position start, with consecutive returned node ids step apart.
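A minimal single-server sketch of the start/size/step traversal (illustrative only; the real server does this over its stored node list with multiple threads):

```python
def pull_segment(nodes, start, size, step):
    # Take `size` nodes beginning at `start`, stepping `step` positions each time.
    return nodes[start:start + size * step:step]

# Toy node list on one server, with consecutive ids.
nodes = list(range(100, 120))
assert pull_segment(nodes, 4, 3, 2) == [104, 106, 108]
```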
6. Node-feature api (get_node_feat): the server fetches every node listed in the node-id array from the corresponding table, collects the values of the features named in the feature array, and returns them.
7. Random node-sampling api (random_sample_nodes): takes 3 arguments: a table name, a server index, and a sample-size. The server returns sample-size nodes chosen at random from the table it stores locally; if the total number of nodes is less than sample-size, all nodes are returned.
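The fallback behavior can be sketched as below (an assumed helper using Python's random.sample, not the server implementation):

```python
import random

def random_sample_nodes(nodes, sample_size):
    # Return sample_size distinct nodes; if fewer exist, return them all.
    if len(nodes) <= sample_size:
        return list(nodes)
    return random.sample(nodes, sample_size)

pool = [37, 96, 59, 97]
# Asking for more nodes than exist returns the whole pool.
assert sorted(random_sample_nodes(pool, 6)) == [37, 59, 96, 97]
# Otherwise exactly sample_size nodes come back.
assert len(random_sample_nodes(pool, 2)) == 2
```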
8. Feature-declaration api (add_table_feat_conf): takes 4 arguments: table name, feature name, feature type, and feature array length. For example, add_table_feat_conf("user", "a", "float32", 1) adds a feature a to the user table, with float32 values and an array length of 1. This api must be called before load_node_file and load_edge_file.
Concrete single-machine test example:
Prepare a local file input.txt (edges).
Elements within a line are separated by tabs:
37 45 0.34
37 145 0.31
37 112 0.21
96 48 1.4
96 247 0.31
96 111 1.21
59 45 0.34
59 145 0.31
59 122 0.21
97 48 0.34
97 247 0.31
97 111 0.21
Taking the first line as an example:
37 45 0.34 means there is an edge between nodes 37 and 45 with weight 0.34.
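A toy parser for this edge format (an assumed helper for illustration, not part of the PR):

```python
def parse_edge_line(line):
    # "src<TAB>dst<TAB>weight" -> (src, dst, weight)
    src, dst, weight = line.rstrip("\n").split("\t")
    return int(src), int(dst), float(weight)

# The first line of input.txt above:
assert parse_edge_line("37\t45\t0.34") == (37, 45, 0.34)
```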
Prepare a local file node_input.txt (nodes):
user 37 a 0.34 b 13 14 c hello d 0.2
user 96 a 0.31 b 15 10 c hello d 0.3
user 59 a 0.11 b 11 14
user 97 a 0.11 b 12 11
item 45 a 0.21
item 145 a 0.21
item 112 a 0.21
item 48 a 0.21
item 247 a 0.21
item 111 a 0.21
The first column is the table name, the second is the node-id, and the rest are features.
The table name and node-id, the node-id and the first feature, and the features themselves are separated by tabs; a feature name and its values are separated by spaces.
Taking the first line as an example, the separators are:
user[tab]37[tab]a[space]0.34[tab]b[space]13[space]14[tab]c[space]hello[tab]d[space]0.2
In the user table, node 37 has 4 features a, b, c, d, with values 0.34, [13, 14], "hello", 0.2.
These features correspond to the features declared with add_table_feat_conf; see the test code for reference.
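The tab/space separation rules above can be captured in a small parser. This is an illustrative sketch, not code from the PR:

```python
def parse_node_line(line):
    # "table<TAB>node_id<TAB>feat ..." where each feature field is
    # "name value [value ...]" with space-separated values.
    fields = line.rstrip("\n").split("\t")
    table, node_id = fields[0], int(fields[1])
    feats = {}
    for field in fields[2:]:
        name, *values = field.split(" ")
        feats[name] = values
    return table, node_id, feats

# The first line of node_input.txt above:
table, nid, feats = parse_node_line("user\t37\ta 0.34\tb 13 14\tc hello\td 0.2")
assert table == "user" and nid == 37
assert feats == {"a": ["0.34"], "b": ["13", "14"], "c": ["hello"], "d": ["0.2"]}
```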
Save the files, then run the following script:
from paddle.fluid.core import GraphPyService, GraphPyServer, GraphPyClient
import numpy as np
ips_str = "127.0.0.1:4211;127.0.0.1:4212"  # launch 2 servers, at 127.0.0.1:4211 and 127.0.0.1:4212
server1 = GraphPyServer()
server2 = GraphPyServer()
client1 = GraphPyClient()
client2 = GraphPyClient()  # simulate a distributed environment: launch 2 clients
node_types = ["user"]  # node table names
edge_types = ["user2item"]  # edge table names
server1.set_up(ips_str, 127, node_types, edge_types, 0)  # start server1; 5 arguments: ip list, shard count, node table names, edge table names, server index (the clients' set_up below is similar; its last argument is the client index)
server1.add_table_feat_conf("user", "a", "float32", 1)  # add a feature a to the user table: float32 values, array length 1
server1.add_table_feat_conf("user", "b", "int32", 2)
server1.add_table_feat_conf("user", "c", "string", 1)
server1.add_table_feat_conf("user", "d", "float32", 1)
server2.set_up(ips_str, 127, node_types, edge_types, 1)
server2.add_table_feat_conf("user", "a", "float32", 1)
server2.add_table_feat_conf("user", "b", "int32", 2)
server2.add_table_feat_conf("user", "c", "string", 1)
server2.add_table_feat_conf("user", "d", "float32", 1)
client1.set_up(ips_str, 127, node_types, edge_types, 0)
client2.set_up(ips_str, 127, node_types, edge_types, 1)
server1.start_server(False)  # start the server; False means non-blocking, so execution continues. True blocks until a client sends a stop-server command.
server2.start_server(False)
client1.start_client()
client2.start_client()
client1.bind_local_server(0, server1)  # mark channel 0 as local and bind the local server; this only optimizes local queries and can be omitted in testing
client2.bind_local_server(1, server2)  # same as above
client1.load_edge_file("user2item", "input.txt", 0)  # load the edge file
client1.load_node_file("user", "node_input.txt")  # load the node file
list = client2.pull_graph_list("user2item", 0, 1, 4, 1)  # pull nodes from table user2item on server 0: start position 1, size 4, step 1
for x in list:
print(x)
list = client1.batch_sample_neighboors("user2item", [96], 4)  # sample from the user2item table: return 4 neighbors of node 96
for x in list:
print(x)
list = client1.random_sample_nodes("user", 0, 6)  # return 6 random nodes from the user table on server 0
print("sample nodes result")
for x in list:
print(x)
# string
print("test string")
print(client1.get_node_feat("user", [37, 96], ["c"]))  # fetch nodes [37, 96] from the user table and get their feature c
# int
print("test int")
data = client1.get_node_feat("user", [37, 96], ["b"])
print(np.frombuffer(data[0][0], "int32"))
# float
print("test float")
data = client1.get_node_feat("user", [37, 96], ["a", "d"])
print(np.frombuffer(data[0][0], "float32"))