fix: ensure tipset-worker starting and stopping work
frrist committed May 23, 2022
1 parent 186d605 commit 3ac2bce
Showing 9 changed files with 76 additions and 111 deletions.
Binary file added .config.toml.swp
Binary file not shown.
Binary file added .docker-compose.yml.swp
Binary file not shown.
98 changes: 20 additions & 78 deletions chain/indexer/distributed/catalog.go
@@ -1,18 +1,12 @@
package distributed

import (
"context"
"encoding/json"
"fmt"
"os"

logging "github.com/ipfs/go-log/v2"
"go.uber.org/atomic"

"github.com/hibiken/asynq"
"go.opentelemetry.io/otel/trace"
logging "github.com/ipfs/go-log/v2"

"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
"github.com/filecoin-project/lily/config"
)

@@ -30,7 +24,7 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
if _, exists := c.servers[name]; exists {
return nil, fmt.Errorf("duplicate queue name: %q", name)
}
log.Infow("registering worker queue config", "name", name, "type", "redis")
log.Infow("registering worker queue config", "name", name, "type", "redis", "addr", sc.RedisConfig.Addr)

// Find the password of the queue, which is either indirectly specified using PasswordEnv or explicit via Password.
// TODO use github.com/kelseyhightower/envconfig
@@ -42,34 +36,29 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
}

c.servers[name] = &TipSetWorker{
server: asynq.NewServer(
asynq.RedisClientOpt{
Network: sc.RedisConfig.Network,
Addr: sc.RedisConfig.Addr,
Username: sc.RedisConfig.Username,
Password: queuePassword,
DB: sc.RedisConfig.DB,
PoolSize: sc.RedisConfig.PoolSize,
},
asynq.Config{
LogLevel: sc.WorkerConfig.LogLevel(),
Queues: sc.WorkerConfig.Queues(),
ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout,
Concurrency: sc.WorkerConfig.Concurrency,
StrictPriority: sc.WorkerConfig.StrictPriority,
Logger: log.With("worker", name),
ErrorHandler: &QueueErrorHandler{},
},
),
running: atomic.NewBool(false),
RedisConfig: asynq.RedisClientOpt{
Network: sc.RedisConfig.Network,
Addr: sc.RedisConfig.Addr,
Username: sc.RedisConfig.Username,
Password: queuePassword,
DB: sc.RedisConfig.DB,
PoolSize: sc.RedisConfig.PoolSize,
},
ServerConfig: asynq.Config{
LogLevel: sc.WorkerConfig.LogLevel(),
Queues: sc.WorkerConfig.Queues(),
ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout,
Concurrency: sc.WorkerConfig.Concurrency,
StrictPriority: sc.WorkerConfig.StrictPriority,
},
}
}

for name, cc := range cfg.Notifiers {
if _, exists := c.servers[name]; exists {
return nil, fmt.Errorf("duplicate queue name: %q", name)
}
log.Infow("registering notifier queue config", "name", name, "type", "redis")
log.Infow("registering notifier queue config", "name", name, "type", "redis", "addr", cc.Addr)

// Find the password of the queue, which is either indirectly specified using PasswordEnv or explicit via Password.
// TODO use github.com/kelseyhightower/envconfig
@@ -95,24 +84,8 @@ func NewCatalog(cfg config.QueueConfig) (*Catalog, error) {
}

type TipSetWorker struct {
server *asynq.Server
running *atomic.Bool
}

func (w *TipSetWorker) Running() bool {
return w.running.Load()
}

func (w *TipSetWorker) Run(mux *asynq.ServeMux) error {
if w.running.Load() {
return fmt.Errorf("server already running")
}
w.running.Swap(true)
return w.server.Run(mux)
}

func (w *TipSetWorker) Shutdown() {
w.server.Shutdown()
RedisConfig asynq.RedisClientOpt
ServerConfig asynq.Config
}

// Catalog contains a map of workers and clients
@@ -149,34 +122,3 @@ func (c *Catalog) Notifier(name string) (*asynq.Client, error) {
}
return client, nil
}

type QueueErrorHandler struct{}

func (w *QueueErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) {
switch task.Type() {
case tasks.TypeIndexTipSet:
var p tasks.IndexTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
case tasks.TypeGapFillTipSet:
var p tasks.GapFillTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
}
}
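
The upshot of this file's change: TipSetWorker no longer owns a live asynq.Server guarded by an atomic running flag; it is now a plain holder for the Redis and server configuration, and the caller constructs a server each time the worker starts. A minimal sketch of the consuming side (the catalog.Worker accessor is assumed by analogy with Notifier above; error handling abbreviated):

```go
// Sketch, not part of the diff: start a worker from a catalog entry.
worker, err := catalog.Worker("queue0") // hypothetical accessor and queue name
if err != nil {
	return err
}

mux := asynq.NewServeMux()
// task handlers are registered here, as in queue/worker.go below

// A fresh server is built from the stored config on every start, so a
// previously stopped worker can be started again without stale state.
server := asynq.NewServer(worker.RedisConfig, worker.ServerConfig)
if err := server.Start(mux); err != nil {
	return err
}
```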
57 changes: 48 additions & 9 deletions chain/indexer/distributed/queue/worker.go
@@ -2,24 +2,32 @@ package queue

import (
"context"
"encoding/json"

"github.com/hibiken/asynq"
logging "github.com/ipfs/go-log/v2"
"go.opentelemetry.io/otel/trace"

"github.com/filecoin-project/lily/chain/indexer"
"github.com/filecoin-project/lily/chain/indexer/distributed"
"github.com/filecoin-project/lily/chain/indexer/distributed/queue/tasks"
"github.com/filecoin-project/lily/storage"
)

var log = logging.Logger("lily/distributed/worker")

type AsynqWorker struct {
done chan struct{}
done chan struct{}

name string
server *distributed.TipSetWorker
index indexer.Indexer
db *storage.Database
}

func NewAsynqWorker(i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker {
func NewAsynqWorker(name string, i indexer.Indexer, db *storage.Database, server *distributed.TipSetWorker) *AsynqWorker {
return &AsynqWorker{
name: name,
server: server,
index: i,
db: db,
@@ -34,18 +42,49 @@ func (t *AsynqWorker) Run(ctx context.Context) error {
mux.HandleFunc(tasks.TypeIndexTipSet, tasks.NewIndexHandler(t.index).HandleIndexTipSetTask)
mux.HandleFunc(tasks.TypeGapFillTipSet, tasks.NewGapFillHandler(t.index, t.db).HandleGapFillTipSetTask)

if err := t.server.Run(mux); err != nil {
t.server.ServerConfig.Logger = log.With("name", t.name)
t.server.ServerConfig.ErrorHandler = &WorkerErrorHandler{}

server := asynq.NewServer(t.server.RedisConfig, t.server.ServerConfig)
if err := server.Start(mux); err != nil {
return err
}

go func() {
<-ctx.Done()
t.server.Shutdown()
}()

<-ctx.Done()
server.Shutdown()
return nil
}

func (t *AsynqWorker) Done() <-chan struct{} {
return t.done
}

type WorkerErrorHandler struct{}

func (w *WorkerErrorHandler) HandleError(ctx context.Context, task *asynq.Task, err error) {
switch task.Type() {
case tasks.TypeIndexTipSet:
var p tasks.IndexTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
case tasks.TypeGapFillTipSet:
var p tasks.GapFillTipSetPayload
if err := json.Unmarshal(task.Payload(), &p); err != nil {
log.Errorw("failed to decode task type (developer error?)", "error", err)
}
if p.HasTraceCarrier() {
if sc := p.TraceCarrier.AsSpanContext(); sc.IsValid() {
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
trace.SpanFromContext(ctx).RecordError(err)
}
}
log.Errorw("task failed", "type", task.Type(), "tipset", p.TipSet.Key().String(), "height", p.TipSet.Height(), "tasks", p.Tasks, "error", err)
}
}
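
Run now blocks until its context is cancelled, then shuts the server down, and each call to Run builds a new asynq.Server from the stored config; that combination is what makes starting and stopping symmetric. A rough usage sketch (idx, db, and tsw stand in for the daemon's indexer, database, and TipSetWorker wiring):

```go
// Sketch, not part of the diff: stop a worker, then start it again.
ctx, cancel := context.WithCancel(context.Background())
w := queue.NewAsynqWorker("worker-1", idx, db, tsw)

errCh := make(chan error, 1)
go func() { errCh <- w.Run(ctx) }()

// Stop: Run observes ctx.Done(), calls server.Shutdown(), and returns.
cancel()
if err := <-errCh; err != nil {
	return err
}

// Start again: safe, because Run constructs a fresh server per call
// rather than reusing one cached on the TipSetWorker.
go func() { _ = w.Run(context.Background()) }()
```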
14 changes: 3 additions & 11 deletions commands/job/worker.go
@@ -11,20 +11,13 @@ import (
)

var tipsetWorkerFlags struct {
queue string
concurrency int
queue string
}

var TipSetWorkerCmd = &cli.Command{
Name: "tipset-worker",
Usage: "start a tipset-worker that consumes tasks from the provided queuing system and performs indexing",
Flags: []cli.Flag{
&cli.IntFlag{
Name: "concurrency",
Usage: "Concurrency sets the maximum number of concurrent processing of tasks. If set to a zero or negative value it will be set to the number of CPUs usable by the current process.",
Value: 1,
Destination: &tipsetWorkerFlags.concurrency,
},
&cli.StringFlag{
Name: "queue",
Usage: "Name of queue system worker will consume work from.",
@@ -43,9 +36,8 @@ var TipSetWorkerCmd = &cli.Command{
defer closer()

res, err := api.StartTipSetWorker(ctx, &lily.LilyTipSetWorkerConfig{
JobConfig: RunFlags.ParseJobConfig("tipset-worker"),
Queue: tipsetWorkerFlags.queue,
Concurrency: tipsetWorkerFlags.concurrency,
JobConfig: RunFlags.ParseJobConfig("tipset-worker"),
Queue: tipsetWorkerFlags.queue,
})
if err != nil {
return err
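
With the --concurrency flag removed here (and the Concurrency field removed from LilyTipSetWorkerConfig below), concurrency is no longer a per-invocation CLI setting: it is taken from the queue's worker configuration when catalog.go maps it into asynq.Config. Recapping that mapping from the diff above:

```go
// Per-queue settings from the daemon config, not a CLI flag, now
// drive the server (field names as in catalog.go in this commit).
serverCfg := asynq.Config{
	Concurrency:     sc.WorkerConfig.Concurrency,
	Queues:          sc.WorkerConfig.Queues(),
	StrictPriority:  sc.WorkerConfig.StrictPriority,
	ShutdownTimeout: sc.WorkerConfig.ShutdownTimeout,
	LogLevel:        sc.WorkerConfig.LogLevel(),
}
```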
2 changes: 1 addition & 1 deletion go.mod
@@ -68,7 +68,6 @@ require (
github.com/hibiken/asynq/x v0.0.0-20220413130846-5c723f597e01
github.com/jedib0t/go-pretty/v6 v6.2.7
go.opentelemetry.io/otel/trace v1.3.0
go.uber.org/atomic v1.9.0
)

require (
@@ -336,6 +335,7 @@ require (
github.com/zondax/ledger-go v0.12.1 // indirect
go.opentelemetry.io/otel/metric v0.25.0 // indirect
go.opentelemetry.io/otel/sdk/export/metric v0.25.0 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/dig v1.12.0 // indirect
go4.org v0.0.0-20200411211856-f5505b9728dd // indirect
golang.org/x/crypto v0.0.0-20220411220226-7b82a4e95df4 // indirect
4 changes: 0 additions & 4 deletions lens/lily/api.go
@@ -143,10 +143,6 @@ type LilyTipSetWorkerConfig struct {

// Queue is the name of the queueing system the worker will consume work from.
Queue string
// Concurrency sets the maximum number of concurrent processing of tasks.
// If set to a zero or negative value, NewServer will overwrite the value
// to the number of CPUs usable by the current process.
Concurrency int
}

type LilySurveyConfig struct {
11 changes: 3 additions & 8 deletions lens/lily/impl.go
@@ -90,10 +90,6 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
return nil, err
}

if worker.Running() {
return nil, fmt.Errorf("worker %s already running", cfg.Queue)
}

taskAPI, err := datasource.NewDataSource(m)
if err != nil {
return nil, err
@@ -113,11 +109,10 @@ func (m *LilyNodeAPI) StartTipSetWorker(_ context.Context, cfg *LilyTipSetWorker
Name: cfg.JobConfig.Name,
Type: "tipset-worker",
Params: map[string]string{
"queue": cfg.Queue,
"storage": cfg.JobConfig.Storage,
"concurrency": strconv.Itoa(cfg.Concurrency),
"queue": cfg.Queue,
"storage": cfg.JobConfig.Storage,
},
Job: queue.NewAsynqWorker(im, db, worker),
Job: queue.NewAsynqWorker(cfg.JobConfig.Name, im, db, worker),
RestartOnFailure: cfg.JobConfig.RestartOnFailure,
RestartOnCompletion: cfg.JobConfig.RestartOnCompletion,
RestartDelay: cfg.JobConfig.RestartDelay,
1 change: 1 addition & 0 deletions tasks/ipfs/task.go
@@ -0,0 +1 @@
package ipfs
