-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Cosmos] Bulk/Batch APIs with v1/v2 hashing in JS #10168
Changes from 23 commits
8b7f6b0
a5e78ac
e066237
b212ecf
60034c2
ea7a53f
90aae54
684772b
6c03482
4cdef29
61d41ce
18864b5
68b1fe7
5ef791c
c677566
5e256ff
0247870
5c9841b
0e6948f
7fe7dcb
8025fcf
ec1d194
d74f1e3
00d33b4
03115d1
91bfcac
dd162ea
8e08212
efd1572
6842c09
dcaa388
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -54,6 +54,14 @@ export class ChangeFeedResponse<T> { | |
export class ClientContext { | ||
constructor(cosmosClientOptions: CosmosClientOptions, globalEndpointManager: GlobalEndpointManager); | ||
// (undocumented) | ||
bulk<T>({ body, path, resourceId, partitionKeyRange, options }: { | ||
body: T; | ||
path: string; | ||
partitionKeyRange: string; | ||
resourceId: string; | ||
options?: RequestOptions; | ||
}): Promise<Response<any>>; | ||
// (undocumented) | ||
clearSessionToken(path: string): void; | ||
// (undocumented) | ||
create<T, U = T>({ body, path, resourceType, resourceId, options, partitionKey }: { | ||
|
@@ -330,6 +338,9 @@ export const Constants: { | |
EnableScriptLogging: string; | ||
ScriptLogResults: string; | ||
ALLOW_MULTIPLE_WRITES: string; | ||
IsBatchRequest: string; | ||
IsBatchAtomic: string; | ||
ForceRefresh: string; | ||
}; | ||
WritableLocations: string; | ||
ReadableLocations: string; | ||
|
@@ -759,6 +770,11 @@ export class ItemResponse<T extends ItemDefinition> extends ResourceResponse<T & | |
// @public | ||
export class Items { | ||
constructor(container: Container, clientContext: ClientContext); | ||
// Warning: (ae-forgotten-export) The symbol "Operation" needs to be exported by the entry point index.d.ts | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Please fix. |
||
// Warning: (ae-forgotten-export) The symbol "OperationResponse" needs to be exported by the entry point index.d.ts | ||
// | ||
// (undocumented) | ||
bulk(operations: Operation[], options?: RequestOptions): Promise<OperationResponse[]>; | ||
changeFeed(partitionKey: string | number | boolean, changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>; | ||
changeFeed(changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<any>; | ||
changeFeed<T>(partitionKey: string | number | boolean, changeFeedOptions?: ChangeFeedOptions): ChangeFeedIterator<T>; | ||
|
@@ -857,6 +873,8 @@ export class Offers { | |
|
||
// @public (undocumented) | ||
export enum OperationType { | ||
// (undocumented) | ||
Batch = "batch", | ||
// (undocumented) | ||
Create = "create", | ||
// (undocumented) | ||
|
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -543,6 +543,55 @@ export class ClientContext { | |||||
return this.globalEndpointManager.getReadEndpoint(); | ||||||
} | ||||||
|
||||||
public async bulk<T>({ | ||||||
body, | ||||||
path, | ||||||
resourceId, | ||||||
partitionKeyRange, | ||||||
options = {} | ||||||
}: { | ||||||
body: T; | ||||||
path: string; | ||||||
partitionKeyRange: string; | ||||||
resourceId: string; | ||||||
options?: RequestOptions; | ||||||
}) { | ||||||
try { | ||||||
const request: RequestContext = { | ||||||
globalEndpointManager: this.globalEndpointManager, | ||||||
requestAgent: this.cosmosClientOptions.agent, | ||||||
connectionPolicy: this.connectionPolicy, | ||||||
method: HTTPMethod.post, | ||||||
client: this, | ||||||
operationType: OperationType.Batch, | ||||||
path, | ||||||
body, | ||||||
resourceType: ResourceType.item, | ||||||
resourceId, | ||||||
plugins: this.cosmosClientOptions.plugins, | ||||||
options | ||||||
}; | ||||||
|
||||||
request.headers = await this.buildHeaders(request); | ||||||
request.headers[Constants.HttpHeaders.IsBatchRequest] = "True"; | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
request.headers[Constants.HttpHeaders.PartitionKeyRangeID] = partitionKeyRange; | ||||||
request.headers[Constants.HttpHeaders.IsBatchAtomic] = false; | ||||||
southpolesteve marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
|
||||||
this.applySessionToken(request); | ||||||
|
||||||
request.endpoint = await this.globalEndpointManager.resolveServiceEndpoint( | ||||||
request.resourceType, | ||||||
request.operationType | ||||||
); | ||||||
const response = await executePlugins(request, executeRequest, PluginOn.operation); | ||||||
this.captureSessionToken(undefined, path, OperationType.Batch, response.headers); | ||||||
return response; | ||||||
} catch (err) { | ||||||
this.captureSessionToken(err, path, OperationType.Upsert, (err as ErrorResponse).headers); | ||||||
throw err; | ||||||
} | ||||||
} | ||||||
|
||||||
private captureSessionToken( | ||||||
err: ErrorResponse, | ||||||
path: string, | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -9,10 +9,20 @@ import { extractPartitionKey } from "../../extractPartitionKey"; | |
import { FetchFunctionCallback, SqlQuerySpec } from "../../queryExecutionContext"; | ||
import { QueryIterator } from "../../queryIterator"; | ||
import { FeedOptions, RequestOptions } from "../../request"; | ||
import { Container } from "../Container"; | ||
import { Container, PartitionKeyRange } from "../Container"; | ||
import { Item } from "./Item"; | ||
import { ItemDefinition } from "./ItemDefinition"; | ||
import { ItemResponse } from "./ItemResponse"; | ||
import { | ||
Batch, | ||
isKeyInRange, | ||
Operation, | ||
getPartitionKeyToHash, | ||
addPKToOperation, | ||
OperationResponse, | ||
} from "../../utils/batch"; | ||
import { hashV1PartitionKey } from "../../utils/hashing/v1"; | ||
import { hashV2PartitionKey } from "../../utils/hashing/v2"; | ||
|
||
/** | ||
* @ignore | ||
|
@@ -39,7 +49,7 @@ export class Items { | |
constructor( | ||
public readonly container: Container, | ||
private readonly clientContext: ClientContext | ||
) { } | ||
) {} | ||
|
||
/** | ||
* Queries all items. | ||
|
@@ -85,7 +95,7 @@ export class Items { | |
resultFn: (result) => (result ? result.Documents : []), | ||
query, | ||
options: innerOptions, | ||
partitionKey: options.partitionKey | ||
partitionKey: options.partitionKey, | ||
}); | ||
}; | ||
|
||
|
@@ -287,7 +297,7 @@ export class Items { | |
resourceType: ResourceType.item, | ||
resourceId: id, | ||
options, | ||
partitionKey | ||
partitionKey, | ||
}); | ||
|
||
const ref = new Item( | ||
|
@@ -356,7 +366,7 @@ export class Items { | |
resourceType: ResourceType.item, | ||
resourceId: id, | ||
options, | ||
partitionKey | ||
partitionKey, | ||
}); | ||
|
||
const ref = new Item( | ||
|
@@ -373,4 +383,68 @@ export class Items { | |
ref | ||
); | ||
} | ||
|
||
public async bulk( | ||
southpolesteve marked this conversation as resolved.
Show resolved
Hide resolved
|
||
operations: Operation[], | ||
options?: RequestOptions | ||
): Promise<OperationResponse[]> { | ||
const { | ||
resources: partitionKeyRanges, | ||
} = await this.container.readPartitionKeyRanges().fetchAll(); | ||
southpolesteve marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const { resource: definition } = await this.container.getPartitionKeyDefinition(); | ||
const batches: Batch[] = partitionKeyRanges.map((keyRange: PartitionKeyRange) => { | ||
return { | ||
min: keyRange.minInclusive, | ||
max: keyRange.maxExclusive, | ||
rangeId: keyRange.id, | ||
indexes: [], | ||
operations: [], | ||
}; | ||
}); | ||
operations | ||
.map((operation) => addPKToOperation(operation, definition)) | ||
.forEach((operation: Operation, index: number) => { | ||
const partitionProp = definition.paths[0].replace("/", ""); | ||
southpolesteve marked this conversation as resolved.
Show resolved
Hide resolved
|
||
const isV2 = definition.version && definition.version === 2; | ||
const toHashKey = getPartitionKeyToHash(operation, partitionProp); | ||
const hashed = isV2 ? hashV2PartitionKey(toHashKey) : hashV1PartitionKey(toHashKey); | ||
const batchForKey = batches.find((batch: Batch) => { | ||
return isKeyInRange(batch.min, batch.max, hashed); | ||
}); | ||
batchForKey.operations.push(operation); | ||
batchForKey.indexes.push(index); | ||
}); | ||
|
||
const path = getPathFromLink(this.container.url, ResourceType.item); | ||
|
||
const orderedResponses: OperationResponse[] = []; | ||
await Promise.all( | ||
batches | ||
.filter((batch: Batch) => batch.operations.length) | ||
.map(async (batch: Batch) => { | ||
try { | ||
const response = await this.clientContext.bulk({ | ||
body: batch.operations, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Each service request can take 100 operations and be up to 2 MB (recommend ~200 KB unless a single doc is bigger which which case fit only one operation into the request). |
||
partitionKeyRange: batch.rangeId, | ||
path, | ||
resourceId: this.container.url, | ||
options, | ||
}); | ||
response.result.forEach((operationResponse: OperationResponse, index: number) => { | ||
orderedResponses[batch.indexes[index]] = operationResponse; | ||
}); | ||
} catch (err) { | ||
// In the case of 410 errors, we need to recompute the partition key ranges | ||
// and redo the batch request, however, 410 errors occur for unsupported | ||
// partition key types as well since we don't support them, so for now we throw | ||
if (err.code === 410) { | ||
throw new Error( | ||
"Partition key error. Either the partitions have split or an operation has an unsupported partitionKey type" | ||
); | ||
} | ||
} | ||
}) | ||
); | ||
return orderedResponses; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
import { JSONObject } from "../queryExecutionContext"; | ||
import { extractPartitionKey } from "../extractPartitionKey"; | ||
import { PartitionKeyDefinition } from "../documents"; | ||
|
||
export type Operation = | ||
| CreateOperation | ||
| UpsertOperation | ||
| ReadOperation | ||
| DeleteOperation | ||
| ReplaceOperation; | ||
|
||
export interface Batch { | ||
min: string; | ||
max: string; | ||
rangeId: string; | ||
indexes: number[]; | ||
operations: Operation[]; | ||
} | ||
|
||
export interface OperationResponse { | ||
statusCode: number; | ||
requestCharge: number; | ||
eTag?: string; | ||
resourceBody?: JSONObject; | ||
} | ||
|
||
export function isKeyInRange(min: string, max: string, key: string) { | ||
const isAfterMinInclusive = key.localeCompare(min) >= 0; | ||
const isBeforeMax = key.localeCompare(max) < 0; | ||
return isAfterMinInclusive && isBeforeMax; | ||
} | ||
|
||
interface OperationBase { | ||
partitionKey?: string; | ||
ifMatch?: string; | ||
ifNoneMatch?: string; | ||
} | ||
|
||
type OperationWithItem = OperationBase & { | ||
resourceBody: JSONObject; | ||
}; | ||
|
||
type CreateOperation = OperationWithItem & { | ||
operationType: "Create"; | ||
}; | ||
|
||
type UpsertOperation = OperationWithItem & { | ||
operationType: "Upsert"; | ||
}; | ||
|
||
type ReadOperation = OperationBase & { | ||
operationType: "Read"; | ||
id: string; | ||
}; | ||
|
||
type DeleteOperation = OperationBase & { | ||
operationType: "Delete"; | ||
id: string; | ||
}; | ||
|
||
type ReplaceOperation = OperationWithItem & { | ||
operationType: "Replace"; | ||
id: string; | ||
}; | ||
|
||
export function hasResource( | ||
operation: Operation | ||
): operation is CreateOperation | UpsertOperation | ReplaceOperation { | ||
return (operation as OperationWithItem).resourceBody !== undefined; | ||
} | ||
|
||
export function getPartitionKeyToHash(operation: Operation, partitionProperty: string) { | ||
const toHashKey = hasResource(operation) | ||
? (operation.resourceBody as any)[partitionProperty] | ||
: operation.partitionKey.replace(/[\[\]\"\']/g, ""); | ||
// We check for empty object since replace will stringify the value | ||
// The second check avoids cases where the partitionKey value is actually the string '{}' | ||
if (toHashKey === "{}" && operation.partitionKey === "[{}]") { | ||
return {}; | ||
} | ||
return toHashKey; | ||
} | ||
|
||
export function addPKToOperation(operation: Operation, definition: PartitionKeyDefinition) { | ||
if (operation.partitionKey || !hasResource(operation)) { | ||
return operation; | ||
} | ||
const pk = extractPartitionKey(operation.resourceBody, definition); | ||
return { ...operation, partitionKey: JSON.stringify(pk) }; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
partitionKeyRangeId for consistency