Writing to Couchbase
Just as you can read from Couchbase, you can also store data in Couchbase. We are working on expanding this functionality, but for now you can:
- Store any kind of RDD[Document[_]] directly.
- Store PairRDD data where the document type is inferred.
The easiest way is to store documents directly. If you have a supported RDD of type RDD[Document[_]], for example an RDD[JsonDocument], you can save it like this:
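import com.couchbase.client.java.document.{JsonArrayDocument, JsonDocument}
import com.couchbase.client.java.document.json.{JsonArray, JsonObject}
import com.couchbase.spark._ // brings saveToCouchbase into scope

val doc1 = JsonDocument.create("doc1", JsonObject.create().put("some", "content"))
val doc2 = JsonArrayDocument.create("doc2", JsonArray.from("more", "content", "in", "here"))
val data = sc
  .parallelize(Seq(doc1, doc2))
  .saveToCouchbase()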
This method uses the underlying upsert method of the SDK. As with reading, you can omit the bucket name on the saveToCouchbase method if only one bucket is configured.
If you don't need tight control over how the documents are created (control you would need, for example, to specify an expiration time), you can use the higher-level converters that are provided out of the box. We want to make this more extensible in the future as well.
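When that control is needed, the document can be built by hand before it is saved. A minimal sketch, assuming the Couchbase Java SDK 2.x factory JsonDocument.create(id, expiry, content) with a relative expiry given in seconds; the object name ExpiryExample and the value 3600 are illustrative only:

```scala
import com.couchbase.client.java.document.JsonDocument
import com.couchbase.client.java.document.json.JsonObject

object ExpiryExample {
  // Illustrative value: ask the server to expire the document after one hour.
  val oneHourInSeconds = 3600

  // Build the document by hand so the expiry can be set explicitly.
  val doc: JsonDocument = JsonDocument.create(
    "doc1",
    oneHourInSeconds,
    JsonObject.create().put("some", "content")
  )
}
```

An RDD of such documents can then be stored with saveToCouchbase() in the same way as the direct example earlier.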
If a PairRDD is used and you call toCouchbaseDocument on it, a converter will try to find the proper document format to convert it to, based on the pair value. Currently, the mapping is as follows:
- JsonObject -> JsonDocument
- Map[String, _] -> JsonDocument
- JsonArray -> JsonArrayDocument
- Seq[_] -> JsonArrayDocument
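One way to read the mapping above is as a pattern match on the pair value. The sketch below only restates the list; MappingSketch and targetDocumentType are hypothetical names made up for illustration and are not the connector's converter code:

```scala
import com.couchbase.client.java.document.json.{JsonArray, JsonObject}

object MappingSketch {
  // Hypothetical helper: returns the name of the document type that the
  // mapping above assigns to a given pair value.
  def targetDocumentType(value: Any): String = value match {
    case _: JsonObject => "JsonDocument"
    case _: Map[_, _]  => "JsonDocument"
    case _: JsonArray  => "JsonArrayDocument"
    case _: Seq[_]     => "JsonArrayDocument"
    case other =>
      throw new IllegalArgumentException(
        s"No mapping listed for ${other.getClass.getName}")
  }
}
```

Any value type outside the four listed ones falls through to the last case, which is why this sketch raises an error there rather than guessing.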
Here is an example:
val doc1 = ("doc1", JsonObject.create().put("some", "content"))
val doc2 = ("doc2", JsonObject.create().put("more", "content!"))
val data = sc
  .parallelize(Seq(doc1, doc2))
  .toCouchbaseDocument[JsonDocument]
  .saveToCouchbase()
The only drawback of this approach is that the RDD can hold only one document type rather than a mix of them, because Document[_] is not serializable (only certain subclasses are: JsonDocument is, for example, but BinaryDocument is not).
You can also store maps and sequences the same way. Currently you cannot store nested data with Scala collections, because the java-client converter does not know how to convert them. So you either need to convert them to Java maps and lists on your own, or wait until we come up with something more elegant ;)
// Maps are converted to JsonDocument
val doc1 = ("doc1", Map("key" -> "value"))
val doc2 = ("doc2", Map("a" -> 1, "b" -> true))
val data = sc
  .parallelize(Seq(doc1, doc2))
  .toCouchbaseDocument[JsonDocument]
  .saveToCouchbase()

// Sequences are converted to JsonArrayDocument (a separate example)
val doc1 = ("doc1", Seq("foo", "bar", "baz"))
val doc2 = ("doc2", Seq(1, 2, 3))
val data = sc
  .parallelize(Seq(doc1, doc2))
  .toCouchbaseDocument[JsonArrayDocument]
  .saveToCouchbase()
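For the nested-data limitation mentioned above, converting to Java maps and lists on your own could look roughly like the following. This is a sketch under assumptions, not something the connector ships: NestedConversion and toJava are hypothetical names, and the code uses scala.collection.JavaConverters (deprecated since Scala 2.13 in favor of scala.jdk.CollectionConverters, but still available).

```scala
import scala.collection.JavaConverters._

object NestedConversion {
  // Hypothetical helper: recursively turns Scala maps and sequences into
  // java.util.Map and java.util.List; all other values pass through as-is.
  def toJava(value: Any): Any = value match {
    case m: Map[_, _] =>
      val converted: Map[String, Any] =
        m.toSeq.map { case (k, v) => (k.toString, toJava(v)) }.toMap
      converted.asJava
    case s: Seq[_] =>
      s.map(toJava).asJava
    case other =>
      other
  }
}

// Usage: a nested Scala structure becomes nested Java collections.
// val nested = Map("name" -> "doc", "tags" -> Seq("a", "b"), "meta" -> Map("n" -> 1))
// val asJava = NestedConversion.toJava(nested)
```

Note that map keys are turned into strings with toString here, which is a choice of this sketch. How the resulting Java collections are then turned into a document is left to the SDK's own conversion facilities.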