Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(tasks): add support for distributed tasks #229

Merged
merged 10 commits into from
Mar 21, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ gen-proto:
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/promise.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/schedule.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/lock.proto
protoc --go_out=. --go_opt=paths=source_relative --go-grpc_out=. --go-grpc_opt=paths=source_relative internal/app/subsystems/api/grpc/api/task.proto

.PHONY: deps
deps:
Expand All @@ -13,8 +14,11 @@ gen-openapi:
oapi-codegen -generate types,client -package promises ./api/promises-openapi.yml > pkg/client/promises/openapi.go
oapi-codegen -generate types,client -package schedules ./api/schedules-openapi.yml > pkg/client/schedules/openapi.go
oapi-codegen -generate types,client -package locks ./api/locks-openapi.yml > pkg/client/locks/openapi.go
oapi-codegen -generate types,client -package tasks ./api/tasks-openapi.yml > pkg/client/tasks/openapi.go

.PHONY: gen-mock
gen-mock:
mockgen -source=pkg/client/promises/openapi.go -destination=pkg/client/promises/mock_client.go -package promises
mockgen -source=pkg/client/schedules/openapi.go -destination=pkg/client/schedules/mock_client.go -package schedules
mockgen -source=pkg/client/locks/openapi.go -destination=pkg/client/locks/mock_client.go -package locks
mockgen -source=pkg/client/tasks/openapi.go -destination=pkg/client/tasks/mock_client.go -package tasks
158 changes: 158 additions & 0 deletions api/tasks-openapi.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
openapi: 3.0.0
info:
title: Tasks API
description: Manage Tasks
version: 1.0.0
paths:
/tasks/claim:
post:
tags:
- Tasks
summary: Claim a task
operationId: claimTask
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/ClaimTaskReq"
responses:
"200":
description: successful operation
content:
application/json:
schema:
$ref: "#/components/schemas/Promise"
"409":
description: Task already claimed

/tasks/complete:
post:
tags:
- Tasks
summary: Complete a task
operationId: completeTask
requestBody:
required: true
content:
application/json:
schema:
$ref: "#/components/schemas/CompleteTaskReq"
responses:
"200":
description: successful operation
"409":
description: Task already completed

components:
#parameters:
schemas:
Promise:
type: object
required:
- id
- state
- timeout
- param
- value
- tags
properties:
id:
type: string
state:
$ref: "#/components/schemas/PromiseState"
param:
$ref: "#/components/schemas/PromiseValue"
value:
$ref: "#/components/schemas/PromiseValue"
timeout:
type: integer
format: int64
idempotencyKeyForCreate:
type: string
readOnly: true
idempotencyKeyForComplete:
type: string
readOnly: true
tags:
type: object
additionalProperties:
type: string
createdOn:
type: integer
readOnly: true
completedOn:
type: integer
readOnly: true

PromiseState:
type: string
enum:
- PENDING
- RESOLVED
- REJECTED
- REJECTED_CANCELED
- REJECTED_TIMEDOUT

PromiseStateComplete:
type: string
enum:
- RESOLVED
- REJECTED

PromiseValue:
type: object
required:
- headers
properties:
headers:
type: object
additionalProperties:
type: string
data:
type: string

ClaimTaskReq:
type: object
properties:
taskId:
type: string
description: The ID of the task to claim
counter:
type: integer
description: The counter of the task to claim
processId:
type: string
executionId:
type: string
expiryInSeconds:
type: integer
format: int64
required:
- taskId
- counter
- processId
- executionId
- expiryInSeconds

