Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update kafka value parser #1336

Merged
merged 7 commits into from
Dec 28, 2021
Merged
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 14 additions & 18 deletions docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@

### 步骤 2:修改配置文件

!!! note

如果部分数据存储在 Kafka 的 value 域内,需要自行修改源码,从 Kafka 中获取 value 域,将 value 通过 from_json 函数解析,然后作为 Dataframe 返回。

编译 Exchange 后,复制`target/classes/application.conf`文件设置 Kafka 数据源相关的配置。在本示例中,复制的文件名为`kafka_application.conf`。各个配置项的详细说明请参见[配置说明](../parameter-reference/ex-ug-parameter.md)。

```conf
Expand Down Expand Up @@ -146,15 +142,15 @@
topic: "topic_name1"

# Kafka 数据有固定的域名称:key、value、topic、partition、offset、timestamp、timestampType。
# Spark 读取为 DataFrame 后,如果需要指定多个字段,用英文逗号(,)隔开。
# 在 fields 里指定字段名称,例如用 key 对应 Nebula 中的 name, value 对应 Nebula 中的 age,示例如下:
fields: [key,value]
nebula.fields: [name,age]
# 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下:
fields: [personName, age]
# 设置 fields 中的 key 保存到 Nebula Graph 时转换成的属性名。下方设置会将 name 保存为 Nebula Graph 中的 personName 属性,age 则保存为同名属性。
randomJoe211 marked this conversation as resolved.
Show resolved Hide resolved
nebula.fields: [name, age]
randomJoe211 marked this conversation as resolved.
Show resolved Hide resolved

# 指定表中某一列数据为 Nebula Graph 中点 VID 的来源。
# 这里的值 key 和上面的 key 重复,表示 key 既作为 VID,也作为属性 name。
vertex:{
field:key
field:personId
}

# 单批次写入 Nebula Graph 的数据条数。
Expand All @@ -177,7 +173,7 @@
fields: [key]
nebula.fields: [name]
vertex:{
field:key
field:teamId
}
batch: 10
partition: 10
Expand Down Expand Up @@ -208,19 +204,19 @@
topic: "topic_name3"

# Kafka 数据有固定的域名称:key、value、topic、partition、offset、timestamp、timestampType。
randomJoe211 marked this conversation as resolved.
Show resolved Hide resolved
# Spark 读取为 DataFrame 后,如果需要指定多个字段,用英文逗号(,)隔开。
# 在 fields 里指定字段名称,例如用 key 对应 Nebula 中的 degree,示例如下:
fields: [key]
# 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下:
fields: [degree]
# 设置 fields 中的 key 保存到 Nebula Graph 时转换成的属性名。下方设置会将 degree 保存为 Nebula Graph 中的同名属性。
nebula.fields: [degree]

# 在 source 里,将 topic 中某一列作为边的起始点数据源。
# 在 target 里,将 topic 中某一列作为边的目的点数据源。
source:{
field:timestamp
field:personId
}

target:{
field:offset
field:personId
randomJoe211 marked this conversation as resolved.
Show resolved Hide resolved
}

# 单批次写入 Nebula Graph 的数据条数。
Expand All @@ -243,14 +239,14 @@
service: "127.0.0.1:9092"
topic: "topic_name4"

fields: [timestamp,offset]
fields: [startYear,endYear]
nebula.fields: [start_year,end_year]
source:{
field:key
field:personId
}

target:{
field:value
field:teamId
}

batch: 10
Expand Down