Skip to content

Commit

Permalink
Add WorkflowHandle (#329)
Browse files Browse the repository at this point in the history
* Add workflow attach/handle

* Add workflowHandle() generated method, to be similar to TS SDK
  • Loading branch information
slinkydeveloper authored May 22, 2024
1 parent 026e10f commit 3f7cd44
Show file tree
Hide file tree
Showing 7 changed files with 202 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public class ServiceProcessor extends AbstractProcessor {
private HandlebarsTemplateEngine serviceDefinitionFactoryCodegen;
private HandlebarsTemplateEngine clientCodegen;

private static final Set<String> RESERVED_METHOD_NAMES = Set.of("send", "submit");
private static final Set<String> RESERVED_METHOD_NAMES =
Set.of("send", "submit", "workflowHandle");

@Override
public synchronized void init(ProcessingEnvironment processingEnv) {
Expand Down
26 changes: 17 additions & 9 deletions sdk-api-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -79,34 +79,42 @@ public class {{generatedClassSimpleName}} {
}

{{#handlers}}{{#if isWorkflow}}
public dev.restate.sdk.client.IngressClient.InvocationHandle<{{{boxedOutputFqcn}}}> submit({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
public dev.restate.sdk.client.IngressClient.WorkflowHandle<{{{boxedOutputFqcn}}}> workflowHandle() {
return IngressClient.this.ingressClient.workflowHandle(
{{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME,
this.key,
{{outputSerdeRef}});
}

public dev.restate.sdk.client.IngressClient.WorkflowHandle<{{{boxedOutputFqcn}}}> submit({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
return this.submit(
{{^inputEmpty}}req, {{/inputEmpty}}
dev.restate.sdk.client.RequestOptions.DEFAULT);
}

public dev.restate.sdk.client.IngressClient.InvocationHandle<{{{boxedOutputFqcn}}}> submit({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
return IngressClient.this.ingressClient.submit(
public dev.restate.sdk.client.IngressClient.WorkflowHandle<{{{boxedOutputFqcn}}}> submit({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
IngressClient.this.ingressClient.send(
{{{targetExpr this "this.key"}}},
{{inputSerdeRef}},
{{outputSerdeRef}},
{{#if inputEmpty}}null{{else}}req{{/if}},
null,
requestOptions);
return this.workflowHandle();
}

public java.util.concurrent.CompletableFuture<dev.restate.sdk.client.IngressClient.InvocationHandle<{{{boxedOutputFqcn}}}>> submitAsync({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
public java.util.concurrent.CompletableFuture<dev.restate.sdk.client.IngressClient.WorkflowHandle<{{{boxedOutputFqcn}}}>> submitAsync({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
return this.submitAsync(
{{^inputEmpty}}req, {{/inputEmpty}}
dev.restate.sdk.client.RequestOptions.DEFAULT);
}

public java.util.concurrent.CompletableFuture<dev.restate.sdk.client.IngressClient.InvocationHandle<{{{boxedOutputFqcn}}}>> submitAsync({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
return IngressClient.this.ingressClient.submitAsync(
public java.util.concurrent.CompletableFuture<dev.restate.sdk.client.IngressClient.WorkflowHandle<{{{boxedOutputFqcn}}}>> submitAsync({{^inputEmpty}}{{{inputFqcn}}} req, {{/inputEmpty}}dev.restate.sdk.client.RequestOptions requestOptions) {
return IngressClient.this.ingressClient.sendAsync(
{{{targetExpr this "this.key"}}},
{{inputSerdeRef}},
{{outputSerdeRef}},
{{#if inputEmpty}}null{{else}}req{{/if}},
requestOptions);
null,
requestOptions).thenApply(id -> this.workflowHandle());
}
{{else}}
public {{#if outputEmpty}}void{{else}}{{{outputFqcn}}}{{/if}} {{methodName}}({{^inputEmpty}}{{{inputFqcn}}} req{{/inputEmpty}}) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class ServiceProcessor(private val logger: KSPLogger, private val codeGenerator:
SymbolProcessor {

companion object {
private val RESERVED_METHOD_NAMES: Set<String> = setOf("send", "submit")
private val RESERVED_METHOD_NAMES: Set<String> = setOf("send", "submit", "workflowHandle")
}

private val bindableServiceFactoryCodegen: HandlebarsTemplateEngine =
Expand Down
16 changes: 12 additions & 4 deletions sdk-api-kotlin-gen/src/main/resources/templates/Client.hbs
Original file line number Diff line number Diff line change
Expand Up @@ -52,17 +52,25 @@ object {{generatedClassSimpleName}} {
class IngressClient(private val ingressClient: dev.restate.sdk.client.IngressClient{{#isKeyed}}, private val key: String{{/isKeyed}}) {

{{#handlers}}{{#if isWorkflow}}
suspend fun submit({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): dev.restate.sdk.client.IngressClient.InvocationHandle<{{{boxedOutputFqcn}}}> {
return this.ingressClient.submitSuspend(
fun workflowHandle(): dev.restate.sdk.client.IngressClient.WorkflowHandle<{{{boxedOutputFqcn}}}> {
return [email protected](
{{generatedClassSimpleNamePrefix}}Definitions.SERVICE_NAME,
this.key,
{{outputSerdeRef}});
}

suspend fun submit({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.RequestOptions = dev.restate.sdk.client.RequestOptions.DEFAULT): dev.restate.sdk.client.IngressClient.WorkflowHandle<{{{boxedOutputFqcn}}}> {
[email protected](
{{{targetExpr this "this.key"}}},
{{inputSerdeRef}},
{{outputSerdeRef}},
{{#if inputEmpty}}Unit{{else}}req{{/if}},
kotlin.time.Duration.ZERO,
requestOptions);
return this.workflowHandle()
}
{{else}}
suspend fun {{methodName}}({{^inputEmpty}}req: {{{inputFqcn}}}, {{/inputEmpty}}requestOptions: dev.restate.sdk.client.CallRequestOptions = dev.restate.sdk.client.CallRequestOptions.DEFAULT): {{{boxedOutputFqcn}}} {
return this.ingressClient.callSuspend(
return this@IngressClient.ingressClient.callSuspend(
{{{targetExpr this "this.key"}}},
{{inputSerdeRef}},
{{outputSerdeRef}},
Expand Down
25 changes: 13 additions & 12 deletions sdk-api-kotlin/src/main/kotlin/dev/restate/sdk/kotlin/ingress.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
// https://github.com/restatedev/sdk-java/blob/main/LICENSE
package dev.restate.sdk.kotlin

import dev.restate.sdk.client.CallRequestOptions
import dev.restate.sdk.client.IngressClient
import dev.restate.sdk.client.RequestOptions
import dev.restate.sdk.common.Serde
Expand All @@ -24,7 +23,7 @@ suspend fun <Req, Res> IngressClient.callSuspend(
reqSerde: Serde<Req>,
resSerde: Serde<Res>,
req: Req,
options: CallRequestOptions = CallRequestOptions.DEFAULT
options: RequestOptions = RequestOptions.DEFAULT
): Res {
return this.callAsync(target, reqSerde, resSerde, req, options).await()
}
Expand All @@ -34,7 +33,7 @@ suspend fun <Req> IngressClient.sendSuspend(
reqSerde: Serde<Req>,
req: Req,
delay: Duration = Duration.ZERO,
options: CallRequestOptions = CallRequestOptions.DEFAULT
options: RequestOptions = RequestOptions.DEFAULT
): String {
return this.sendAsync(target, reqSerde, req, delay.toJavaDuration(), options).await()
}
Expand All @@ -54,23 +53,25 @@ suspend fun IngressClient.AwakeableHandle.rejectSuspend(
this.rejectAsync(reason, options).await()
}

suspend fun <Req, Res> IngressClient.submitSuspend(
target: Target,
reqSerde: Serde<Req>,
resSerde: Serde<Res>,
req: Req,
suspend fun <T> IngressClient.InvocationHandle<T>.attachSuspend(
options: RequestOptions = RequestOptions.DEFAULT
): IngressClient.InvocationHandle<Res> {
return this.submitAsync(target, reqSerde, resSerde, req, options).await()
) {
this.attachAsync(options).await()
}

suspend fun <T> IngressClient.InvocationHandle<T>.attachSuspend(
suspend fun <T> IngressClient.InvocationHandle<T>.getOutputSuspend(
options: RequestOptions = RequestOptions.DEFAULT
) {
this.getOutputAsync(options).await()
}

suspend fun <T> IngressClient.WorkflowHandle<T>.attachSuspend(
options: RequestOptions = RequestOptions.DEFAULT
) {
this.attachAsync(options).await()
}

suspend fun <T> IngressClient.InvocationHandle<T>.getOutputSuspend(
suspend fun <T> IngressClient.WorkflowHandle<T>.getOutputSuspend(
options: RequestOptions = RequestOptions.DEFAULT
) {
this.getOutputAsync(options).await()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,11 @@
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
Expand All @@ -45,7 +47,7 @@ public <Req, Res> CompletableFuture<Res> callAsync(
Serde<Req> reqSerde,
Serde<Res> resSerde,
Req req,
CallRequestOptions requestOptions) {
RequestOptions requestOptions) {
HttpRequest request = prepareHttpRequest(target, false, reqSerde, req, null, requestOptions);
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
Expand All @@ -70,7 +72,7 @@ public <Req, Res> CompletableFuture<Res> callAsync(

@Override
public <Req> CompletableFuture<String> sendAsync(
Target target, Serde<Req> reqSerde, Req req, Duration delay, CallRequestOptions options) {
Target target, Serde<Req> reqSerde, Req req, Duration delay, RequestOptions options) {
return sendAsyncInner(target, reqSerde, req, delay, options);
}

Expand Down Expand Up @@ -171,16 +173,14 @@ public CompletableFuture<Void> rejectAsync(String reason, RequestOptions options
};
}

@Override
public <Req, Res> CompletableFuture<InvocationHandle<Res>> submitAsync(
Target target, Serde<Req> reqSerde, Serde<Res> resSerde, Req req, RequestOptions options) {
return this.sendAsyncInner(target, reqSerde, req, null, options)
.thenApply(id -> this.invocationHandle(id, resSerde));
}

@Override
public <Res> InvocationHandle<Res> invocationHandle(String invocationId, Serde<Res> resSerde) {
return new InvocationHandle<>() {
@Override
public String invocationId() {
return invocationId;
}

@Override
public CompletableFuture<Res> attachAsync(RequestOptions options) {
// Prepare request
Expand Down Expand Up @@ -257,11 +257,103 @@ public CompletableFuture<Res> getOutputAsync(RequestOptions options) {
};
}

@Override
public <Res> WorkflowHandle<Res> workflowHandle(
String workflowName, String workflowId, Serde<Res> resSerde) {
return new WorkflowHandle<>() {
@Override
public CompletableFuture<Res> attachAsync(RequestOptions options) {
// Prepare request
var reqBuilder =
HttpRequest.newBuilder()
.uri(
baseUri.resolve(
"/restate/workflow/"
+ workflowName
+ "/"
+ URLEncoder.encode(workflowId, StandardCharsets.UTF_8)
+ "/attach"));

// Add headers
headers.forEach(reqBuilder::header);
options.getAdditionalHeaders().forEach(reqBuilder::header);

// Build and Send request
HttpRequest request = reqBuilder.GET().build();
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.handle(
(response, throwable) -> {
if (throwable != null) {
throw new IngressException("Error when executing the request", throwable);
}

if (response.statusCode() >= 300) {
handleNonSuccessResponse(response);
}

try {
return resSerde.deserialize(response.body());
} catch (Exception e) {
throw new IngressException(
"Cannot deserialize the response",
response.statusCode(),
response.body(),
e);
}
});
}

@Override
public CompletableFuture<Res> getOutputAsync(RequestOptions options) {
// Prepare request
var reqBuilder =
HttpRequest.newBuilder()
.uri(
baseUri.resolve(
"/restate/workflow/"
+ workflowName
+ "/"
+ URLEncoder.encode(workflowId, StandardCharsets.UTF_8)
+ "/output"));

// Add headers
headers.forEach(reqBuilder::header);
options.getAdditionalHeaders().forEach(reqBuilder::header);

// Build and Send request
HttpRequest request = reqBuilder.GET().build();
return httpClient
.sendAsync(request, HttpResponse.BodyHandlers.ofByteArray())
.handle(
(response, throwable) -> {
if (throwable != null) {
throw new IngressException("Error when executing the request", throwable);
}

if (response.statusCode() >= 300) {
handleNonSuccessResponse(response);
}

try {
return resSerde.deserialize(response.body());
} catch (Exception e) {
throw new IngressException(
"Cannot deserialize the response",
response.statusCode(),
response.body(),
e);
}
});
}
};
}

private URI toRequestURI(Target target, boolean isSend, Duration delay) {
StringBuilder builder = new StringBuilder();
builder.append("/").append(target.getService());
if (target.getKey() != null) {
builder.append("/").append(target.getKey());
builder.append("/").append(URLEncoder.encode(target.getKey(), StandardCharsets.UTF_8));
}
builder.append("/").append(target.getHandler());
if (isSend) {
Expand Down
Loading

0 comments on commit 3f7cd44

Please sign in to comment.