
Adding support for upserts of nested arrays #1838

Merged
masseyke merged 7 commits into elastic:master from fix/nested-fields-upsert on Jan 20, 2022

Conversation

masseyke
Member

This commit adds support for upserts of nested array fields.
Closes #1190
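For context, a minimal sketch of the kind of write this enables through the Spark SQL integration. The index name, field name, script, and parameter name are taken from the test discussed further down in this review; the schema, the struct field name, and the exact option values are illustrative assumptions rather than the merged test verbatim.

import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._
import org.elasticsearch.spark.sql._

// Assumes an existing SparkContext `sc`.
val sqlContext = new SQLContext(sc)
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("samples", ArrayType(StructType(Seq(StructField("text", StringType)))))
))
val data = Seq(Row("1", List(Row("hello"), Row("world"))))
val df = sqlContext.createDataFrame(sc.parallelize(data), schema)

// Upsert the nested array via a script, passing the array as a script parameter.
df.saveToEs("nested_fields_upsert_test", Map(
  "es.mapping.id"           -> "id",
  "es.write.operation"      -> "upsert",
  "es.update.script.inline" -> "ctx._source.samples = params.new_samples",
  "es.update.script.params" -> "new_samples:samples"
))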

@masseyke
Member Author

I'm marking this as a draft because it fixes the specific case described in #1190, but I'm not very familiar with this part of Elasticsearch and I'm not sure it's general enough.

@jbaiera (Member) left a comment

LGTM, small question about testing.

}
Result.SUCCESFUL()
}
Result.FAILED()
Member

Is an empty array a failure scenario here for sure?

Member Author

I can't remember for sure at this point, but I think my reasoning was that in this case we had been asked to write something but hadn't actually written an array, so that counts as a failure. Maybe that doesn't make sense, though. I'll take a look at what's done with that result. Also, I can't remember why I'm not just writing an empty array here.

Member Author

It actually turns out that we ignore this result. See https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java#L153. We probably ought to be throwing an exception there. But an empty array actually is a failure (I think). I'm about to write more in the comment below.

"es.update.script.inline" -> update_script
)
val sqlContext = new SQLContext(sc)
var data = Seq(Row("1", List(Row("hello"), Row("world"))))
Member

Can we test this with an empty array in the samples field?

Member Author

I just tried this. If I use the code as-is, it doesn't write anything. So something gets tripped up when Elasticsearch tries to apply the script:

13:49:26.766 [Executor task launch worker for task 0.0 in stage 0.0 (TID 0)] ERROR org.apache.spark.TaskContextImpl - Error in TaskCompletionListener
org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest: org.elasticsearch.hadoop.rest.EsHadoopRemoteException: x_content_parse_exception: [1:74] [script] failed to parse field [params]
{"update":{"_id":"1"}}
{"script":{"source":"ctx._source.samples = params.new_samples","params":{"new_samples":}},"upsert":{"samples":[]}}

	at org.elasticsearch.hadoop.rest.RestClient.checkResponse(RestClient.java:487) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:444) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:438) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.RestClient.execute(RestClient.java:418) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.RestClient.bulk(RestClient.java:236) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.tryFlush(BulkProcessor.java:215) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.flush(BulkProcessor.java:518) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.bulk.BulkProcessor.close(BulkProcessor.java:560) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.RestRepository.close(RestRepository.java:219) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.hadoop.rest.RestService$PartitionWriter.close(RestService.java:122) ~[elasticsearch-hadoop-mr-8.1.0-SNAPSHOT.jar:8.1.0-SNAPSHOT]
	at org.elasticsearch.spark.rdd.EsRDDWriter$$anon$1.onTaskCompletion(EsRDDWriter.scala:74) ~[elasticsearch-spark_2.12-8.1.0-SNAPSHOT-spark30scala212.jar:8.1.0-SNAPSHOT]
	at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1(TaskContextImpl.scala:124) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.TaskContextImpl.$anonfun$markTaskCompleted$1$adapted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1(TaskContextImpl.scala:137) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.TaskContextImpl.$anonfun$invokeListeners$1$adapted(TaskContextImpl.scala:135) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) ~[scala-library-2.12.15.jar:?]
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) ~[scala-library-2.12.15.jar:?]
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) ~[scala-library-2.12.15.jar:?]
	at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:135) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:124) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.scheduler.Task.run(Task.scala:147) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506) ~[spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1462) [spark-core_2.12-3.2.0.jar:3.2.0]
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509) [spark-core_2.12-3.2.0.jar:3.2.0]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_292]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_292]
	at java.lang.Thread.run(Thread.java:748) [?:1.8.0_292]

If instead I write an empty array in DataFrameValueWriter, then when I read the data back out the returned DataFrame is kind of useless: it has no columns. If you try to do something like resultDf.select("samples") you get:

'Project ['samples]
+- Relation [] ElasticsearchRelation(Map(es.read.field.as.array.include -> samples, es.resource -> nested_fields_upsert_test),org.apache.spark.sql.SQLContext@5cfeb239,None)

org.apache.spark.sql.AnalysisException: cannot resolve 'samples' given input columns: [];
'Project ['samples]
+- Relation [] ElasticsearchRelation(Map(es.read.field.as.array.include -> samples, es.resource -> nested_fields_upsert_test),org.apache.spark.sql.SQLContext@5cfeb239,None)

Still trying to track down why that is.
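For reference, roughly how the result is read back (a sketch: the es.read.field.as.array.include option and the resource name come from the relation printed above, while the reader calls themselves are my assumption):

val resultDf = sqlContext.read
  .format("org.elasticsearch.spark.sql")
  .option("es.read.field.as.array.include", "samples")
  .load("nested_fields_upsert_test")
resultDf.select("samples").show()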

Member Author

OK, I added support for empty arrays (and added them to the test). My initial problems writing empty arrays were because they were the first and only thing I was writing, so Elasticsearch did not infer the correct mappings.
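Roughly what the extended test data looks like (a hypothetical sketch reusing the sample row shown earlier; the actual test may differ):

import org.apache.spark.sql.Row

// One row with a populated nested array and one with an empty nested array,
// so the empty-array path is exercised alongside the populated one.
val data = Seq(
  Row("1", List(Row("hello"), Row("world"))),
  Row("2", List.empty[Row])
)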

@jbaiera
Member

jbaiera commented Jan 6, 2022

In terms of draft status: I think this is decently written. There might be a case where we're returning a MapType and this can be tripped up. Perhaps it makes even more sense to simply delegate any unknown typed values to the primitives method?

In the case of parameters and field extraction, most primitive values are handled in the FieldWriter class (https://github.com/elastic/elasticsearch-hadoop/blob/master/mr/src/main/java/org/elasticsearch/hadoop/serialization/bulk/AbstractBulkFactory.java#L135-L142) instead of in each integration's value writer class. This is because most of the time the data we're working with doesn't need to be unwrapped any further (at that point in the code we usually just have an int, a String, etc.). Anything that doesn't match that linked conditional is passed on to the integration-specific serialization code to be converted to JSON and added to the bulk request header.
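In other words, something like this (a loose Scala paraphrase of that Java conditional, not the actual FieldWriter code; serializeWithValueWriter is a made-up stand-in for the integration-specific serialization path):

// Hypothetical stand-in for the integration-specific value writer (e.g. DataFrameValueWriter).
def serializeWithValueWriter(value: Any): String = ???

// Primitive-looking values are written directly into the bulk request header;
// anything else is handed off to the integration's serialization code as JSON.
def renderParam(value: Any): String = value match {
  case s: String => "\"" + s + "\""
  case _: Int | _: Long | _: Float | _: Double | _: Boolean => value.toString
  case other => serializeWithValueWriter(other) // e.g. a Row, a Map, or an Array[Byte]
}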

@jbaiera
Member

jbaiera commented Jan 6, 2022

Looking at the expected primitives in the DataFrameValueWriter at https://github.com/elastic/elasticsearch-hadoop/blob/master/spark/sql-30/src/main/scala/org/elasticsearch/spark/sql/DataFrameValueWriter.scala#L164-L174 and comparing them to what we handle in the FieldWriter (above), it looks like we might need special logic for the following values (testing will be needed to confirm whether they blow up or whether they're already managed; see the sketch after this list):

  • BinaryType fields with Array[Byte] values
  • MapType fields with scala Map[_, _] values or java Map<?, ?> values
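A rough Scala sketch of how those two cases could be folded into a value-to-DataType inference step like the inferType snippet later in this review (assumed names and behavior, not the code that was eventually merged):

import org.apache.spark.sql.types._

// Hypothetical handling of the two cases above; assumes maps are non-empty so the
// key and value types can be sampled from the first entry.
def inferExtraType(value: Any): DataType = value match {
  case _: Array[Byte] => BinaryType
  case m: scala.collection.Map[_, _] if m.nonEmpty =>
    MapType(inferExtraType(m.head._1), inferExtraType(m.head._2))
  case m: java.util.Map[_, _] if !m.isEmpty =>
    val e = m.entrySet().iterator().next()
    MapType(inferExtraType(e.getKey), inferExtraType(e.getValue))
  case _: String => StringType
  case _: Int => IntegerType
  case other => throw new IllegalArgumentException(s"Unhandled type: ${other.getClass}")
}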

@masseyke
Member Author

OK I've updated it so that writes of maps don't fail. They don't exactly behave intuitively, but I've tried to show what they do in an itest. It's not perfect, but I think it's better than complete failure for now.

@masseyke masseyke marked this pull request as ready for review January 14, 2022 21:53
@jbaiera (Member) left a comment

Quick question, but otherwise LGTM

private def inferType(value: Any): DataType = {
  value match {
    case _: String => StringType
    case Int => IntegerType
Member

Scala question: Should this be _: Int ?

Member Author

Whoops -- good catch! I'll fix all of those.

Member Author

And I'm pretty sure the Java ones are redundant, but I'll leave them in because they don't hurt anything.
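For reference, the corrected shape of the match (a sketch, not the merged code; _: Int is a type pattern, whereas the bare Int above refers to the companion object and does not test the value's type):

import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

def inferType(value: Any): DataType = value match {
  case _: String => StringType
  case _: Int => IntegerType
  case _: java.lang.Integer => IntegerType // likely redundant on the JVM: boxed Ints already match the case above
  case other => throw new IllegalArgumentException(s"Unhandled type: ${other.getClass}")
}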

@masseyke masseyke merged commit 3c805a9 into elastic:master Jan 20, 2022
@masseyke masseyke deleted the fix/nested-fields-upsert branch January 20, 2022 14:46
Successfully merging this pull request may close these issues:

Nested fields upsert with Spark not working