From 672cfb5b0a2b1a94d3d2d629cf620dece0c193ef Mon Sep 17 00:00:00 2001 From: randomJoe211 <69501902+randomJoe211@users.noreply.github.com> Date: Tue, 28 Dec 2021 13:43:08 +0800 Subject: [PATCH] Update kafka value parser (#1336) * Update kafka value parser * Update ex-ug-import-from-kafka.md * Update ex-ug-import-from-kafka.md * Update ex-ug-import-from-kafka.md * Update ex-ug-import-from-kafka.md * Update ex-ug-import-from-kafka.md * Update ex-ug-import-from-kafka.md --- .../use-exchange/ex-ug-import-from-kafka.md | 34 ++++++++----------- 1 file changed, 14 insertions(+), 20 deletions(-) diff --git a/docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md b/docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md index aefbbb6f85..52685d35b5 100644 --- a/docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md +++ b/docs-2.0/nebula-exchange/use-exchange/ex-ug-import-from-kafka.md @@ -76,10 +76,6 @@ ### 步骤 2:修改配置文件 -!!! note - - 如果部分数据存储在 Kafka 的 value 域内,需要自行修改源码,从 Kafka 中获取 value 域,将 value 通过 from_json 函数解析,然后作为 Dataframe 返回。 - 编译 Exchange 后,复制`target/classes/application.conf`文件设置 Kafka 数据源相关的配置。在本示例中,复制的文件名为`kafka_application.conf`。各个配置项的详细说明请参见[配置说明](../parameter-reference/ex-ug-parameter.md)。 ```conf @@ -145,16 +141,15 @@ # 消息类别。 topic: "topic_name1" - # Kafka 数据有固定的域名称:key、value、topic、partition、offset、timestamp、timestampType。 - # Spark 读取为 DataFrame 后,如果需要指定多个字段,用英文逗号(,)隔开。 - # 在 fields 里指定字段名称,例如用 key 对应 Nebula 中的 name, value 对应 Nebula 中的 age,示例如下: - fields: [key,value] - nebula.fields: [name,age] + # 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下: + fields: [personName, personAge] + # 设置与 fields 中的 key 对应的 Nebula Graph 属性名,key 的 value 将保存为相应的属性值。下方设置会将 personName 的 value 保存到 Nebula Graph 中的 name 属性,personAge 的 value 则保存到 age 属性。 + nebula.fields: [name, age] # 指定表中某一列数据为 Nebula Graph 中点 VID 的来源。 # 这里的值 key 和上面的 key 重复,表示 key 既作为 VID,也作为属性 name。 vertex:{ - field:key + field:personId } # 单批次写入 Nebula Graph 的数据条数。 @@ -177,7 +172,7 @@ fields: [key] nebula.fields: [name] vertex:{ - field:key + field:teamId } batch: 10 partition: 10 @@ -207,20 +202,19 @@ # 消息类别。 topic: "topic_name3" - # Kafka 数据有固定的域名称:key、value、topic、partition、offset、timestamp、timestampType。 - # Spark 读取为 DataFrame 后,如果需要指定多个字段,用英文逗号(,)隔开。 - # 在 fields 里指定字段名称,例如用 key 对应 Nebula 中的 degree,示例如下: - fields: [key] + # 在 fields 里指定 Kafka value 中的字段名称,多个字段用英文逗号(,)隔开。Spark Structured Streaming 读取 Kafka 数据后会将其以 JSON 格式存储于 value 字段中,而这里的 fields 要配置 JSON 的 key 名。示例如下: + fields: [degree] + # 设置与 fields 中的 key 对应的 Nebula Graph 属性名,key 的 value 将保存为相应的属性值。下方设置会将 degree 的 value 保存到 Nebula Graph 中的 degree 属性。 nebula.fields: [degree] # 在 source 里,将 topic 中某一列作为边的起始点数据源。 # 在 target 里,将 topic 中某一列作为边的目的点数据源。 source:{ - field:timestamp + field:srcPersonId } target:{ - field:offset + field:dstPersonId } # 单批次写入 Nebula Graph 的数据条数。 @@ -243,14 +237,14 @@ service: "127.0.0.1:9092" topic: "topic_name4" - fields: [timestamp,offset] + fields: [startYear,endYear] nebula.fields: [start_year,end_year] source:{ - field:key + field:personId } target:{ - field:value + field:teamId } batch: 10