Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Experimental: xstate example #61

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open

Experimental: xstate example #61

wants to merge 5 commits into from

Conversation

jackkleeman
Copy link
Contributor

No description provided.

const system = await createSystem(ctx, api, systemName)
const snapshot = await ctx.get<Snapshot<unknown>>("snapshot") ?? undefined;

const parent: FakeParent<TLogic> = {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hack; I can't find a way to create an actor with a particular system set, so I had to create a 'fake' parent for the new actor which has a system already set

if (request.scheduledEvent) {
const events = await ctx.get<{ [key: string]: SerialisableScheduledEvent }>("events") ?? {}
const scheduledEventId = createScheduledEventId(request.scheduledEvent.source, request.scheduledEvent.id)
if (!(scheduledEventId in events)) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Restate doesn't yet support directly cancellable delayed calls, so I sort of implement it myself by just checking to see if the event is still scheduled, and also that it hasn't been replaced

_bookId: () => ctx.rand.uuidv4(),
_register: (sessionId, actorRef) => {
if (actorRef.id in childrenByID) {
// rehydration case; ensure session ID maintains continuity
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

general question; is my use of sessionid right? i treat it as a particular execution of an actor, so we want the id to be unique, but also deterministic on rehydrate (ie, on the next event where we start up the state machine all over again). thats why I deliberately store session ids in state, and then correlate them back with the rehydrated children using id in order to set the same session ids

api,
systemName,

_bookId: () => ctx.rand.uuidv4(),
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of persisting a counter just using deterministic uuids here

>(server: restate.RestateServer, path: string, logic: TLogic): restate.RestateServer => {
return server
.bindKeyedRouter(path, restate.keyedRouter(actorMethods(path, logic)))
.bindRouter(`${path}.promises`, restate.router(promiseMethods(path, logic)))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a separate service to offload long-running promise execution to, so that the main event loop isnt blocked (restate processes events in a keyed service in series). this is more of a restate hack right now, we would like a way to have non locking methods on the same service

});

// note that we sent off the promise so we don't do it again
(system as any)._relay(self, self, {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This differs slightly from the built in fromPromise actor because i think there we just rely on in-memory state to keep track of the promise having already been run, whereas this needs to be resistant to rehydration, i guess? in general we may have a problem of re-executing some types of actions, I think

return [actor.src, ...actorSrc(actor._parent)]
}

export const promiseMethods = <TLogic extends AnyStateMachine>(path: string, logic: TLogic) => {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this stuff all runs in a separate restate service that is unkeyed (ie non locking) but has access to the same state machine logic. the idea is that it gets called with all the information it needs to find the promise creating function in the state machine (could be in a deep chain of nested state machines) and also the inputs to call that function with. it then calls, awaits, and fires an event back to the main loop on completion

basically just using the restate async runtime instead of the JS async runtime

return logic;
}

function actorSrc(actor?: AnyActorRef): string[] {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tries to determine the chain of src fields which is the best way i could find to derive a 'path' through the state machine definition that helps you find the promise creator

const [promiseSrc, ...machineSrcs] = srcs

let stateMachine: AnyStateMachine = logic;
for (const src of machineSrcs) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

apply the 'path' through the state machine definitions, checking for each one in either the implementations or directly in the defintiion

const observers = new Set<Observer<InspectionEvent>>();

const scheduler = {
schedule(_source: AnyActorRef, _target: AnyActorRef, event: EventObject, delay: number, id: string | undefined): void {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

scheduler implementation is the first way we can do stuff across time; allows us to delay sending events by using restates sendDelayed abstraction. however this is not currently cancellable so we need to keep track of what we have sent and cancelled and check again on arrival

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant