Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove ability to reject reentrant ops #20621

Merged
merged 9 commits into from
Apr 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 1 addition & 13 deletions packages/dds/shared-object-base/src/sharedObject.ts
Original file line number Diff line number Diff line change
Expand Up @@ -585,19 +585,7 @@ export abstract class SharedObjectCore<TEvent extends ISharedObjectEvents = ISha
*/
public emit(event: EventEmitterEventType, ...args: any[]): boolean {
return this.callbacksHelper.measure(() => {
// Creating ops while handling a DDS event can lead
// to undefined behavior and events observed in the wrong order.
// For example, we have two callbacks registered for a DDS, A and B.
// Then if on change #1 callback A creates change #2, the invocation flow will be:
//
// A because of #1
// A because of #2
// B because of #2
// B because of #1
//
// The runtime must enforce op coherence by not allowing any ops to be created during
// the event handler
return this.runtime.ensureNoDataModelChanges(() => super.emit(event, ...args));
return super.emit(event, ...args);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -723,7 +723,6 @@ export interface IContainerRuntimeOptions {
readonly chunkSizeInBytes?: number;
readonly compressionOptions?: ICompressionRuntimeOptions;
readonly enableGroupedBatching?: boolean;
readonly enableOpReentryCheck?: boolean;
readonly enableRuntimeIdCompressor?: IdCompressorMode;
readonly explicitSchemaControl?: boolean;
readonly flushMode?: FlushMode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ export function wrapContext(context: IFluidParentContext): IFluidParentContext {
getAudience: (...args) => {
return context.getAudience(...args);
},
// back-compat, to be removed in 2.0
ensureNoDataModelChanges: (...args) => {
return context.ensureNoDataModelChanges(...args);
},
Expand Down
96 changes: 19 additions & 77 deletions packages/runtime/container-runtime/src/containerRuntime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ import {
OpSplitter,
Outbox,
RemoteMessageProcessor,
getLongStack,
} from "./opLifecycle/index.js";
import { pkgVersion } from "./packageVersion.js";
import {
Expand Down Expand Up @@ -459,16 +458,6 @@ export interface IContainerRuntimeOptions {
*/
readonly enableRuntimeIdCompressor?: IdCompressorMode;

/**
* If enabled, the runtime will block all attempts to send an op inside the
* {@link ContainerRuntime#ensureNoDataModelChanges} callback. The callback is used by
* {@link @fluidframework/shared-object-base#SharedObjectCore} for event handlers so enabling this
* will disallow modifying DDSes while handling DDS events.
*
* By default, the feature is disabled. If enabled from options, the `Fluid.ContainerRuntime.DisableOpReentryCheck`
* can be used to disable it at runtime.
*/
readonly enableOpReentryCheck?: boolean;
/**
* If enabled, the runtime will group messages within a batch into a single
* message to be sent to the service.
Expand Down Expand Up @@ -808,7 +797,6 @@ export class ContainerRuntime
maxBatchSizeInBytes = defaultMaxBatchSizeInBytes,
enableRuntimeIdCompressor,
chunkSizeInBytes = defaultChunkSizeInBytes,
enableOpReentryCheck = false,
enableGroupedBatching = false,
explicitSchemaControl = false,
} = runtimeOptions;
Expand Down Expand Up @@ -1021,7 +1009,6 @@ export class ContainerRuntime
chunkSizeInBytes,
// Requires<> drops undefined from IdCompressorType
enableRuntimeIdCompressor: enableRuntimeIdCompressor as "on" | "delayed",
enableOpReentryCheck,
enableGroupedBatching,
explicitSchemaControl,
},
Expand Down Expand Up @@ -1224,14 +1211,6 @@ export class ContainerRuntime

private ensureNoDataModelChangesCalls = 0;

/**
* Tracks the number of detected reentrant ops to report,
* in order to self-throttle the telemetry events.
*
* This should be removed as part of ADO:2322
*/
private opReentryCallsToReport = 5;

/**
* Invokes the given callback and expects that no ops are submitted
* until execution finishes. If an op is submitted, an error will be raised.
Expand Down Expand Up @@ -1265,7 +1244,6 @@ export class ContainerRuntime

private dirtyContainer: boolean;
private emitDirtyDocumentEvent = true;
private readonly enableOpReentryCheck: boolean;
private readonly disableAttachReorder: boolean | undefined;
private readonly closeSummarizerDelayMs: number;
/**
Expand Down Expand Up @@ -1549,14 +1527,6 @@ export class ContainerRuntime
this.validateSummaryHeuristicConfiguration(this.summaryConfiguration);
}

const disableOpReentryCheck = this.mc.config.getBoolean(
"Fluid.ContainerRuntime.DisableOpReentryCheck",
);
this.enableOpReentryCheck =
runtimeOptions.enableOpReentryCheck === true &&
// Allow for a break-glass config to override the options
disableOpReentryCheck !== true;

this.summariesDisabled = this.isSummariesDisabled();
this.maxOpsSinceLastSummary = this.getMaxOpsSinceLastSummary();
this.initialSummarizerDelayMs = this.getInitialSummarizerDelayMs();
Expand Down Expand Up @@ -1880,7 +1850,6 @@ export class ContainerRuntime
idCompressorMode: this.idCompressorMode,
featureGates: JSON.stringify({
...featureGatesForTelemetry,
disableOpReentryCheck,
disableChunking,
disableAttachReorder: this.disableAttachReorder,
disablePartialFlush,
Expand Down Expand Up @@ -2562,20 +2531,25 @@ export class ContainerRuntime
// but will not modify the contents object (likely it will replace it on the message).
const messageCopy = { ...messageArg };
for (const message of this.remoteMessageProcessor.process(messageCopy)) {
if (modernRuntimeMessage) {
this.processCore({
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
// There is nothing really ensuring that anytime original message.type is Operation that
// the result messages will be so. In the end modern bool being true only directs to
// throw error if ultimately unrecognized without compat details saying otherwise.
message: message as InboundSequencedContainerRuntimeMessage,
local,
modernRuntimeMessage,
});
} else {
// Unrecognized message will be ignored.
this.processCore({ message, local, modernRuntimeMessage });
}
const msg: MessageWithContext = modernRuntimeMessage
? {
// Cast it since we expect it to be this based on modernRuntimeMessage computation above.
// There is nothing really ensuring that anytime original message.type is Operation that
// the result messages will be so. In the end modern bool being true only directs to
// throw error if ultimately unrecognized without compat details saying otherwise.
message: message as InboundSequencedContainerRuntimeMessage,
local,
modernRuntimeMessage,
}
: // Unrecognized message will be ignored.
{
message,
local,
modernRuntimeMessage,
};

// ensure that we observe any re-entrancy, and if needed, rebase ops
this.ensureNoDataModelChanges(() => this.processCore(msg));
}
}

Expand Down Expand Up @@ -3889,7 +3863,6 @@ export class ContainerRuntime
metadata?: { localId: string; blobId?: string },
): void {
this.verifyNotClosed();
this.verifyCanSubmitOps();

// There should be no ops in detached container state!
assert(
Expand Down Expand Up @@ -4044,37 +4017,6 @@ export class ContainerRuntime
}
}

private verifyCanSubmitOps() {
if (this.ensureNoDataModelChangesCalls > 0) {
const errorMessage =
"Op was submitted from within a `ensureNoDataModelChanges` callback";
if (this.opReentryCallsToReport > 0) {
this.mc.logger.sendTelemetryEvent(
{ eventName: "OpReentry" },
// We need to capture the call stack in order to inspect the source of this usage pattern
getLongStack(() => new UsageError(errorMessage)),
);
this.opReentryCallsToReport--;
}

// Creating ops while processing ops can lead
// to undefined behavior and events observed in the wrong order.
// For example, we have two callbacks registered for a DDS, A and B.
// Then if on change #1 callback A creates change #2, the invocation flow will be:
//
// A because of #1
// A because of #2
// B because of #2
// B because of #1
//
// The runtime must enforce op coherence by not allowing ops to be submitted
// while ops are being processed.
if (this.enableOpReentryCheck) {
throw new UsageError(errorMessage);
}
}
}

private reSubmitBatch(batch: IPendingBatchMessage[]) {
this.orderSequentially(() => {
for (const message of batch) {
Expand Down
1 change: 1 addition & 0 deletions packages/runtime/container-runtime/src/dataStoreContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ export abstract class FluidDataStoreContext
return this._containerRuntime;
}

// back-compat, to be removed in 2.0
public ensureNoDataModelChanges<T>(callback: () => T): T {
return this.parentContext.ensureNoDataModelChanges(callback);
}
Expand Down
148 changes: 0 additions & 148 deletions packages/runtime/container-runtime/src/test/containerRuntime.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -542,151 +542,6 @@ describe("Runtime", () => {
});
}));

describe("Op reentry enforcement", () => {
let containerRuntime: ContainerRuntime;

it("By default, don't enforce the op reentry check", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext() as IContainerContext,
registryEntries: [],
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});

assert.ok(
containerRuntime.ensureNoDataModelChanges(() => {
submitDataStoreOp(containerRuntime, "id", "test");
return true;
}),
);

assert.ok(
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() => {
submitDataStoreOp(containerRuntime, "id", "test");
return true;
}),
),
),
);
});

it("If option enabled, enforce the op reentry check", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext() as IContainerContext,
registryEntries: [],
runtimeOptions: {
enableOpReentryCheck: true,
},
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});

assert.throws(() =>
containerRuntime.ensureNoDataModelChanges(() =>
submitDataStoreOp(containerRuntime, "id", "test"),
),
);

assert.throws(() =>
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
submitDataStoreOp(containerRuntime, "id", "test"),
),
),
),
);
});

it("If option enabled but disabled via feature gate, don't enforce the op reentry check", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext({
"Fluid.ContainerRuntime.DisableOpReentryCheck": true,
}) as IContainerContext,
registryEntries: [],
runtimeOptions: {
enableOpReentryCheck: true,
},
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});

containerRuntime.ensureNoDataModelChanges(() =>
submitDataStoreOp(containerRuntime, "id", "test"),
);

containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
containerRuntime.ensureNoDataModelChanges(() =>
submitDataStoreOp(containerRuntime, "id", "test"),
),
),
);
});

it("Report at most 5 reentrant ops", async () => {
const mockLogger = new MockLogger();
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext({}, mockLogger) as IContainerContext,
registryEntries: [],
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});

mockLogger.clear();
containerRuntime.ensureNoDataModelChanges(() => {
for (let i = 0; i < 10; i++) {
submitDataStoreOp(containerRuntime, "id", "test");
}
});

// We expect only 5 events
mockLogger.assertMatchStrict(
Array.from(Array(5).keys()).map(() => ({
eventName: "ContainerRuntime:OpReentry",
error: "Op was submitted from within a `ensureNoDataModelChanges` callback",
})),
);
});

it("Can't call flush() inside ensureNoDataModelChanges's callback", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext() as IContainerContext,
registryEntries: [],
runtimeOptions: {
flushMode: FlushMode.Immediate,
},
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});

assert.throws(() =>
containerRuntime.ensureNoDataModelChanges(() => {
containerRuntime.orderSequentially(() => {});
}),
);
});

it("Can't create an infinite ensureNoDataModelChanges recursive call ", async () => {
containerRuntime = await ContainerRuntime.loadRuntime({
context: getMockContext() as IContainerContext,
registryEntries: [],
provideEntryPoint: mockProvideEntryPoint,
existing: false,
});

const callback = () => {
containerRuntime.ensureNoDataModelChanges(() => {
submitDataStoreOp(containerRuntime, "id", "test");
callback();
});
};
assert.throws(() => callback());
});
});

describe("orderSequentially with rollback", () =>
[
FlushMode.TurnBased,
Expand Down Expand Up @@ -1569,7 +1424,6 @@ describe("Runtime", () => {
maxBatchSizeInBytes: 700 * 1024,
chunkSizeInBytes: 204800,
enableRuntimeIdCompressor: undefined,
enableOpReentryCheck: false,
enableGroupedBatching: false,
explicitSchemaControl: false,
} satisfies IContainerRuntimeOptions;
Expand Down Expand Up @@ -1598,7 +1452,6 @@ describe("Runtime", () => {
const featureGates = {
"Fluid.ContainerRuntime.CompressionDisabled": true,
"Fluid.ContainerRuntime.CompressionChunkingDisabled": true,
"Fluid.ContainerRuntime.DisableOpReentryCheck": false,
"Fluid.ContainerRuntime.IdCompressorEnabled": true,
"Fluid.ContainerRuntime.DisableGroupedBatching": true,
};
Expand All @@ -1619,7 +1472,6 @@ describe("Runtime", () => {
featureGates: JSON.stringify({
disableGroupedBatching: true,
disableCompression: true,
disableOpReentryCheck: false,
disableChunking: true,
}),
groupedBatchingEnabled: false,
Expand Down
Loading
Loading