Skip to content

Commit

Permalink
Update kafka value parser (#1336)
Browse files Browse the repository at this point in the history
* Update kafka value parser

* Update ex-ug-import-from-kafka.md

* Update ex-ug-import-from-kafka.md

* Update ex-ug-import-from-kafka.md

* Update ex-ug-import-from-kafka.md

* Update ex-ug-import-from-kafka.md

* Update ex-ug-import-from-kafka.md
  • Loading branch information
randomJoe211 authored Dec 28, 2021
1 parent 2ed848b commit 672cfb5
Showing 1 changed file with 14 additions and 20 deletions.
34 changes: 14 additions & 20 deletions docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,10 +76,6 @@

### 步骤 2:修改配置文件

!!! note

如果部分数据存储在 Kafka 的 value 域内,需要自行修改源码,从 Kafka 中获取 value 域,将 value 通过 from_json 函数解析,然后作为 Dataframe 返回。

编译 Exchange 后,复制`target/classes/application.conf`文件设置 Kafka 数据源相关的配置。在本示例中,复制的文件名为`kafka_application.conf`。各个配置项的详细说明请参见[配置说明](../parameter-reference/ex-ug-parameter.md)

```conf
Expand Down Expand Up @@ -145,16 +141,15 @@
# 消息类别。
topic: "topic_name1"
# Kafka 数据有固定的域名称:key、value、topic、partition、offset、timestamp、timestampType。
# Spark 读取为 DataFrame 后,如果需要指定多个字段,用英文逗号(,)隔开。
# 在 fields 里指定字段名称,例如用 key 对应 Nebula 中的 name, value 对应 Nebula 中的 age,示例如下:
fields: [key,value]
nebula.fields: [name,age]
# 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下:
fields: [personName, personAge]
# 设置与 fields 中的 key 对应的 Nebula Graph 属性名,key 的 value 将保存为相应的属性值。下方设置会将 personName 的 value 保存到 Nebula Graph 中的 name 属性,personAge 的 value 则保存到 age 属性。
nebula.fields: [name, age]
# 指定表中某一列数据为 Nebula Graph 中点 VID 的来源。
# 这里的值 key 和上面的 key 重复,表示 key 既作为 VID,也作为属性 name。
vertex:{
field:key
field:personId
}
# 单批次写入 Nebula Graph 的数据条数。
Expand All @@ -177,7 +172,7 @@
fields: [key]
nebula.fields: [name]
vertex:{
field:key
field:teamId
}
batch: 10
partition: 10
Expand Down Expand Up @@ -207,20 +202,19 @@
# 消息类别。
topic: "topic_name3"
# Kafka 数据有固定的域名称:key、value、topic、partition、offset、timestamp、timestampType。
# Spark 读取为 DataFrame 后,如果需要指定多个字段,用英文逗号(,)隔开。
# 在 fields 里指定字段名称,例如用 key 对应 Nebula 中的 degree,示例如下:
fields: [key]
# 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下:
fields: [degree]
# 设置与 fields 中的 key 对应的 Nebula Graph 属性名,key 的 value 将保存为相应的属性值。下方设置会将 degree 的 value 保存到 Nebula Graph 中的 degree 属性。
nebula.fields: [degree]
# 在 source 里,将 topic 中某一列作为边的起始点数据源。
# 在 target 里,将 topic 中某一列作为边的目的点数据源。
source:{
field:timestamp
field:srcPersonId
}
target:{
field:offset
field:dstPersonId
}
# 单批次写入 Nebula Graph 的数据条数。
Expand All @@ -243,14 +237,14 @@
service: "127.0.0.1:9092"
topic: "topic_name4"
fields: [timestamp,offset]
fields: [startYear,endYear]
nebula.fields: [start_year,end_year]
source:{
field:key
field:personId
}
target:{
field:value
field:teamId
}
batch: 10
Expand Down

0 comments on commit 672cfb5

Please sign in to comment.