Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worker: Support setting task types at runtime #5023

Merged
merged 7 commits into from
Dec 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions api/api_worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type WorkerAPI interface {

storiface.WorkerCalls

TaskDisable(ctx context.Context, tt sealtasks.TaskType) error
TaskEnable(ctx context.Context, tt sealtasks.TaskType) error

// Storage / Other
Remove(ctx context.Context, sector abi.SectorID) error

Expand Down
11 changes: 11 additions & 0 deletions api/apistruct/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,9 @@ type WorkerStruct struct {
ReadPiece func(context.Context, io.Writer, storage.SectorRef, storiface.UnpaddedByteIndex, abi.UnpaddedPieceSize) (storiface.CallID, error) `perm:"admin"`
Fetch func(context.Context, storage.SectorRef, storiface.SectorFileType, storiface.PathType, storiface.AcquireMode) (storiface.CallID, error) `perm:"admin"`

TaskDisable func(ctx context.Context, tt sealtasks.TaskType) error `perm:"admin"`
TaskEnable func(ctx context.Context, tt sealtasks.TaskType) error `perm:"admin"`

Remove func(ctx context.Context, sector abi.SectorID) error `perm:"admin"`
StorageAddLocal func(ctx context.Context, path string) error `perm:"admin"`

Expand Down Expand Up @@ -1573,6 +1576,14 @@ func (w *WorkerStruct) Fetch(ctx context.Context, id storage.SectorRef, fileType
return w.Internal.Fetch(ctx, id, fileType, ptype, am)
}

func (w *WorkerStruct) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error {
return w.Internal.TaskDisable(ctx, tt)
}

func (w *WorkerStruct) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error {
return w.Internal.TaskEnable(ctx, tt)
}

func (w *WorkerStruct) Remove(ctx context.Context, sector abi.SectorID) error {
return w.Internal.Remove(ctx, sector)
}
Expand Down
1 change: 1 addition & 0 deletions api/docgen/docgen.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func init() {
addExample(map[sealtasks.TaskType]struct{}{
sealtasks.TTPreCommit2: {},
})
addExample(sealtasks.TTCommit2)
}

func exampleValue(method string, t, parent reflect.Type) interface{} {
Expand Down
25 changes: 25 additions & 0 deletions cmd/lotus-seal-worker/info.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,14 @@ package main

import (
"fmt"
"sort"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/chain/types"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)

var infoCmd = &cli.Command{
Expand Down Expand Up @@ -49,10 +51,22 @@ var infoCmd = &cli.Command{
return xerrors.Errorf("getting info: %w", err)
}

tt, err := api.TaskTypes(ctx)
if err != nil {
return xerrors.Errorf("getting task types: %w", err)
}

fmt.Printf("Hostname: %s\n", info.Hostname)
fmt.Printf("CPUs: %d; GPUs: %v\n", info.Resources.CPUs, info.Resources.GPUs)
fmt.Printf("RAM: %s; Swap: %s\n", types.SizeStr(types.NewInt(info.Resources.MemPhysical)), types.SizeStr(types.NewInt(info.Resources.MemSwap)))
fmt.Printf("Reserved memory: %s\n", types.SizeStr(types.NewInt(info.Resources.MemReserved)))

fmt.Printf("Task types: ")
for _, t := range ttList(tt) {
fmt.Printf("%s ", t.Short())
}
fmt.Println()

fmt.Println()

paths, err := api.Paths(ctx)
Expand Down Expand Up @@ -80,3 +94,14 @@ var infoCmd = &cli.Command{
return nil
},
}

func ttList(tt map[sealtasks.TaskType]struct{}) []sealtasks.TaskType {
tasks := make([]sealtasks.TaskType, 0, len(tt))
for taskType := range tt {
tasks = append(tasks, taskType)
}
sort.Slice(tasks, func(i, j int) bool {
return tasks[i].Less(tasks[j])
})
return tasks
}
1 change: 1 addition & 0 deletions cmd/lotus-seal-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func main() {
storageCmd,
setCmd,
waitQuietCmd,
tasksCmd,
}

app := &cli.App{
Expand Down
82 changes: 82 additions & 0 deletions cmd/lotus-seal-worker/tasks.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package main

import (
"context"
"strings"

"github.com/urfave/cli/v2"
"golang.org/x/xerrors"

"github.com/filecoin-project/lotus/api"
lcli "github.com/filecoin-project/lotus/cli"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
)

var tasksCmd = &cli.Command{
Name: "tasks",
Usage: "Manage task processing",
Subcommands: []*cli.Command{
tasksEnableCmd,
tasksDisableCmd,
},
}

var allowSetting = map[sealtasks.TaskType]struct{}{
sealtasks.TTAddPiece: {},
sealtasks.TTPreCommit1: {},
sealtasks.TTPreCommit2: {},
sealtasks.TTCommit2: {},
sealtasks.TTUnseal: {},
}

var settableStr = func() string {
var s []string
for _, tt := range ttList(allowSetting) {
s = append(s, tt.Short())
}
return strings.Join(s, "|")
}()

var tasksEnableCmd = &cli.Command{
Name: "enable",
Usage: "Enable a task type",
ArgsUsage: "[" + settableStr + "]",
Action: taskAction(api.WorkerAPI.TaskEnable),
}

var tasksDisableCmd = &cli.Command{
Name: "disable",
Usage: "Disable a task type",
ArgsUsage: "[" + settableStr + "]",
Action: taskAction(api.WorkerAPI.TaskDisable),
}

func taskAction(tf func(a api.WorkerAPI, ctx context.Context, tt sealtasks.TaskType) error) func(cctx *cli.Context) error {
return func(cctx *cli.Context) error {
if cctx.NArg() != 1 {
return xerrors.Errorf("expected 1 argument")
}

var tt sealtasks.TaskType
for taskType := range allowSetting {
if taskType.Short() == cctx.Args().First() {
tt = taskType
break
}
}

if tt == "" {
return xerrors.Errorf("unknown task type '%s'", cctx.Args().First())
}

api, closer, err := lcli.GetWorkerAPI(cctx)
if err != nil {
return err
}
defer closer()

ctx := lcli.ReqContext(cctx)

return tf(api, ctx, tt)
}
}
30 changes: 30 additions & 0 deletions documentation/en/api-methods-worker.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
* [Storage](#Storage)
* [StorageAddLocal](#StorageAddLocal)
* [Task](#Task)
* [TaskDisable](#TaskDisable)
* [TaskEnable](#TaskEnable)
* [TaskTypes](#TaskTypes)
* [Unseal](#Unseal)
* [UnsealPiece](#UnsealPiece)
Expand Down Expand Up @@ -502,6 +504,34 @@ Response: `{}`
## Task


### TaskDisable
There are not yet any comments for this method.

Perms: admin

Inputs:
```json
[
"seal/v0/commit/2"
]
```

Response: `{}`

### TaskEnable
There are not yet any comments for this method.

Perms: admin

Inputs:
```json
[
"seal/v0/commit/2"
]
```

Response: `{}`

### TaskTypes
TaskType -> Weight

Expand Down
20 changes: 20 additions & 0 deletions extern/sector-storage/worker_local.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ type LocalWorker struct {
ct *workerCallTracker
acceptTasks map[sealtasks.TaskType]struct{}
running sync.WaitGroup
taskLk sync.Mutex

session uuid.UUID
testDisable int64
Expand Down Expand Up @@ -457,9 +458,28 @@ func (l *LocalWorker) ReadPiece(ctx context.Context, writer io.Writer, sector st
}

func (l *LocalWorker) TaskTypes(context.Context) (map[sealtasks.TaskType]struct{}, error) {
l.taskLk.Lock()
defer l.taskLk.Unlock()

return l.acceptTasks, nil
}

func (l *LocalWorker) TaskDisable(ctx context.Context, tt sealtasks.TaskType) error {
l.taskLk.Lock()
defer l.taskLk.Unlock()

delete(l.acceptTasks, tt)
return nil
}

func (l *LocalWorker) TaskEnable(ctx context.Context, tt sealtasks.TaskType) error {
l.taskLk.Lock()
defer l.taskLk.Unlock()

l.acceptTasks[tt] = struct{}{}
return nil
}

func (l *LocalWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) {
return l.localStore.Local(ctx)
}
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,7 @@ github.com/jbenet/goprocess v0.1.4 h1:DRGOFReOMqqDNXwW70QkacFW0YN9QnwLV0Vqk+3oU0
github.com/jbenet/goprocess v0.1.4/go.mod h1:5yspPrukOVuOLORacaBi858NqyClJPQxYZlqdZVfqY4=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jessevdk/go-flags v1.4.0 h1:4IU2WS7AumrZ/40jfhf4QVDMsQwqA7VEHozFRrGARJA=
github.com/jessevdk/go-flags v1.4.0/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
github.com/jmespath/go-jmespath v0.3.0/go.mod h1:9QtRXoHjLGCJ5IBSaohpXITPlowMeeYCZ7fLUTSywik=
Expand Down Expand Up @@ -1210,6 +1211,7 @@ github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJE
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c h1:5bFTChQxSKNwy8ALwOebjekYExl9HTT9urdawqC95tA=
github.com/nikkolasg/hexjson v0.0.0-20181101101858-78e39397e00c/go.mod h1:7qN3Y0BvzRUf4LofcoJplQL10lsFDb4PYlePTVwrP28=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229 h1:E2B8qYyeSgv5MXpmzZXRNp8IAQ4vjxIjhpAf5hv/tAg=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
Expand Down Expand Up @@ -1423,7 +1425,9 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX
github.com/urfave/cli/v2 v2.0.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/urfave/cli/v2 v2.2.0 h1:JTTnM6wKzdA0Jqodd966MVj4vWbbquZykeX1sKbe2C4=
github.com/urfave/cli/v2 v2.2.0/go.mod h1:SE9GqnLQmjVa0iPEY0f1w3ygNIYcIJ0OKPMoW2caLfQ=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1 h1:tY9CJiPnMXf1ERmG2EyK7gNUd+c6RKGD0IfU8WdUSz8=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM=
Expand Down