Skip to content

Commit

Permalink
[Cosmos] Bulk/Batch APIs with v1/v2 hashing in JS (#10168)
Browse files Browse the repository at this point in the history
Co-authored-by: Steve Faulkner <[email protected]>
  • Loading branch information
zfoster and southpolesteve authored Jul 28, 2020
1 parent 7e2b6df commit 821f350
Show file tree
Hide file tree
Showing 19 changed files with 1,995 additions and 496 deletions.
970 changes: 482 additions & 488 deletions common/config/rush/pnpm-lock.yaml

Large diffs are not rendered by default.

29 changes: 26 additions & 3 deletions sdk/cosmosdb/cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,29 @@

## 3.7.5 (Unreleased)

- FEATURE: Adds bulk request to container.items. Allows aggregate bulk request for up to 100 operations on items with the types: Create, Upsert, Read, Replace, Delete

```js
// up to 100 operations
const operations: OperationInput[] = [
{
operationType: "Create",
resourceBody: { id: "doc1", name: "sample", key: "A" }
},
{
operationType: "Upsert",
resourceBody: { id: "doc2", name: "other", key: "A" }
},
{
operationType: "Read",
id: "readItemId",
partitionKey: "key"
}
];

await database.container.items.bulk(operations);
```

- FEATURE: Throws when initializing ClientContext with an invalid endpoint

## 3.7.4 (2020-6-30)
Expand Down Expand Up @@ -215,14 +238,14 @@ Constructor options have been simplified:
const client = new CosmosClient({
endpoint: "https://your-database.cosmos.azure.com",
auth: {
masterKey: "your-primary-key",
},
masterKey: "your-primary-key"
}
});

