Skip to content

Commit

Permalink
Merge pull request #1912 from Danielle9897/RDoc-3005-subscription-con…
Browse files Browse the repository at this point in the history
…sumption-API-for-pr

RDoc-3005 [Node.js] Subscriptions > Consumption > API overview
  • Loading branch information
ppekrol authored Oct 2, 2024
2 parents 17cd432 + e841b1c commit a67e6bb
Show file tree
Hide file tree
Showing 9 changed files with 660 additions and 473 deletions.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,13 @@
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Queries;
using Raven.Client.Documents.Session;
using Raven.Client.Documents.Session.Loaders;
using Raven.Client.Documents.Subscriptions;
using Raven.Client.Exceptions.Database;
using Raven.Client.Exceptions.Documents.Subscriptions;
using Raven.Client.Exceptions.Security;
using Raven.Documentation.Samples.ClientApi.Session.Querying;
using Raven.Documentation.Samples.Orders;
using Sparrow.Json;
using Xunit;
Expand Down Expand Up @@ -88,22 +90,69 @@ public class SubscriptionUpdateOptions : SubscriptionCreationOptions
}
#endregion
}

private interface ISubscriptionWorkerOptions
{
#region worker_options
public class SubscriptionWorkerOptions
{
public string SubscriptionName { get; }
public int MaxDocsPerBatch { get; set; }
public int SendBufferSizeInBytes { get; set; }
public int ReceiveBufferSizeInBytes { get; set; }
public bool IgnoreSubscriberErrors { get; set; }
public bool CloseWhenNoDocsLeft { get; set; }
public TimeSpan TimeToWaitBeforeConnectionRetry { get; set; }
public TimeSpan ConnectionStreamTimeout { get; set; }
public TimeSpan MaxErroneousPeriod { get; set; }
public SubscriptionOpeningStrategy Strategy { get; set; }
}
#endregion
}

private interface ISubscriptionBatchItem
{
#region batch_item
public struct Item
{
public T Result { get; internal set; }
public string ExceptionMessage { get; internal set; }
public string Id { get; internal set; }
public string ChangeVector { get; internal set; }
public bool Projection { get; internal set; }
public bool Revision { get; internal set; }
public BlittableJsonReaderObject RawResult { get; internal set; }
public BlittableJsonReaderObject RawMetadata { get; internal set; }
public IMetadataDictionary Metadata { get; internal set; }
}
#endregion
}

public interface ISubscriptionConsumptionOverloads
{
#region subscriptionWorkerGeneration
SubscriptionWorker<dynamic> GetSubscriptionWorker(string subscriptionName, string database = null);
SubscriptionWorker<dynamic> GetSubscriptionWorker(SubscriptionWorkerOptions options, string database = null);
SubscriptionWorker<T> GetSubscriptionWorker<T>(string subscriptionName, string database = null) where T : class;
SubscriptionWorker<T> GetSubscriptionWorker<T>(SubscriptionWorkerOptions options, string database = null) where T : class;
SubscriptionWorker<dynamic> GetSubscriptionWorker(
string subscriptionName, string database = null);

SubscriptionWorker<dynamic> GetSubscriptionWorker(
SubscriptionWorkerOptions options, string database = null);

SubscriptionWorker<T> GetSubscriptionWorker<T>(
string subscriptionName, string database = null) where T : class;

SubscriptionWorker<T> GetSubscriptionWorker<T>(
SubscriptionWorkerOptions options, string database = null) where T : class;
#endregion
}

public interface ISubscriptionWorkerRunning<T>
{
#region subscriptionWorkerRunning
Task Run(Action<SubscriptionBatch<T>> processDocuments, CancellationToken ct = default(CancellationToken));
Task Run(Func<SubscriptionBatch<T>, Task> processDocuments, CancellationToken ct = default(CancellationToken));
Task Run(Action<SubscriptionBatch<T>> processDocuments,
CancellationToken ct = default(CancellationToken));

Task Run(Func<SubscriptionBatch<T>, Task> processDocuments,
CancellationToken ct = default(CancellationToken));
#endregion
}

Expand Down Expand Up @@ -1037,6 +1086,26 @@ await secondaryWorker.Run(x =>
#endregion
}

private interface StrategyEnum
{
#region strategy_enum
public enum SubscriptionOpeningStrategy
{
// Connect if no other worker is connected
OpenIfFree,

// Take over the connection
TakeOver,

// Wait for currently connected worker to disconnect
WaitForFree,

// Connect concurrently
Concurrent
}
#endregion
}

