Skip to content

Commit

Permalink
Add Workflow API e2e test (#264)
Browse files Browse the repository at this point in the history
  • Loading branch information
slinkydeveloper authored Feb 13, 2024
1 parent 5756ec6 commit b0a027b
Show file tree
Hide file tree
Showing 12 changed files with 412 additions and 31 deletions.
5 changes: 5 additions & 0 deletions contracts/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ plugins {
}

dependencies {
annotationProcessor(libs.restate.sdk.api.gen)

compileOnly(libs.javax.annotation.api)

api(libs.protobuf.java)
Expand All @@ -26,6 +28,9 @@ dependencies {
api(libs.grpc.kotlin.stub) { exclude("javax.annotation", "javax.annotation-api") }

protobuf(libs.restate.sdk.common)

api(libs.restate.sdk.workflow.api)
api(libs.restate.sdk.jackson)
}

protobuf {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

package my.restate.e2e.services;

import dev.restate.sdk.annotation.Service;
import dev.restate.sdk.annotation.ServiceType;
import dev.restate.sdk.annotation.Shared;
import dev.restate.sdk.annotation.Workflow;
import dev.restate.sdk.common.StateKey;
import dev.restate.sdk.workflow.WorkflowContext;
import dev.restate.sdk.workflow.WorkflowSharedContext;

@Service(ServiceType.WORKFLOW)
public interface WorkflowAPIBlockAndWait {

@Workflow
String blockAndWait(WorkflowContext context, String input);

@Shared
void unblock(WorkflowSharedContext context, String output);

StateKey<String> MY_STATE = StateKey.string("my-state");
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@
import dev.restate.e2e.services.upgradetest.UpgradeTestServiceGrpc;
import dev.restate.sdk.http.vertx.RestateHttpEndpointBuilder;
import java.util.Objects;
import my.restate.e2e.services.WorkflowAPIBlockAndWaitImpl;
import my.restate.e2e.services.WorkflowAPIBlockAndWaitServiceAdapter;

public class Main {

Expand All @@ -49,8 +51,9 @@ public static void main(String[] args) {
"SERVICES env variable needs to specify which service to run.");

RestateHttpEndpointBuilder restateHttpEndpointBuilder = RestateHttpEndpointBuilder.builder();
for (String service : env.split(",")) {
switch (service.trim()) {
for (String svc : env.split(",")) {
String fqsn = svc.trim();
switch (fqsn) {
case ListServiceGrpc.SERVICE_NAME:
restateHttpEndpointBuilder.withService(new ListService());
break;
Expand Down Expand Up @@ -95,6 +98,10 @@ public static void main(String[] args) {
case MapServiceGrpc.SERVICE_NAME:
restateHttpEndpointBuilder.withService(new MapService());
break;
case WorkflowAPIBlockAndWaitServiceAdapter.SERVICE_NAME:
restateHttpEndpointBuilder.with(
new WorkflowAPIBlockAndWaitImpl(), new WorkflowAPIBlockAndWaitServiceAdapter());
break;
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

package my.restate.e2e.services;

import dev.restate.sdk.common.TerminalException;
import dev.restate.sdk.workflow.DurablePromiseKey;
import dev.restate.sdk.workflow.WorkflowContext;
import dev.restate.sdk.workflow.WorkflowSharedContext;

public class WorkflowAPIBlockAndWaitImpl implements WorkflowAPIBlockAndWait {

private static final DurablePromiseKey<String> MY_DURABLE_PROMISE =
DurablePromiseKey.string("durable-promise");

@Override
public String blockAndWait(WorkflowContext context, String input) {
context.set(MY_STATE, input);

// Wait on unblock
String output = context.durablePromise(MY_DURABLE_PROMISE).awaitable().await();

if (!context.durablePromise(MY_DURABLE_PROMISE).isCompleted()) {
throw new TerminalException("Durable promise should be completed");
}
if (context.durablePromise(MY_DURABLE_PROMISE).peek().isEmpty()) {
throw new TerminalException("Durable promise should be completed");
}

return output;
}

@Override
public void unblock(WorkflowSharedContext context, String output) {
context.durablePromiseHandle(MY_DURABLE_PROMISE).resolve(output);
}
}
25 changes: 20 additions & 5 deletions services/node-services/src/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ import {
CancelTestServiceFQN,
BlockingService,
} from "./cancel_test";
import {
WorkflowAPIBlockAndWait,
WorkflowAPIBlockAndWaitFQN,
} from "./workflow";

let serverBuilder;
export let handler: (event: any) => Promise<any>;

Check warning on line 80 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 80 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 80 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 80 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
Expand All @@ -82,7 +86,7 @@ if (process.env.AWS_LAMBDA_FUNCTION_NAME) {

const services = new Map<
string,
restate.ServiceOpts | { router: any } | { keyedRouter: any }
restate.ServiceOpts | { router: any } | { keyedRouter: any } | { bundle: any }

Check warning on line 89 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 89 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 89 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 89 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 89 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 89 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
>([
[
CounterServiceFQN,
Expand Down Expand Up @@ -248,6 +252,12 @@ const services = new Map<
instance: new MapService(),
},
],
[
WorkflowAPIBlockAndWaitFQN,
{
bundle: WorkflowAPIBlockAndWait,
},
],
]);
console.log("Known services: " + services.keys());

Expand All @@ -269,20 +279,25 @@ if (process.env.SERVICES) {
serverBuilder = serverBuilder.bindService(
foundService as restate.ServiceOpts
);
} else if (
(foundService as restate.UnKeyedRouter<any>).router !== undefined
) {
} else if ((foundService as { router: any }).router !== undefined) {

Check warning on line 282 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 282 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
console.log("Mounting router " + service);
serverBuilder = serverBuilder.bindRouter(
service,
(foundService as { router: any }).router

Check warning on line 286 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 286 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
);
} else {
} else if (
(foundService as { keyedRouter: any }).keyedRouter !== undefined

Check warning on line 289 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 289 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
) {
console.log("Mounting keyed router " + service);
serverBuilder = serverBuilder.bindKeyedRouter(
service,
(foundService as { keyedRouter: any }).keyedRouter

Check warning on line 294 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 294 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
);
} else {
console.log("Mounting bundle " + service);
serverBuilder = serverBuilder.bind(
(foundService as { bundle: any }).bundle

Check warning on line 299 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type

Check warning on line 299 in services/node-services/src/app.ts

View workflow job for this annotation

GitHub Actions / build

Unexpected any. Specify a different type
);
}
}

Expand Down
47 changes: 47 additions & 0 deletions services/node-services/src/workflow.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2024 - Restate Software, Inc., Restate GmbH
//
// This file is part of the Restate e2e tests,
// which are released under the MIT license.
//
// You can find a copy of the license in file LICENSE in the root
// directory of this repository or package, or at
// https://github.com/restatedev/e2e/blob/main/LICENSE

import * as restate from "@restatedev/restate-sdk";

const MY_STATE = "my-state";
const MY_DURABLE_PROMISE = "durable-promise";

export const WorkflowAPIBlockAndWaitFQN = "WorkflowAPIBlockAndWait";

export const WorkflowAPIBlockAndWait = restate.workflow.workflow(
WorkflowAPIBlockAndWaitFQN,
{
run: async (ctx: restate.workflow.WfContext, params: { input: string }) => {
ctx.console.log("input: " + JSON.stringify(params));
ctx.set(MY_STATE, params.input);

// Wait on unblock
const p = ctx.promise<string>(MY_DURABLE_PROMISE);
const output = await p.promise();

// Check peek works
ctx.console.assert((await p.peek()) == output);

return output;
},

unblock: async (
ctx: restate.workflow.SharedWfContext,
params: { output: string }
) => {
ctx.promise<string>(MY_DURABLE_PROMISE).resolve(params.output);
},

getState: async (
ctx: restate.workflow.SharedWfContext
): Promise<string> => {
return (await ctx.get(MY_STATE)) ?? "(not yet set)";
},
}
);
4 changes: 4 additions & 0 deletions settings.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,10 @@ dependencyResolutionManagement {
library("restate-sdk-common", "dev.restate", "sdk-common").versionRef("restate")
library("restate-admin", "dev.restate", "admin-client").versionRef("restate")
library("restate-sdk-api", "dev.restate", "sdk-api").versionRef("restate")
library("restate-sdk-api-gen", "dev.restate", "sdk-api-gen").versionRef("restate")
library("restate-sdk-jackson", "dev.restate", "sdk-serde-jackson").versionRef("restate")
library("restate-sdk-http-vertx", "dev.restate", "sdk-http-vertx").versionRef("restate")
library("restate-sdk-workflow-api", "dev.restate", "sdk-workflow-api").versionRef("restate")

library("protoc", "com.google.protobuf", "protoc").versionRef("protobuf")
library("protobuf-java", "com.google.protobuf", "protobuf-java").versionRef("protobuf")
Expand Down Expand Up @@ -93,6 +95,8 @@ if (!System.getenv("JAVA_SDK_LOCAL_BUILD").isNullOrEmpty()) {
substitute(module("dev.restate:admin-client")).using(project(":admin-client"))
substitute(module("dev.restate:sdk-common")).using(project(":sdk-common"))
substitute(module("dev.restate:sdk-api")).using(project(":sdk-api"))
substitute(module("dev.restate:sdk-api-gen")).using(project(":sdk-api-gen"))
substitute(module("dev.restate:sdk-workflow-api")).using(project(":sdk-workflow-api"))
substitute(module("dev.restate:sdk-http-vertx")).using(project(":sdk-http-vertx"))
substitute(module("dev.restate:sdk-serde-jackson")).using(project(":sdk-serde-jackson"))
}
Expand Down
2 changes: 0 additions & 2 deletions tests/src/test/kotlin/dev/restate/e2e/AwaitTimeoutTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ class NodeAwaitTimeoutTest : BaseAwaitTimeoutTest() {
}
}

// Only a Java test because the typescript SDK is still lacking this feature:
// https://github.com/restatedev/sdk-typescript/issues/21
@Tag("always-suspending")
abstract class BaseAwaitTimeoutTest {

Expand Down
9 changes: 9 additions & 0 deletions tests/src/test/kotlin/dev/restate/e2e/Containers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import dev.restate.e2e.services.singletoncounter.SingletonCounterGrpc
import dev.restate.e2e.services.verification.interpreter.CommandInterpreterGrpc
import dev.restate.e2e.services.verification.verifier.CommandVerifierGrpc
import dev.restate.e2e.utils.ServiceSpec
import my.restate.e2e.services.WorkflowAPIBlockAndWaitServiceAdapter
import org.testcontainers.containers.GenericContainer

object Containers {
Expand Down Expand Up @@ -72,6 +73,10 @@ object Containers {
"HTTP_SERVER_ADDRESS", "http://${INT_SORTER_HTTP_SERVER_CONTAINER_SPEC.first}:8080")
.build()

val JAVA_WORKFLOW_SERVICE_SPEC =
javaServicesContainer("java-workflow", WorkflowAPIBlockAndWaitServiceAdapter.SERVICE_NAME)
.build()

// -- Node containers

fun nodeServicesContainer(hostName: String, vararg services: String): ServiceSpec.Builder {
Expand Down Expand Up @@ -112,6 +117,10 @@ object Containers {
val NODE_HANDLER_API_ECHO_TEST_SERVICE_SPEC =
nodeServicesContainer("node-proxy", HANDLER_API_ECHO_TEST_SERVICE_NAME).build()

const val WORKFLOW_API_BLOCK_AND_WAIT_SERVICE_NAME = "WorkflowAPIBlockAndWait"
val NODE_WORKFLOW_SERVICE_SPEC =
nodeServicesContainer("node-workflow", WORKFLOW_API_BLOCK_AND_WAIT_SERVICE_NAME).build()

const val EMBEDDED_HANDLER_SERVER_HOSTNAME = "node-embedded-handler"
const val EMBEDDED_HANDLER_SERVER_PORT = 8080

Expand Down
16 changes: 16 additions & 0 deletions tests/src/test/kotlin/dev/restate/e2e/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse
import java.nio.charset.StandardCharsets
import org.assertj.core.api.Assertions.assertThat

object Utils {

Expand All @@ -41,4 +42,19 @@ object Utils {
.build()
return httpClient.send(req, jacksonBodyHandler())
}

fun doJsonRequestToService(
restateEndpoint: String,
service: String,
method: String,
reqBody: Any
): JsonNode {
val res = postJsonRequest("${restateEndpoint}${service}/${method}", reqBody)
assertThat(res.statusCode()).isEqualTo(200)
assertThat(res.headers().firstValue("content-type"))
.get()
.asString()
.contains("application/json")
return res.body()
}
}
Loading

0 comments on commit b0a027b

Please sign in to comment.