// v3
const client = new CosmosClient({
endpoint: "https://your-database.cosmos.azure.com",
key: "your-primary-key",
key: "your-primary-key"
});
```

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmosdb/cosmos/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
"@types/debug": "^4.1.4",
"debug": "^4.1.1",
"fast-json-stable-stringify": "^2.0.0",
"jsbi": "^3.1.3",
"node-abort-controller": "^1.0.4",
"node-fetch": "^2.6.0",
"os-name": "^3.1.0",
Expand Down
84 changes: 84 additions & 0 deletions sdk/cosmosdb/cosmos/review/cosmos.api.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,14 @@ export class ChangeFeedResponse<T> {
export class ClientContext {
constructor(cosmosClientOptions: CosmosClientOptions, globalEndpointManager: GlobalEndpointManager);
// (undocumented)
bulk<T>({ body, path, resourceId, partitionKeyRangeId, options }: {
body: T;
path: string;
partitionKeyRangeId: string;
resourceId: string;
options?: RequestOptions;
}): Promise<Response<any>>;
// (undocumented)
clearSessionToken(path: string): void;
// (undocumented)
create<T, U = T>({ body, path, resourceType, resourceId, options, partitionKey }: {
Expand Down Expand Up @@ -330,6 +338,9 @@ export const Constants: {
EnableScriptLogging: string;
ScriptLogResults: string;
ALLOW_MULTIPLE_WRITES: string;
IsBatchRequest: string;
IsBatchAtomic: string;
ForceRefresh: string;
};
WritableLocations: string;
ReadableLocations: string;
Expand Down Expand Up @@ -481,6 +492,11 @@ export interface CosmosHeaders {
[key: string]: any;
}

// @public (undocumented)
export type CreateOperation = OperationWithItem & {
operationType: "Create";
};

// @public
export class Database {
constructor(client: CosmosClient, id: string, clientContext: ClientContext);
Expand Down Expand Up @@ -565,6 +581,12 @@ export enum DataType {
// @public (undocumented)
export const DEFAULT_PARTITION_KEY_PATH: "/_partitionKey";

// @public (undocumented)
export type DeleteOperation = OperationBase & {
operationType: "Delete";
id: string;
};

// @public (undocumented)
export interface ErrorBody {
// (undocumented)
Expand Down Expand Up @@ -759,6 +781,7 @@ export class ItemResponse<T extends ItemDefinition> extends ResourceResponse<T &
// @public
export class Items {
constructor(container: Container, clientContext: ClientContext);
bulk(operations: OperationInput[], options?: RequestOptions): Promise<OperationResponse[]>;
changeFeed(partitionKey: string | number | boolean, changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>;
changeFeed(changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>;
changeFeed<T>(partitionKey: string | number | boolean, changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<T>;
Expand Down Expand Up @@ -855,8 +878,47 @@ export class Offers {
readAll(options?: FeedOptions): QueryIterator<OfferDefinition & Resource>;
}

// @public (undocumented)
export type Operation = CreateOperation | UpsertOperation | ReadOperation | DeleteOperation | ReplaceOperation;

// @public (undocumented)
export interface OperationBase {
// (undocumented)
ifMatch?: string;
// (undocumented)
ifNoneMatch?: string;
// (undocumented)
partitionKey?: string;
}

// @public (undocumented)
export interface OperationInput {
// (undocumented)
ifMatch?: string;
// (undocumented)
ifNoneMatch?: string;
// (undocumented)
partitionKey?: string | number | null | {} | undefined;
// (undocumented)
resourceBody?: JSONObject;
}

// @public (undocumented)
export interface OperationResponse {
// (undocumented)
eTag?: string;
// (undocumented)
requestCharge: number;
// (undocumented)
resourceBody?: JSONObject;
// (undocumented)
statusCode: number;
}

// @public (undocumented)
export enum OperationType {
// (undocumented)
Batch = "batch",
// (undocumented)
Create = "create",
// (undocumented)
Expand All @@ -873,6 +935,11 @@ export enum OperationType {
Upsert = "upsert"
}

// @public (undocumented)
export type OperationWithItem = OperationBase & {
resourceBody: JSONObject;
};

// @public (undocumented)
export interface PartitionedQueryExecutionInfo {
// (undocumented)
Expand Down Expand Up @@ -1131,6 +1198,18 @@ export interface QueryRange {
min: string;
}

// @public (undocumented)
export type ReadOperation = OperationBase & {
operationType: "Read";
id: string;
};

// @public (undocumented)
export type ReplaceOperation = OperationWithItem & {
operationType: "Replace";
id: string;
};

// @public (undocumented)
export interface RequestContext {
// (undocumented)
Expand Down Expand Up @@ -1589,6 +1668,11 @@ export interface UniqueKeyPolicy {
uniqueKeys: UniqueKey[];
}

// @public (undocumented)
export type UpsertOperation = OperationWithItem & {
operationType: "Upsert";
};

// @public
export class User {
constructor(database: Database, id: string, clientContext: ClientContext);
Expand Down
49 changes: 49 additions & 0 deletions sdk/cosmosdb/cosmos/src/ClientContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,55 @@ export class ClientContext {
return this.globalEndpointManager.getReadEndpoint();
}

public async bulk<T>({
body,
path,
resourceId,
partitionKeyRangeId,
options = {}
}: {
body: T;
path: string;
partitionKeyRangeId: string;
resourceId: string;
options?: RequestOptions;
}) {
try {
const request: RequestContext = {
globalEndpointManager: this.globalEndpointManager,
requestAgent: this.cosmosClientOptions.agent,
connectionPolicy: this.connectionPolicy,
method: HTTPMethod.post,
client: this,
operationType: OperationType.Batch,
path,
body,
resourceType: ResourceType.item,
resourceId,
plugins: this.cosmosClientOptions.plugins,
options
};

request.headers = await this.buildHeaders(request);
request.headers[Constants.HttpHeaders.IsBatchRequest] = true;
request.headers[Constants.HttpHeaders.PartitionKeyRangeID] = partitionKeyRangeId;
request.headers[Constants.HttpHeaders.IsBatchAtomic] = false;

this.applySessionToken(request);

request.endpoint = await this.globalEndpointManager.resolveServiceEndpoint(
request.resourceType,
request.operationType
);
const response = await executePlugins(request, executeRequest, PluginOn.operation);
this.captureSessionToken(undefined, path, OperationType.Batch, response.headers);
return response;
} catch (err) {
this.captureSessionToken(err, path, OperationType.Upsert, (err as ErrorResponse).headers);
throw err;
}
}

private captureSessionToken(
err: ErrorResponse,
path: string,
Expand Down
108 changes: 106 additions & 2 deletions sdk/cosmosdb/cosmos/src/client/Item/Items.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,21 @@ import { extractPartitionKey } from "../../extractPartitionKey";
import { FetchFunctionCallback, SqlQuerySpec } from "../../queryExecutionContext";
import { QueryIterator } from "../../queryIterator";
import { FeedOptions, RequestOptions } from "../../request";
import { Container } from "../Container";
import { Container, PartitionKeyRange } from "../Container";
import { Item } from "./Item";
import { ItemDefinition } from "./ItemDefinition";
import { ItemResponse } from "./ItemResponse";
import {
Batch,
isKeyInRange,
Operation,
getPartitionKeyToHash,
addPKToOperation,
OperationResponse,
OperationInput
} from "../../utils/batch";
import { hashV1PartitionKey } from "../../utils/hashing/v1";
import { hashV2PartitionKey } from "../../utils/hashing/v2";

/**
* @ignore
Expand All @@ -39,7 +50,7 @@ export class Items {
constructor(
public readonly container: Container,
private readonly clientContext: ClientContext
) { }
) {}

/**
* Queries all items.
Expand Down Expand Up @@ -373,4 +384,97 @@ export class Items {
ref
);
}

/**
* Execute bulk operations on items.
*
* Bulk takes an array of Operations which are typed based on what the operation does.
* The choices are: Create, Upsert, Read, Replace, and Delete
*
* Usage example:
*
* // partitionKey is optional at the top level if present in the resourceBody
* const operations: OperationInput[] = [
* {
* operationType: "Create",
* resourceBody: { id: "doc1", name: "sample", key: "A" }
* },
* {
* operationType: "Upsert",
* partitionKey: 'A',
* resourceBody: { id: "doc2", name: "other", key: "A" }
* }
* ]
*
* await database.container.items.bulk(operation)
*
* @param operations. List of operations. Limit 100
* @param options Used for modifying the request.
*/
public async bulk(
operations: OperationInput[],
options?: RequestOptions
): Promise<OperationResponse[]> {
const {
resources: partitionKeyRanges
} = await this.container.readPartitionKeyRanges().fetchAll();
const { resource: definition } = await this.container.getPartitionKeyDefinition();
const batches: Batch[] = partitionKeyRanges.map((keyRange: PartitionKeyRange) => {
return {
min: keyRange.minInclusive,
max: keyRange.maxExclusive,
rangeId: keyRange.id,
indexes: [],
operations: []
};
});
operations
.map((operation) => addPKToOperation(operation, definition))
.forEach((operation: Operation, index: number) => {
const partitionProp = definition.paths[0].replace("/", "");
const isV2 = definition.version && definition.version === 2;
const toHashKey = getPartitionKeyToHash(operation, partitionProp);
const hashed = isV2 ? hashV2PartitionKey(toHashKey) : hashV1PartitionKey(toHashKey);
const batchForKey = batches.find((batch: Batch) => {
return isKeyInRange(batch.min, batch.max, hashed);
});
batchForKey.operations.push(operation);
batchForKey.indexes.push(index);
});

const path = getPathFromLink(this.container.url, ResourceType.item);

const orderedResponses: OperationResponse[] = [];
await Promise.all(
batches
.filter((batch: Batch) => batch.operations.length)
.map(async (batch: Batch) => {
if (batch.operations.length > 100) {
throw new Error("Cannot run bulk request with more than 100 operations per partition");
}
try {
const response = await this.clientContext.bulk({
body: batch.operations,
partitionKeyRangeId: batch.rangeId,
path,
resourceId: this.container.url,
options
});
response.result.forEach((operationResponse: OperationResponse, index: number) => {
orderedResponses[batch.indexes[index]] = operationResponse;
});
} catch (err) {
// In the case of 410 errors, we need to recompute the partition key ranges
// and redo the batch request, however, 410 errors occur for unsupported
// partition key types as well since we don't support them, so for now we throw
if (err.code === 410) {
throw new Error(
"Partition key error. Either the partitions have split or an operation has an unsupported partitionKey type"
);
}
}
})
);
return orderedResponses;
}
}
Loading

0 comments on commit 821f350

Please sign in to comment.