public DataSubscriptions()
{
IDocumentStore store = new DocumentStore();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
import * as assert from "assert";
import { DocumentStore } from "ravendb";

const documentStore = new DocumentStore();
const session = store.openSession();

async function consumptionApi() {

{
//region consume_syntax_1
await documentStore.subscriptions.getSubscriptionWorker(subscriptionName);
await documentStore.subscriptions.getSubscriptionWorker(subscriptionName, database);

await documentStore.subscriptions.getSubscriptionWorker(options);
await documentStore.subscriptions.getSubscriptionWorker(options, database);
//endregion
}
{
//region consume_syntax_2
// The SubscriptionWorkerOptions object:
// =====================================
{
subscriptionName;
documentType;
ignoreSubscriberErrors;
closeWhenNoDocsLeft;
maxDocsPerBatch;
timeToWaitBeforeConnectionRetry;
maxErroneousPeriod;
strategy;
}
//endregion
}
{
let subscriptionWorker;
//region consume_syntax_3
subscriptionWorker.on("batch", (batch, callback) => {
// Process incoming items:
// =======================

// 'batch':
// Contains the documents to be processed.

// callback():
// Needs to be called after processing the batch
// to notify the worker that you're done processing.
});
//endregion
}
{
//region consume_syntax_4
class Item
{
result;
exceptionMessage;
id;
changeVector;
projection;
revision;
rawResult;
rawMetadata;
metadata;
}
//endregion
}

{
// for later
subscriptionWorker.on("error", (error) => {});
subscriptionWorker.on("end", () => {});
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,6 @@ const session = store.openSession();
//endregion
}

async function example() {
const subscription = await store.subscriptions.create({});
//region subscriptionWorkerGeneration
store.subscriptions.getSubscriptionWorker(options, [database]);
store.subscriptions.getSubscriptionWorker(subscriptionName, [database]);

store.subscriptions.getSubscriptionWorkerForRevisions(options, [database]);
store.subscriptions.getSubscriptionWorkerForRevisions(subscriptionName, [database]);
//endregion
}

{
let subscriptionWorker;
//region subscriptionWorkerRunning
subscriptionWorker.on("batch", (batch, callback) => { });
subscriptionWorker.on("error", (error) => {});
subscriptionWorker.on("end", () => {});
//endregion
}

//region subscriptions_example
async function worker() {

Expand Down Expand Up @@ -222,4 +202,4 @@ const session = store.openSession();
}

class Order {}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Threading.Tasks;
using Raven.Client.Documents;
using Raven.Client.Documents.Queries;
using Raven.Client.Documents.Session;
using Raven.Client.Documents.Session.Loaders;
using Raven.Client.Documents.Subscriptions;
using Raven.Client.Exceptions.Database;
Expand Down Expand Up @@ -88,22 +89,69 @@ public class SubscriptionUpdateOptions : SubscriptionCreationOptions
}
#endregion
}

private interface ISubscriptionWorkerOptions
{
#region worker_options
public class SubscriptionWorkerOptions
{
public string SubscriptionName { get; }
public int MaxDocsPerBatch { get; set; }
public int SendBufferSizeInBytes { get; set; }
public int ReceiveBufferSizeInBytes { get; set; }
public bool IgnoreSubscriberErrors { get; set; }
public bool CloseWhenNoDocsLeft { get; set; }
public TimeSpan TimeToWaitBeforeConnectionRetry { get; set; }
public TimeSpan ConnectionStreamTimeout { get; set; }
public TimeSpan MaxErroneousPeriod { get; set; }
public SubscriptionOpeningStrategy Strategy { get; set; }
}
#endregion
}

private interface ISubscriptionBatchItem
{
#region batch_item
public struct Item
{
public T Result { get; internal set; }
public string ExceptionMessage { get; internal set; }
public string Id { get; internal set; }
public string ChangeVector { get; internal set; }
public bool Projection { get; internal set; }
public bool Revision { get; internal set; }
public BlittableJsonReaderObject RawResult { get; internal set; }
public BlittableJsonReaderObject RawMetadata { get; internal set; }
public IMetadataDictionary Metadata { get; internal set; }
}
#endregion
}

public interface ISubscriptionConsumptionOverloads
{
#region subscriptionWorkerGeneration
SubscriptionWorker<dynamic> GetSubscriptionWorker(string subscriptionName, string database = null);
SubscriptionWorker<dynamic> GetSubscriptionWorker(SubscriptionWorkerOptions options, string database = null);
SubscriptionWorker<T> GetSubscriptionWorker<T>(string subscriptionName, string database = null) where T : class;
SubscriptionWorker<T> GetSubscriptionWorker<T>(SubscriptionWorkerOptions options, string database = null) where T : class;
SubscriptionWorker<dynamic> GetSubscriptionWorker(
string subscriptionName, string database = null);

SubscriptionWorker<dynamic> GetSubscriptionWorker(
SubscriptionWorkerOptions options, string database = null);

SubscriptionWorker<T> GetSubscriptionWorker<T>(
string subscriptionName, string database = null) where T : class;

SubscriptionWorker<T> GetSubscriptionWorker<T>(
SubscriptionWorkerOptions options, string database = null) where T : class;
#endregion
}

public interface ISubscriptionWorkerRunning<T>
{
#region subscriptionWorkerRunning
Task Run(Action<SubscriptionBatch<T>> processDocuments, CancellationToken ct = default(CancellationToken));
Task Run(Func<SubscriptionBatch<T>, Task> processDocuments, CancellationToken ct = default(CancellationToken));
Task Run(Action<SubscriptionBatch<T>> processDocuments,
CancellationToken ct = default(CancellationToken));

Task Run(Func<SubscriptionBatch<T>, Task> processDocuments,
CancellationToken ct = default(CancellationToken));
#endregion
}

Expand Down Expand Up @@ -980,6 +1028,26 @@ await secondaryWorker.Run(x =>
}
#endregion
}

private interface StrategyEnum
{
#region strategy_enum
public enum SubscriptionOpeningStrategy
{
// Connect if no other worker is connected
OpenIfFree,

// Take over the connection
TakeOver,

// Wait for currently connected worker to disconnect
WaitForFree,

// Connect concurrently
Concurrent
}
#endregion
}

public DataSubscriptions()
{
Expand Down Expand Up @@ -1076,4 +1144,8 @@ public void GenerateInvoice(Order o)
//public delegate void AfterBatch(int documentsProcessed);
//#endregion
}

internal class T
{
}
}
Loading

0 comments on commit a67e6bb

Please sign in to comment.