CompleteTaskReq:
type: object
properties:
taskId:
type: string
description: The ID of the task to complete
counter:
type: integer
description: The counter of the task to claim
executionId:
type: string
state:
$ref: "#/components/schemas/PromiseStateComplete"
value:
$ref: "#/components/schemas/PromiseValue"
required:
- taskId
- counter
- executionId
- state
- value
7 changes: 7 additions & 0 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ func RunDSTCmd() *cobra.Command {
system.AddOnRequest(t_api.AcquireLock, coroutines.AcquireLock)
system.AddOnRequest(t_api.HeartbeatLocks, coroutines.HeartbeatLocks)
system.AddOnRequest(t_api.ReleaseLock, coroutines.ReleaseLock)
system.AddOnRequest(t_api.ClaimTask, coroutines.ClaimTask)
system.AddOnRequest(t_api.CompleteTask, coroutines.CompleteTask)
system.AddOnTick(2, coroutines.EnqueueTasks)
system.AddOnTick(2, coroutines.TimeoutLocks)
system.AddOnTick(2, coroutines.SchedulePromises)
system.AddOnTick(2, coroutines.TimeoutPromises)
Expand Down Expand Up @@ -152,6 +155,10 @@ func RunDSTCmd() *cobra.Command {
t_api.AcquireLock,
t_api.HeartbeatLocks,
t_api.ReleaseLock,

// TASK
t_api.ClaimTask,
t_api.CompleteTask,
}

dst := dst.New(&dst.Config{
Expand Down
30 changes: 30 additions & 0 deletions cmd/serve/flags.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package serve

import (
"encoding/json"

"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing/connections/t_conn"
)

type (
// ConnectionSlice is a slice of connections a user can define to configure the queueing subsystem.
ConnectionSlice []*t_conn.ConnectionConfig
)

func (c *ConnectionSlice) String() string {
jsonStr, _ := json.Marshal(c)
return string(jsonStr)
}

func (c *ConnectionSlice) Set(v string) error {
var connections []*t_conn.ConnectionConfig
if err := json.Unmarshal([]byte(v), &connections); err != nil {
return err
}
*c = ConnectionSlice(connections)
return nil
}

func (c *ConnectionSlice) Type() string {
return "ConnectionSlice"
}
17 changes: 13 additions & 4 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/resonatehq/resonate/internal/api"
"github.com/resonatehq/resonate/internal/app/coroutines"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing"
"github.com/resonatehq/resonate/internal/app/subsystems/api/grpc"
"github.com/resonatehq/resonate/internal/app/subsystems/api/http"
"github.com/resonatehq/resonate/internal/kernel/system"
Expand Down Expand Up @@ -51,28 +52,30 @@ func ServeCmd() *cobra.Command {
reg := prometheus.NewRegistry()
metrics := metrics.New(reg)

// instatiate api/aio
// instantiate api/aio
api := api.New(config.API.Size, metrics)
aio := aio.New(config.AIO.Size, metrics)

// instatiate api subsystems
// instantiate api subsystems
http := http.New(api, config.API.Subsystems.Http)
grpc := grpc.New(api, config.API.Subsystems.Grpc)

// instatiate aio subsystems
// instantiate aio subsystems
network := network.New(config.AIO.Subsystems.Network.Config)
store, err := util.NewStore(config.AIO.Subsystems.Store)
if err != nil {
return err
}
queuing := queuing.NewSubsytemOrDie(config.AIO.Subsystems.Queuing.Config)

// add api subsystems
api.AddSubsystem(http)
api.AddSubsystem(grpc)

// add api subsystems
// add aio subsystems
aio.AddSubsystem(t_aio.Network, network, config.AIO.Subsystems.Network.Subsystem)
aio.AddSubsystem(t_aio.Store, store, config.AIO.Subsystems.Store.Subsystem)
aio.AddSubsystem(t_aio.Queuing, queuing, config.AIO.Subsystems.Queuing.Subsystem)

// start api/aio
if err := api.Start(); err != nil {
Expand Down Expand Up @@ -102,6 +105,9 @@ func ServeCmd() *cobra.Command {
system.AddOnRequest(t_api.AcquireLock, coroutines.AcquireLock)
system.AddOnRequest(t_api.HeartbeatLocks, coroutines.HeartbeatLocks)
system.AddOnRequest(t_api.ReleaseLock, coroutines.ReleaseLock)
system.AddOnRequest(t_api.ClaimTask, coroutines.ClaimTask)
system.AddOnRequest(t_api.CompleteTask, coroutines.CompleteTask)
system.AddOnTick(2, coroutines.EnqueueTasks)
system.AddOnTick(2, coroutines.TimeoutLocks)
system.AddOnTick(2, coroutines.SchedulePromises)
system.AddOnTick(2, coroutines.TimeoutPromises)
Expand Down Expand Up @@ -209,6 +215,9 @@ func ServeCmd() *cobra.Command {
cmd.Flags().Int("aio-network-batch-size", 100, "max submissions processed each tick by a network worker")
cmd.Flags().Duration("aio-network-timeout", 10*time.Second, "network request timeout")

cmd.Flags().Var(&ConnectionSlice{}, "aio-queuing-connections", "queuing subsystem connections")
_ = viper.BindPFlag("aio.subsystems.queuing.config.connections", cmd.Flags().Lookup("aio-queuing-connections"))

_ = viper.BindPFlag("aio.size", cmd.Flags().Lookup("aio-size"))
_ = viper.BindPFlag("aio.subsystems.store.config.kind", cmd.Flags().Lookup("aio-store"))
_ = viper.BindPFlag("aio.subsystems.store.subsystem.size", cmd.Flags().Lookup("aio-store-size"))
Expand Down
2 changes: 2 additions & 0 deletions cmd/util/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/mitchellh/mapstructure"
"github.com/resonatehq/resonate/internal/aio"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/network"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/queuing"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/postgres"
"github.com/resonatehq/resonate/internal/app/subsystems/aio/store/sqlite"
"github.com/resonatehq/resonate/internal/app/subsystems/api/grpc"
Expand Down Expand Up @@ -44,6 +45,7 @@ type AIOSubsystems struct {
Store *AIOSubsystemConfig[StoreConfig]
Network *AIOSubsystemConfig[network.Config]
NetworkDST *AIOSubsystemConfig[network.ConfigDST]
Queuing *AIOSubsystemConfig[queuing.Config]
}

type AIOSubsystemConfig[T any] struct {
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,13 @@ require (
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-chi/chi v1.5.5 // indirect
github.com/go-chi/chi/v5 v5.0.11 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/gorilla/mux v1.8.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.9.1 h1:4idEAncQnU5cB7BeOkPtxjfCSye0AAm1R0RVIqJ+Jmg=
github.com/gin-gonic/gin v1.9.1/go.mod h1:hPrL7YrpYKXt5YId3A/Tnip5kqbEAP+KLuI3SUcPTeU=
github.com/go-chi/chi v1.5.5 h1:vOB/HbEMt9QqBqErz07QehcOKHaWFtuj87tTDVz2qXE=
github.com/go-chi/chi v1.5.5/go.mod h1:C9JqLr3tIYjDOZpzn+BCuxY8z8vmca43EeMgyZt7irw=
github.com/go-chi/chi/v5 v5.0.11 h1:BnpYbFZ3T3S1WMpD79r7R5ThWX40TaFB7L31Y8xqSwA=
github.com/go-chi/chi/v5 v5.0.11/go.mod h1:DslCQbL2OYiznFReuXYUmQ2hGd1aDpCnlMNITLSKoi8=
github.com/go-playground/assert/v2 v2.2.0 h1:JvknZsQTYeFEAhQwI4qEt9cyV5ONwRHC+lYKSsYSR8s=
github.com/go-playground/assert/v2 v2.2.0/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.14.1 h1:EWaQ/wswjilfKLTECiXz7Rh+3BjFhfDFKv/oXslEjJA=
Expand All @@ -54,6 +58,8 @@ github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/uuid v1.5.0 h1:1p67kYwdtXjb0gL0BPiP1Av9wiZPo5A8z2cWkTZ+eyU=
github.com/google/uuid v1.5.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY=
github.com/gorilla/mux v1.8.1/go.mod h1:AKf9I4AEqPTmMytcMc0KkNouC66V3BtZ4qD5fmWSiMQ=
github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
Expand Down
4 changes: 4 additions & 0 deletions internal/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,10 @@ func (a *api) Enqueue(sqe *bus.SQE[t_api.Request, t_api.Response]) {
status = int(res.HeartbeatLocks.Status)
case t_api.ReleaseLock:
status = int(res.ReleaseLock.Status)
case t_api.ClaimTask:
status = int(res.ClaimTask.Status)
case t_api.CompleteTask:
status = int(res.CompleteTask.Status)
case t_api.Echo:
status = 2000
default:
Expand Down
Loading