Skip to content

Commit

Permalink
feat(postgres): add postgres query config + db query optimizations (#320
Browse files Browse the repository at this point in the history
) (#321)

* feat(postgres): add query + tick fixes

* feat(postgres): conditional prepare statement

* fix(postgres): Ranging over a map is non deterministic

* Lazily define prepared statement

---------

Co-authored-by: Gabriel Guerra <[email protected]>
Co-authored-by: David Farr <[email protected]>
  • Loading branch information
3 people authored May 15, 2024
1 parent cc44852 commit 23be4a5
Show file tree
Hide file tree
Showing 5 changed files with 432 additions and 302 deletions.
12 changes: 7 additions & 5 deletions cmd/dst/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,11 +146,11 @@ func RunDSTCmd() *cobra.Command {
system.AddOnRequest(t_api.ReleaseLock, coroutines.ReleaseLock)
system.AddOnRequest(t_api.ClaimTask, coroutines.ClaimTask)
system.AddOnRequest(t_api.CompleteTask, coroutines.CompleteTask)
system.AddOnTick(2, coroutines.EnqueueTasks)
system.AddOnTick(2, coroutines.TimeoutLocks)
system.AddOnTick(2, coroutines.SchedulePromises)
system.AddOnTick(2, coroutines.TimeoutPromises)
system.AddOnTick(10, coroutines.NotifySubscriptions)
system.AddOnTick(1000, coroutines.EnqueueTasks)
system.AddOnTick(1000, coroutines.TimeoutLocks)
system.AddOnTick(1000, coroutines.SchedulePromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
system.AddOnTick(1000, coroutines.NotifySubscriptions)

reqs := []t_api.Kind{
// PROMISE
Expand Down Expand Up @@ -247,6 +247,7 @@ func RunDSTCmd() *cobra.Command {
cmd.Flags().String("aio-store-postgres-username", "", "postgres username")
cmd.Flags().String("aio-store-postgres-password", "", "postgres password")
cmd.Flags().String("aio-store-postgres-database", "resonate_dst", "postgres database name")
cmd.Flags().StringToString("aio-store-postgres-query", make(map[string]string, 0), "postgres query options")
cmd.Flags().Duration("aio-store-postgres-tx-timeout", 2*time.Second, "postgres transaction timeout")
cmd.Flags().Float32("aio-network-success-rate", 0.5, "simulated success rate of http requests")
cmd.Flags().Float32("aio-queuing-success-rate", 0.5, "simulated success rate of queuing requests")
Expand All @@ -261,6 +262,7 @@ func RunDSTCmd() *cobra.Command {
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.username", cmd.Flags().Lookup("aio-store-postgres-username"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.password", cmd.Flags().Lookup("aio-store-postgres-password"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.database", cmd.Flags().Lookup("aio-store-postgres-database"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.query", cmd.Flags().Lookup("aio-store-postgres-query"))
_ = viper.BindPFlag("dst.aio.subsystems.store.config.postgres.txTimeout", cmd.Flags().Lookup("aio-store-postgres-tx-timeout"))
_ = viper.BindPFlag("dst.aio.subsystems.networkDST.config.p", cmd.Flags().Lookup("aio-network-success-rate"))
_ = viper.BindPFlag("dst.aio.subsystems.queuingDST.config.p", cmd.Flags().Lookup("aio-queuing-success-rate"))
Expand Down
16 changes: 9 additions & 7 deletions cmd/serve/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,11 +108,11 @@ func ServeCmd() *cobra.Command {
system.AddOnRequest(t_api.ReleaseLock, coroutines.ReleaseLock)
system.AddOnRequest(t_api.ClaimTask, coroutines.ClaimTask)
system.AddOnRequest(t_api.CompleteTask, coroutines.CompleteTask)
system.AddOnTick(2, coroutines.EnqueueTasks)
system.AddOnTick(2, coroutines.TimeoutLocks)
system.AddOnTick(2, coroutines.SchedulePromises)
system.AddOnTick(2, coroutines.TimeoutPromises)
system.AddOnTick(1, coroutines.NotifySubscriptions)
system.AddOnTick(1000, coroutines.EnqueueTasks)
system.AddOnTick(1000, coroutines.TimeoutLocks)
system.AddOnTick(1000, coroutines.SchedulePromises)
system.AddOnTick(1000, coroutines.TimeoutPromises)
system.AddOnTick(1000, coroutines.NotifySubscriptions)

// metrics server
mux := netHttp.NewServeMux()
Expand Down Expand Up @@ -205,14 +205,15 @@ func ServeCmd() *cobra.Command {
cmd.Flags().Int("aio-store-workers", 1, "number of concurrent connections to the store")
cmd.Flags().Int("aio-store-batch-size", 100, "max submissions processed each tick by a store worker")
cmd.Flags().String("aio-store-sqlite-path", "resonate.db", "sqlite database path")
cmd.Flags().Duration("aio-store-sqlite-tx-timeout", 250*time.Millisecond, "sqlite transaction timeout")
cmd.Flags().Duration("aio-store-sqlite-tx-timeout", 10_000*time.Millisecond, "sqlite transaction timeout")
cmd.Flags().Bool("aio-store-sqlite-reset", false, "sqlite database clean on shutdown")
cmd.Flags().String("aio-store-postgres-host", "localhost", "postgres host")
cmd.Flags().String("aio-store-postgres-port", "5432", "postgres port")
cmd.Flags().String("aio-store-postgres-username", "", "postgres username")
cmd.Flags().String("aio-store-postgres-password", "", "postgres password")
cmd.Flags().String("aio-store-postgres-database", "resonate", "postgres database name")
cmd.Flags().Duration("aio-store-postgres-tx-timeout", 250*time.Millisecond, "postgres transaction timeout")
cmd.Flags().StringToString("aio-store-postgres-query", make(map[string]string, 0), "postgres query options")
cmd.Flags().Duration("aio-store-postgres-tx-timeout", 10_000*time.Millisecond, "postgres transaction timeout")
cmd.Flags().Bool("aio-store-postgres-reset", false, "postgres database clean on shutdown")
// Network
cmd.Flags().Int("aio-network-size", 100, "size of network submission queue buffered channel")
Expand Down Expand Up @@ -242,6 +243,7 @@ func ServeCmd() *cobra.Command {
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.password", cmd.Flags().Lookup("aio-store-postgres-password"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.database", cmd.Flags().Lookup("aio-store-postgres-database"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.database", cmd.Flags().Lookup("aio-store-postgres-database"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.query", cmd.Flags().Lookup("aio-store-postgres-query"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.txTimeout", cmd.Flags().Lookup("aio-store-postgres-tx-timeout"))
_ = viper.BindPFlag("aio.subsystems.store.config.postgres.reset", cmd.Flags().Lookup("aio-store-postgres-reset"))
// Network
Expand Down
Loading

0 comments on commit 23be4a5

Please sign in to comment.