From edf57644ec35a0068d6ce2b67cd6c1905b316e2d Mon Sep 17 00:00:00 2001
From: Rea Rustagi <85902999+rustagir@users.noreply.github.com>
Date: Tue, 16 Jan 2024 12:11:35 -0500
Subject: [PATCH] DOCSP-33427: split large cs events (#145)

* DOCSP-33427: split large cs events

* CC PR suggestions

(cherry picked from commit 9667a4f912b3b84dc1764d8101e33cc735242a67)
---
 examples/src/test/kotlin/ChangeStreamsTest.kt | 32 +++++++++
 ...sTest.snippet.split-large-change-stream.kt |  8 +++
 .../crud/read-operations/change-streams.txt   | 66 +++++++++++++++++--
 3 files changed, 101 insertions(+), 5 deletions(-)
 create mode 100644 source/examples/generated/ChangeStreamsTest.snippet.split-large-change-stream.kt

diff --git a/examples/src/test/kotlin/ChangeStreamsTest.kt b/examples/src/test/kotlin/ChangeStreamsTest.kt
index 7086f7c2..710eb0ed 100644
--- a/examples/src/test/kotlin/ChangeStreamsTest.kt
+++ b/examples/src/test/kotlin/ChangeStreamsTest.kt
@@ -13,6 +13,7 @@ import config.getConfig
 import kotlinx.coroutines.delay
 import kotlinx.coroutines.launch
 import kotlinx.coroutines.runBlocking
+import org.bson.BsonDocument
 import org.bson.Document
 import org.junit.jupiter.api.AfterAll
 import org.junit.jupiter.api.AfterEach
@@ -136,6 +137,37 @@ internal class ChangeStreamsTest {
 
     }
 
+    // Ignore annotation added because this test requires a MongoDB 7.0 deployment
+    @Ignore
+    fun splitLargeChangeStreamTest() = runBlocking {
+        val changeEvents = mutableListOf<ChangeStreamDocument<Document>>()
+        // :snippet-start: split-large-change-stream
+        val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
+
+        val job = launch {
+            val changeStream = collection.watch(pipeline)
+            changeStream.collect {
+                println("Received a change event: $it")
+                changeEvents.add(it) // :remove:
+            }
+        }
+        // :snippet-end:
+
+        // Perform MongoDB operations that trigger change events...
+        delay(1)
+        val testData = Document("city", "Rio de Janeiro")
+        collection.insertOne(testData)
+
+        // Wait for change events
+        delay(1000)
+
+        // Cancel the change stream when you're done listening for events.
+        job.cancel()
+
+        // Change stream should capture only the single insert event.
+        assertEquals(1, changeEvents.size)
+    }
+
     // NOTE: Test is being ignored because it will not work with a shared M0 cluster.
     // Must have a local cluster with a replica set or >=M10 on Atlas to successfully run.
     @Ignore
diff --git a/source/examples/generated/ChangeStreamsTest.snippet.split-large-change-stream.kt b/source/examples/generated/ChangeStreamsTest.snippet.split-large-change-stream.kt
new file mode 100644
index 00000000..15f35830
--- /dev/null
+++ b/source/examples/generated/ChangeStreamsTest.snippet.split-large-change-stream.kt
@@ -0,0 +1,8 @@
+val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))
+
+val job = launch {
+    val changeStream = collection.watch(pipeline)
+    changeStream.collect {
+        println("Received a change event: $it")
+    }
+}
diff --git a/source/fundamentals/crud/read-operations/change-streams.txt b/source/fundamentals/crud/read-operations/change-streams.txt
index 0e50b1eb..347d95ad 100644
--- a/source/fundamentals/crud/read-operations/change-streams.txt
+++ b/source/fundamentals/crud/read-operations/change-streams.txt
@@ -5,6 +5,13 @@
 Open Change Streams
 ===================
 
+.. facet::
+   :name: genre
+   :values: reference
+
+.. meta::
+   :keywords: code example, monitoring, aggregation
+
 .. contents:: On this page
    :local:
    :backlinks: none
@@ -18,15 +25,16 @@ In this guide, you can learn how to use a **change stream** to monitor
 real-time changes to your database. A change stream is a {+mdb-server+}
 feature that allows your application to subscribe to data changes on a
 single collection, database, or deployment. You can specify a set of aggregation
-operators to filter and transform the data your application receives. When
-connecting to a MongoDB deployment v6.0 or later, you can configure the
-events to include the document data before and after the change.
+operators to filter and transform the data your application receives.
+When connecting to MongoDB v6.0 or later, you can configure the events
+to include the document data before and after the change.
 
 Learn how to open and configure your change streams in the following
 sections:
 
 - :ref:`<kotlin-change-stream-open>`
 - :ref:`<kotlin-change-stream-aggregation>`
+- :ref:`<kotlin-change-stream-split-large-event>`
 - :ref:`<kotlin-change-stream-configure-pre-post>`
 
 .. _kotlin-change-stream-open:
@@ -107,9 +115,8 @@ The following code example shows how you can apply an aggregation
 pipeline to configure your change stream to receive change events for only
 insert and update operations:
 
-
 .. literalinclude:: /examples/generated/ChangeStreamsTest.snippet.apply-aggregation-operations-to-change-stream.kt
-   :language: java
+   :language: kotlin
 
 When the change stream receives an update change event, the preceding code
 example outputs the following text:
@@ -122,6 +129,55 @@ example outputs the following text:
    resumeToken={...},
    ...
 
+.. _kotlin-change-stream-split-large-event:
+
+Split Large Change Stream Events
+--------------------------------
+
+When connecting to MongoDB v7.0 or later,
+you can use the ``$changeStreamSplitLargeEvent`` aggregation operator to
+split event documents that exceed 16 MB into smaller fragments.
+
+Use the ``$changeStreamSplitLargeEvent`` operator only when you expect
+the change stream events to exceed the document size limit. For
+example, you might use this feature if your application requires full
+document pre-images or post-images.
+
+A ``$changeStreamSplitLargeEvent`` aggregation stage returns
+fragments sequentially. You can access the fragments by using a change
+stream cursor. Each fragment document includes a ``splitEvent`` object that
+contains the following fields:
+
+.. list-table::
+   :header-rows: 1
+   :widths: 35 65
+
+   * - Field
+     - Description
+
+   * - ``fragment``
+     - The index of the fragment, starting at ``1``
+
+   * - ``of``
+     - The total number of fragments that compose the split event
+
+The following example opens a change stream that includes an aggregation
+pipeline with a ``$changeStreamSplitLargeEvent`` aggregation stage to
+split large events:
+
+.. literalinclude:: /examples/generated/ChangeStreamsTest.snippet.split-large-change-stream.kt
+   :language: kotlin
+
+.. note::
+
+   You can have only one ``$changeStreamSplitLargeEvent`` stage in your
+   aggregation pipeline, and it must be the last stage in the pipeline.
+
+To learn more about the ``$changeStreamSplitLargeEvent`` aggregation operator,
+see :manual:`$changeStreamSplitLargeEvent (aggregation)
+</reference/operator/aggregation/changeStreamSplitLargeEvent/>` in the
+Server manual.
+
 .. _kotlin-change-stream-configure-pre-post:
 
 Include Pre-images and Post-images
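
The fragment table added by this patch describes the ``splitEvent.fragment`` and ``splitEvent.of`` fields, but the generated snippet only prints each event. The following is a minimal companion sketch, not part of the patch or the generated snippet, of how an application might watch a collection with the split stage and log each fragment's position. The connection string, database, and collection names are placeholders, and the ``splitEvent`` accessor on ``ChangeStreamDocument`` is an assumption based on recent driver versions that support MongoDB 7.0 change stream event splitting.

.. code-block:: kotlin

   import com.mongodb.kotlin.client.coroutine.MongoClient
   import kotlinx.coroutines.runBlocking
   import org.bson.BsonDocument
   import org.bson.Document

   fun main() = runBlocking {
       // Placeholder connection string and namespace -- replace with your own deployment,
       // which must be a MongoDB 7.0+ replica set or sharded cluster to use change streams.
       val client = MongoClient.create("mongodb://localhost:27017")
       val collection = client.getDatabase("sample_db").getCollection<Document>("sample_collection")

       // Same pipeline as the snippet in the patch: split events that exceed 16 MB.
       val pipeline = listOf(BsonDocument().append("\$changeStreamSplitLargeEvent", BsonDocument()))

       collection.watch(pipeline).collect { event ->
           // splitEvent is assumed to be exposed by ChangeStreamDocument in recent driver
           // versions; it is null for events that were not split into fragments.
           val split = event.splitEvent
           if (split != null) {
               println("Fragment ${split.fragment} of ${split.of} for token ${event.resumeToken}")
           } else {
               println("Received an unsplit change event: $event")
           }
       }
       // collect suspends until the enclosing coroutine is cancelled.
   }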