Skip to content

Commit

Permalink
Added support for binary serialization
Browse files Browse the repository at this point in the history
  • Loading branch information
stidsborg committed Oct 20, 2024
1 parent 1a43766 commit 4974e68
Show file tree
Hide file tree
Showing 53 changed files with 418 additions and 391 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ public class LeaseUpdaterTestFunctionStore : IFunctionStore

public Task<bool> CreateFunction(
FlowId flowId,
string? param,
byte[]? param,
long leaseExpiration,
long? postponeUntil,
long timestamp
Expand All @@ -53,15 +53,15 @@ public Task<IReadOnlyList<FlowInstance>> GetSucceededFunctions(FlowType flowType

public Task<bool> SetFunctionState(
FlowId flowId, Status status,
string? storedParameter, string? storedResult,
byte[]? storedParameter, byte[]? storedResult,
StoredException? storedException,
long expires,
int expectedEpoch
) => _inner.SetFunctionState(flowId, status, storedParameter, storedResult, storedException, expires, expectedEpoch);

public Task<bool> SucceedFunction(
FlowId flowId,
string? result,
byte[]? result,
long timestamp,
int expectedEpoch,
ComplimentaryState complimentaryState
Expand Down Expand Up @@ -95,7 +95,7 @@ public Task<bool> Interrupt(FlowId flowId, bool onlyIfExecuting)

public Task<bool?> Interrupted(FlowId flowId) => _inner.Interrupted(flowId);

public Task<bool> SetParameters(FlowId flowId, string? storedParameter, string? storedResult, int expectedEpoch)
public Task<bool> SetParameters(FlowId flowId, byte[]? storedParameter, byte[]? storedResult, int expectedEpoch)
=> _inner.SetParameters(flowId, storedParameter, storedResult, expectedEpoch);

public Task<StatusAndEpoch?> GetFunctionStatus(FlowId flowId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ public async Task CrashedWatchDogStartUpIsDelayedByOneSecondSuccessfully()
var functionId = new FlowId("flowType", "flowInstance");
await store.CreateFunction(
functionId,
"hello world".ToJson(),
"hello world".ToJson().ToUtf8Bytes(),
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks
Expand Down Expand Up @@ -49,7 +49,7 @@ public async Task CrashedWatchDogStartUpNotDelayedSuccessfully()
var functionId = new FlowId("flowType", "flowInstance");
await store.CreateFunction(
functionId,
"hello world".ToJson(),
"hello world".ToJson().ToUtf8Bytes(),
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks
Expand All @@ -74,7 +74,7 @@ public async Task PostponedWatchDogStartUpIsDelayedByOneSecondSuccessfully()
var functionId = new FlowId("flowType", "flowInstance");
await store.CreateFunction(
functionId,
storedParameter,
storedParameter.ToUtf8Bytes(),
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks
Expand All @@ -84,7 +84,7 @@ await store.PostponeFunction(
postponeUntil: 0,
timestamp: DateTime.UtcNow.Ticks,
expectedEpoch: 0,
complimentaryState: new ComplimentaryState(storedParameter.ToFunc(), LeaseLength: 0)
complimentaryState: new ComplimentaryState(storedParameter.ToUtf8Bytes().ToFunc(), LeaseLength: 0)
).ShouldBeTrueAsync();

var stopWatch = new Stopwatch();
Expand All @@ -107,7 +107,7 @@ public async Task PostponedWatchDogStartUpNotDelayedSuccessfully()
{
var store = new InMemoryFunctionStore();

var storedParameter = "hello world".ToJson();
var storedParameter = "hello world".ToJson().ToUtf8Bytes();
var functionId = new FlowId("flowType", "flowInstance");
await store.CreateFunction(
functionId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,38 @@ private class Serializer : ISerializer
public bool Invoked { get; set; }
private ISerializer Default { get; } = DefaultSerializer.Instance;

public string SerializeParameter<TParam>(TParam parameter)
public byte[] SerializeParameter<TParam>(TParam parameter)
{
Invoked = true;
return Default.SerializeParameter(parameter);
}

public TParam DeserializeParameter<TParam>(string json)
public TParam DeserializeParameter<TParam>(byte[] json)
=> Default.DeserializeParameter<TParam>(json);

public StoredException SerializeException(Exception exception)
=> Default.SerializeException(exception);
public PreviouslyThrownException DeserializeException(StoredException storedException)
=> Default.DeserializeException(storedException);

public string SerializeResult<TResult>(TResult result)
public byte[] SerializeResult<TResult>(TResult result)
=> Default.SerializeResult(result);
public TResult DeserializeResult<TResult>(string json)
public TResult DeserializeResult<TResult>(byte[] json)
=> Default.DeserializeResult<TResult>(json);

public JsonAndType SerializeMessage<TEvent>(TEvent message) where TEvent : notnull
=> Default.SerializeMessage(message);
public object DeserializeMessage(string json, string type)
public object DeserializeMessage(byte[] json, byte[] type)
=> Default.DeserializeMessage(json, type);

public string SerializeEffectResult<TResult>(TResult result)
public byte[] SerializeEffectResult<TResult>(TResult result)
=> Default.SerializeEffectResult(result);
public TResult DeserializeEffectResult<TResult>(string json)
public TResult DeserializeEffectResult<TResult>(byte[] json)
=> Default.DeserializeEffectResult<TResult>(json);

public string SerializeState<TState>(TState state) where TState : FlowState, new()
public byte[] SerializeState<TState>(TState state) where TState : FlowState, new()
=> Default.SerializeState(state);
public TState DeserializeState<TState>(string json) where TState : FlowState, new()
public TState DeserializeState<TState>(byte[] json) where TState : FlowState, new()
=> Default.DeserializeState<TState>(json);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,38 +59,38 @@ private class Serializer : ISerializer
public bool Invoked { get; set; }
private ISerializer Default { get; } = DefaultSerializer.Instance;

public string SerializeParameter<TParam>(TParam parameter)
public byte[] SerializeParameter<TParam>(TParam parameter)
{
Invoked = true;
return Default.SerializeParameter(parameter);
}
public TParam DeserializeParameter<TParam>(string json)
public TParam DeserializeParameter<TParam>(byte[] json)
=> Default.DeserializeParameter<TParam>(json);

public StoredException SerializeException(Exception exception)
=> Default.SerializeException(exception);
public PreviouslyThrownException DeserializeException(StoredException storedException)
=> Default.DeserializeException(storedException);

public string SerializeResult<TResult>(TResult result)
public byte[] SerializeResult<TResult>(TResult result)
=> Default.SerializeResult(result);
public TResult DeserializeResult<TResult>(string json)
public TResult DeserializeResult<TResult>(byte[] json)
=> Default.DeserializeResult<TResult>(json);

public JsonAndType SerializeMessage<TEvent>(TEvent message) where TEvent : notnull
=> Default.SerializeMessage(message);
public object DeserializeMessage(string json, string type)
public object DeserializeMessage(byte[] json, byte[] type)
=> Default.DeserializeMessage(json, type);

public string SerializeEffectResult<TResult>(TResult result)
public byte[] SerializeEffectResult<TResult>(TResult result)
=> Default.SerializeEffectResult(result);
public TResult DeserializeEffectResult<TResult>(string json)
public TResult DeserializeEffectResult<TResult>(byte[] json)
=> Default.DeserializeEffectResult<TResult>(json);

public string SerializeState<TState>(TState state) where TState : FlowState, new()
public byte[] SerializeState<TState>(TState state) where TState : FlowState, new()
=> Default.SerializeState(state);

public TState DeserializeState<TState>(string json) where TState : FlowState, new()
public TState DeserializeState<TState>(byte[] json) where TState : FlowState, new()
=> Default.DeserializeState<TState>(json);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,37 +59,37 @@ private class Serializer : ISerializer
public bool Invoked { get; set; }
private ISerializer Default { get; } = DefaultSerializer.Instance;

public string SerializeParameter<TParam>(TParam parameter)
public byte[] SerializeParameter<TParam>(TParam parameter)
{
Invoked = true;
return Default.SerializeParameter(parameter);
}
public TParam DeserializeParameter<TParam>(string json)
public TParam DeserializeParameter<TParam>(byte[] json)
=> Default.DeserializeParameter<TParam>(json);

public StoredException SerializeException(Exception exception)
=> Default.SerializeException(exception);
public PreviouslyThrownException DeserializeException(StoredException storedException)
=> Default.DeserializeException(storedException);

public string SerializeResult<TResult>(TResult result)
public byte[] SerializeResult<TResult>(TResult result)
=> Default.SerializeResult(result);
public TResult DeserializeResult<TResult>(string json)
public TResult DeserializeResult<TResult>(byte[] json)
=> Default.DeserializeResult<TResult>(json);

public JsonAndType SerializeMessage<TEvent>(TEvent message) where TEvent : notnull
=> Default.SerializeMessage(message);
public object DeserializeMessage(string json, string type)
public object DeserializeMessage(byte[] json, byte[] type)
=> Default.DeserializeMessage(json, type);

public string SerializeEffectResult<TResult>(TResult result)
public byte[] SerializeEffectResult<TResult>(TResult result)
=> Default.SerializeEffectResult(result);
public TResult DeserializeEffectResult<TResult>(string json)
public TResult DeserializeEffectResult<TResult>(byte[] json)
=> Default.DeserializeEffectResult<TResult>(json);

public string SerializeState<TState>(TState state) where TState : Domain.FlowState, new()
public byte[] SerializeState<TState>(TState state) where TState : FlowState, new()
=> Default.SerializeState(state);
public TState DeserializeState<TState>(string json) where TState : FlowState, new()
public TState DeserializeState<TState>(byte[] json) where TState : FlowState, new()
=> Default.DeserializeState<TState>(json);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public async Task RFunctionsShutdownTaskOnlyCompletesAfterCrashedWatchDogsRFunct

await store.CreateFunction(
functionId,
param: "".ToJson(),
param: "".ToJson().ToUtf8Bytes(),
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks
Expand Down Expand Up @@ -144,7 +144,7 @@ public async Task RFunctionsShutdownTaskOnlyCompletesAfterPostponedWatchDogsRFun

await store.CreateFunction(
functionId,
storedParameter,
storedParameter.ToUtf8Bytes(),
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks
Expand All @@ -155,7 +155,7 @@ await store.PostponeFunction(
postponeUntil: DateTime.UtcNow.AddDays(-1).Ticks,
timestamp: DateTime.UtcNow.Ticks,
expectedEpoch: 0,
new ComplimentaryState(() => storedParameter, LeaseLength: 0)
new ComplimentaryState(() => storedParameter.ToUtf8Bytes(), LeaseLength: 0)
).ShouldBeTrueAsync();

var unhandledExceptionCatcher = new UnhandledExceptionCatcher();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using Cleipnir.ResilientFunctions.CoreRuntime;
using Cleipnir.ResilientFunctions.CoreRuntime.ParameterSerialization;
using Cleipnir.ResilientFunctions.Domain;
using Cleipnir.ResilientFunctions.Helpers;
using Cleipnir.ResilientFunctions.Messaging;
using Cleipnir.ResilientFunctions.Storage;
using Cleipnir.ResilientFunctions.Tests.Utils;
Expand All @@ -19,7 +20,7 @@ protected async Task CustomEventSerializerIsUsedWhenSpecified(Task<IFunctionStor
var functionStore = await functionStoreTask;
await functionStore.CreateFunction(
functionId,
Test.SimpleStoredParameter,
param: Test.SimpleStoredParameter,
leaseExpiration: DateTime.UtcNow.Ticks,
postponeUntil: null,
timestamp: DateTime.UtcNow.Ticks
Expand All @@ -45,7 +46,7 @@ await functionStore.CreateFunction(

eventSerializer.EventToDeserialize.Count.ShouldBe(1);
var (eventJson, eventType) = eventSerializer.EventToDeserialize[0];
var deserializedEvent = DefaultSerializer.Instance.DeserializeMessage(eventJson, eventType);
var deserializedEvent = DefaultSerializer.Instance.DeserializeMessage(eventJson.ToUtf8Bytes(), eventType.ToUtf8Bytes());
deserializedEvent.ShouldBe("hello world");
}

Expand All @@ -54,41 +55,41 @@ private class EventSerializer : ISerializer
public Utils.SyncedList<object> EventToSerialize { get; } = new();
public Utils.SyncedList<Tuple<string, string>> EventToDeserialize { get; }= new();

public string SerializeParameter<TParam>(TParam parameter)
public byte[] SerializeParameter<TParam>(TParam parameter)
=> DefaultSerializer.Instance.SerializeParameter(parameter);

public TParam DeserializeParameter<TParam>(string json)
public TParam DeserializeParameter<TParam>(byte[] json)
=> DefaultSerializer.Instance.DeserializeParameter<TParam>(json);

public StoredException SerializeException(Exception exception)
=> DefaultSerializer.Instance.SerializeException(exception);
public PreviouslyThrownException DeserializeException(StoredException storedException)
=> DefaultSerializer.Instance.DeserializeException(storedException);

public string SerializeResult<TResult>(TResult result)
public byte[] SerializeResult<TResult>(TResult result)
=> DefaultSerializer.Instance.SerializeResult(result);
public TResult DeserializeResult<TResult>(string json)
public TResult DeserializeResult<TResult>(byte[] json)
=> DefaultSerializer.Instance.DeserializeResult<TResult>(json);

public JsonAndType SerializeMessage<TEvent>(TEvent message) where TEvent : notnull
{
EventToSerialize.Add(message);
return DefaultSerializer.Instance.SerializeMessage(message);
}
public object DeserializeMessage(string json, string type)
public object DeserializeMessage(byte[] json, byte[] type)
{
EventToDeserialize.Add(Tuple.Create(json, type));
EventToDeserialize.Add(Tuple.Create(json.ToStringFromUtf8Bytes(), type.ToStringFromUtf8Bytes()));
return DefaultSerializer.Instance.DeserializeMessage(json, type);
}

public string SerializeEffectResult<TResult>(TResult result)
public byte[] SerializeEffectResult<TResult>(TResult result)
=> DefaultSerializer.Instance.SerializeEffectResult(result);
public TResult DeserializeEffectResult<TResult>(string json)
public TResult DeserializeEffectResult<TResult>(byte[] json)
=> DefaultSerializer.Instance.DeserializeEffectResult<TResult>(json);

public string SerializeState<TState>(TState state) where TState : FlowState, new()
public byte[] SerializeState<TState>(TState state) where TState : FlowState, new()
=> DefaultSerializer.Instance.SerializeState(state);
public TState DeserializeState<TState>(string json) where TState : FlowState, new()
public TState DeserializeState<TState>(byte[] json) where TState : FlowState, new()
=> DefaultSerializer.Instance.DeserializeState<TState>(json);
}
}
Loading

0 comments on commit 4974e68

Please sign in to comment.