Reading from Couchbase
I'm pretty sure you want to read data from Couchbase, right? The following methods are currently supported:
- Reading documents through get
- Reading rows through views and spatial views
- Reading rows through N1QL
In order to get access to all those methods you need to have the context methods imported:
import com.couchbase.spark._
Do not forget this, otherwise the methods shown below will not be found.
To make it as easy as possible, every time you have access to an RDD[String] you can use it to load documents, where the strings are the document IDs. Strictly speaking, it's like a map from RDD[String] to RDD[D <: Document[_]]. What does this mean? You also need to provide the target document type D. These are the regular document types supported by the Java SDK. Do not forget to provide the document type, otherwise your next operations will work on Nothing.
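To see why a missing type parameter ends up as Nothing, here is a small plain-Scala sketch. It does not use the connector at all: Document, TextDocument and fetch are hypothetical stand-ins that only mimic the shape of a method whose type parameter appears in the result type alone, and the body of fetch is a stub.

```scala
object NothingInference {
  // Hypothetical stand-ins for the SDK's document types; only the shape matters.
  trait Document[T] { def id: String; def content: T }
  final case class TextDocument(id: String, content: String) extends Document[String]

  // D appears only in the result type, so the compiler cannot infer it from
  // the arguments. The body is a stub that never returns any documents.
  def fetch[D <: Document[_]](ids: Seq[String]): Seq[D] = Seq.empty[D]

  val ids = Seq("id1", "id2", "id3")

  // Explicit type parameter: later operations see TextDocument.
  val typed: Seq[TextDocument] = fetch[TextDocument](ids)

  // No type parameter and nothing to infer it from: D becomes Nothing.
  val untyped = fetch(ids)

  // This assignment compiles only because `untyped` is a Seq[Nothing].
  val proof: Seq[Nothing] = untyped
}
```

The compiler accepts both calls, which is why the mistake is easy to miss: the problem only shows up later, when you try to call document methods on the elements.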
Here is how to read three documents and print them:
val data = sc
  .parallelize(Seq("id1", "id2", "id3"))
  .couchbaseGet[JsonDocument]()
  .collect()
data.foreach(println)
You can also access couchbaseGet directly from the context:
val data = sc
  .couchbaseGet[JsonDocument](Seq("id1", "id2", "id3"))
  .collect()
This will only work if you have exactly one bucket configured. If there are more, you need to provide the bucket name explicitly:
val data = sc
  .couchbaseGet[JsonDocument]("bucket", Seq("id1", "id2", "id3"))
  .collect()
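How buckets get configured is not shown on this page. As a rough sketch, assuming the connector reads one com.couchbase.bucket.<name> property per bucket from the SparkConf with the bucket password as its value (please check the configuration docs for your connector version), a setup with two buckets could look like this. The master URL, the app name and the bucket names are placeholders.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BucketConfigSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[*]")                 // placeholder: local test run
      .setAppName("couchbase-read-sketch")   // placeholder app name
      // Assumed convention: "com.couchbase.bucket.<name>" -> bucket password.
      // Two entries mean two open buckets, so reads must name their bucket.
      .set("com.couchbase.bucket.beer-sample", "")
      .set("com.couchbase.bucket.default", "")

    val sc = new SparkContext(conf)
    // ... run the read operations from this page against sc ...
    sc.stop()
  }
}
```

With only one of the two bucket entries present, the shorter form without a bucket name would be enough.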
In addition, you can provide the parallelism (which defaults to Spark's defaultParallelism):
val data = sc
  .couchbaseGet[JsonDocument](Seq("id1", "id2", "id3"), numSlices = 512)
  .collect()
You can also run any ViewQuery you like on the bucket:
val rows = sc
  .couchbaseView(ViewQuery.from("user", "all").limit(100).descending())
  .collect()
If you want to fetch the document behind every row (which requires that reduce is disabled), you can do it like this:
val docs = sc
  .couchbaseView(ViewQuery.from("user", "all").limit(100).descending().reduce(false))
  .map(_.id)
  .couchbaseGet[JsonDocument]()
  .collect()
The same functionality is available for spatial views. Make sure to pass in a SpatialViewQuery and set the ranges as needed:
val docs = sc
  .couchbaseSpatialView(SpatialViewQuery.from("user", "all"))
  .map(_.id)
  .couchbaseGet[JsonDocument]()
  .collect()
If you are using 4.0 DP or later, N1QL works out of the box. If you are using an earlier developer preview of N1QL, you need to enable it manually through a system property:
System.setProperty("com.couchbase.queryEnabled", "true")
Make sure the cbq-engine is running for the developer previews before you issue queries. If you run 4.0 DP or later, make sure to create the primary index first. Here is a count of all docs in the beer-sample bucket (with a primary index created beforehand):
sc
  .couchbaseQuery(Query.simple("select count(*) as cnt FROM `beer-sample`"))
  .map(row => row.value.getInt("cnt"))
  .collect() // bring the result to the driver before printing
  .foreach(println)
You can also use the DSL and even pass in query parameters:
val query = Select.select("count(*) as cnt").from("`beer-sample`")
val params = QueryParams.build().consistency(ScanConsistency.NOT_BOUNDED)
sc
  .couchbaseQuery(Query.simple(query, params))
  .map(row => row.value.getInt("cnt"))
  .collect() // bring the result to the driver before printing
  .foreach(println)