From d208aa2e68fbbf7b228e3e9a9e660c29f80c2a47 Mon Sep 17 00:00:00 2001 From: veeding <104374143+veeding@users.noreply.github.com> Date: Mon, 16 May 2022 18:16:14 +0800 Subject: [PATCH] complete file and import task modify upload and download mody: go mod refine the code update: make check motify api modify return values rename --- .gitignore | 1 + server-v2/api/studio/etc/studio-api.yaml | 7 +- server-v2/api/studio/internal/common/utils.go | 29 ++ .../api/studio/internal/config/config.go | 81 ++++- .../handler/file/fileuploadhandler.go | 2 +- .../importtask/createimporttaskhandler.go | 33 ++ .../importtask/deleteimporttaskhandler.go | 33 ++ .../importtask/downloadconfighandler.go | 33 ++ .../handler/importtask/downloadlogshandler.go | 33 ++ .../importtask/getimporttaskhandler.go | 33 ++ .../getimporttasklognameshandler.go | 33 ++ .../importtask/getmanyimporttaskhandler.go | 33 ++ .../importtask/getmanyimporttaskloghandler.go | 33 ++ .../importtask/stopimporttaskhandler.go | 33 ++ .../api/studio/internal/handler/routes.go | 57 +++- .../internal/logic/file/filedestroylogic.go | 3 +- .../internal/logic/file/filesindexlogic.go | 3 +- .../internal/logic/file/fileuploadlogic.go | 14 +- .../logic/importtask/createimporttasklogic.go | 29 ++ .../logic/importtask/deleteimporttasklogic.go | 29 ++ .../logic/importtask/downloadconfiglogic.go | 30 ++ .../logic/importtask/downloadlogslogic.go | 30 ++ .../logic/importtask/getimporttasklogic.go | 29 ++ .../importtask/getimporttasklognameslogic.go | 29 ++ .../importtask/getmanyimporttasklogic.go | 29 ++ .../importtask/getmanyimporttaskloglogic.go | 30 ++ .../logic/importtask/stopimporttasklogic.go | 30 ++ server-v2/api/studio/internal/service/file.go | 56 +-- .../api/studio/internal/service/import.go | 321 ++++++++++++++++++ .../internal/service/importer/importer.go | 261 ++++++++++++++ .../studio/internal/service/importer/task.go | 21 ++ .../internal/service/importer/taskInfo.go | 16 + .../internal/service/importer/taskdb.go | 87 +++++ .../internal/service/importer/taskmgr.go | 269 +++++++++++++++ .../api/studio/internal/svc/servicecontext.go | 9 +- server-v2/api/studio/internal/types/types.go | 198 +++++++++++ server-v2/api/studio/pkg/auth/authorize.go | 1 - server-v2/api/studio/pkg/config/config.go | 115 ------- server-v2/api/studio/restapi/file.api | 6 +- server-v2/api/studio/restapi/import.api | 235 +++++++++++++ server-v2/api/studio/restapi/studio.api | 1 + server-v2/api/studio/studio.go | 26 ++ server-v2/go.mod | 13 +- server-v2/go.sum | 19 +- 44 files changed, 2245 insertions(+), 168 deletions(-) create mode 100644 server-v2/api/studio/internal/common/utils.go create mode 100644 server-v2/api/studio/internal/handler/importtask/createimporttaskhandler.go create mode 100644 server-v2/api/studio/internal/handler/importtask/deleteimporttaskhandler.go create mode 100644 server-v2/api/studio/internal/handler/importtask/downloadconfighandler.go create mode 100644 server-v2/api/studio/internal/handler/importtask/downloadlogshandler.go create mode 100644 server-v2/api/studio/internal/handler/importtask/getimporttaskhandler.go create mode 100644 server-v2/api/studio/internal/handler/importtask/getimporttasklognameshandler.go create mode 100644 server-v2/api/studio/internal/handler/importtask/getmanyimporttaskhandler.go create mode 100644 server-v2/api/studio/internal/handler/importtask/getmanyimporttaskloghandler.go create mode 100644 server-v2/api/studio/internal/handler/importtask/stopimporttaskhandler.go create mode 100644 server-v2/api/studio/internal/logic/importtask/createimporttasklogic.go create mode 100644 server-v2/api/studio/internal/logic/importtask/deleteimporttasklogic.go create mode 100644 server-v2/api/studio/internal/logic/importtask/downloadconfiglogic.go create mode 100644 server-v2/api/studio/internal/logic/importtask/downloadlogslogic.go create mode 100644 server-v2/api/studio/internal/logic/importtask/getimporttasklogic.go create mode 100644 server-v2/api/studio/internal/logic/importtask/getimporttasklognameslogic.go create mode 100644 server-v2/api/studio/internal/logic/importtask/getmanyimporttasklogic.go create mode 100644 server-v2/api/studio/internal/logic/importtask/getmanyimporttaskloglogic.go create mode 100644 server-v2/api/studio/internal/logic/importtask/stopimporttasklogic.go create mode 100644 server-v2/api/studio/internal/service/import.go create mode 100644 server-v2/api/studio/internal/service/importer/importer.go create mode 100644 server-v2/api/studio/internal/service/importer/task.go create mode 100644 server-v2/api/studio/internal/service/importer/taskInfo.go create mode 100644 server-v2/api/studio/internal/service/importer/taskdb.go create mode 100644 server-v2/api/studio/internal/service/importer/taskmgr.go delete mode 100644 server-v2/api/studio/pkg/config/config.go create mode 100644 server-v2/api/studio/restapi/import.api diff --git a/.gitignore b/.gitignore index 21bfa6d0..7111f107 100644 --- a/.gitignore +++ b/.gitignore @@ -23,5 +23,6 @@ dist/ !app/**/*.js tmp server/data +server-v2/api/*/data assets/ bin/ \ No newline at end of file diff --git a/server-v2/api/studio/etc/studio-api.yaml b/server-v2/api/studio/etc/studio-api.yaml index 0d12cbb3..8cbe778a 100644 --- a/server-v2/api/studio/etc/studio-api.yaml +++ b/server-v2/api/studio/etc/studio-api.yaml @@ -1,6 +1,6 @@ Name: studio-api Host: 0.0.0.0 -Port: 9000 +Port: 7002 MaxBytes: 1073741824 Debug: Enable: false @@ -8,4 +8,7 @@ Auth: AccessSecret: "login_secret" AccessExpire: 1800 File: - UploadDir: "./upload/" \ No newline at end of file + UploadDir: "./data/upload/" + TasksDir: "./data/tasks" + SqliteDbFilePath: "./data/tasks.db" + TaskIdPath: "./data/taskId.data" \ No newline at end of file diff --git a/server-v2/api/studio/internal/common/utils.go b/server-v2/api/studio/internal/common/utils.go new file mode 100644 index 00000000..28aa0484 --- /dev/null +++ b/server-v2/api/studio/internal/common/utils.go @@ -0,0 +1,29 @@ +package common + +import ( + "net/http" + "regexp" + "strings" +) + +func ReserveRequest(r *http.Request) bool { + if strings.HasPrefix(r.URL.Path, "/api/files") { + return true + } + if strings.HasPrefix(r.URL.Path, "/api/import-tasks") { + return true + } + return false +} + +func ReserveResponse(r *http.Request) bool { + if strings.HasPrefix(r.URL.Path, "/api/import-tasks") { + return true + } + return false +} + +func IgnoreHandlerBody(r *http.Request) bool { + pattern := regexp.MustCompile(`/api/import-tasks/\d/download.*`) + return pattern.MatchString(r.URL.Path) +} diff --git a/server-v2/api/studio/internal/config/config.go b/server-v2/api/studio/internal/config/config.go index 3efe2fcf..6e0c6bd5 100644 --- a/server-v2/api/studio/internal/config/config.go +++ b/server-v2/api/studio/internal/config/config.go @@ -1,6 +1,13 @@ package config -import "github.com/zeromicro/go-zero/rest" +import ( + "io/ioutil" + "os" + "path/filepath" + + "github.com/zeromicro/go-zero/rest" + "go.uber.org/zap" +) type Config struct { rest.RestConf @@ -13,6 +20,76 @@ type Config struct { } File struct { - UploadDir string + UploadDir string + TasksDir string + SqliteDbFilePath string + TaskIdPath string + } +} + +const ( + DefaultFilesDataDir = "data" + DefaultTaskIdPath = "data/taskId.data" + DefaultUploadDir = "data/upload" + DefaultTasksDir = "data/tasks" + DefaultSqlitedbFilePath = "data/tasks.db" +) + +func (c *Config) Validate() error { + return nil +} + +func (c *Config) Complete() { + if c.File.TaskIdPath == "" { + _, err := os.Stat(DefaultFilesDataDir) + if os.IsNotExist(err) { + os.MkdirAll(DefaultFilesDataDir, 0o766) + } + abs, _ := filepath.Abs(DefaultTaskIdPath) + _, err = ioutil.ReadFile(abs) + if err != nil { + if os.IsNotExist(err) { + _, err := os.Create(abs) + if err != nil { + zap.L().Fatal("DefaultTaskIdPath Init fail", zap.Error(err)) + } else { + zap.L().Fatal("DefaultTaskIdPath Init fail", zap.Error(err)) + } + } + } + c.File.TaskIdPath = abs + } + + if c.File.UploadDir == "" { + abs, _ := filepath.Abs(DefaultTasksDir) + c.File.UploadDir = abs + _, err := os.Stat(abs) + if os.IsNotExist(err) { + os.MkdirAll(abs, 0o776) + } + } + + if c.File.TasksDir == "" { + abs, _ := filepath.Abs(DefaultTasksDir) + c.File.TasksDir = abs + _, err := os.Stat(abs) + if os.IsNotExist(err) { + os.MkdirAll(abs, 0o766) + } + } + + if c.File.SqliteDbFilePath == "" { + _, err := os.Stat(DefaultFilesDataDir) + if os.IsNotExist(err) { + os.MkdirAll(DefaultFilesDataDir, 0o766) + } + abs, _ := filepath.Abs(DefaultSqlitedbFilePath) + c.File.SqliteDbFilePath = abs } } + +func (c *Config) InitConfig() error { + c.Complete() + + return c.Validate() +} diff --git a/server-v2/api/studio/internal/handler/file/fileuploadhandler.go b/server-v2/api/studio/internal/handler/file/fileuploadhandler.go index e379481b..93cc8169 100644 --- a/server-v2/api/studio/internal/handler/file/fileuploadhandler.go +++ b/server-v2/api/studio/internal/handler/file/fileuploadhandler.go @@ -10,7 +10,7 @@ import ( func FileUploadHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - l := file.NewFileUploadLogic(r, svcCtx) + l := file.NewFileUploadLogic(r.Context(), svcCtx) err := l.FileUpload() svcCtx.ResponseHandler.Handle(w, r, nil, err) } diff --git a/server-v2/api/studio/internal/handler/importtask/createimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/createimporttaskhandler.go new file mode 100644 index 00000000..e043a6f5 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/createimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func CreateImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.CreateImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewCreateImportTaskLogic(r.Context(), svcCtx) + data, err := l.CreateImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/deleteimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/deleteimporttaskhandler.go new file mode 100644 index 00000000..c6a12618 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/deleteimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func DeleteImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.DeleteImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewDeleteImportTaskLogic(r.Context(), svcCtx) + err := l.DeleteImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/downloadconfighandler.go b/server-v2/api/studio/internal/handler/importtask/downloadconfighandler.go new file mode 100644 index 00000000..0400faaf --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/downloadconfighandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func DownloadConfigHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.DownloadConfigsRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewDownloadConfigLogic(r.Context(), svcCtx) + err := l.DownloadConfig(req) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/downloadlogshandler.go b/server-v2/api/studio/internal/handler/importtask/downloadlogshandler.go new file mode 100644 index 00000000..cb4d4b34 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/downloadlogshandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func DownloadLogsHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.DownloadLogsRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewDownloadLogsLogic(r.Context(), svcCtx) + err := l.DownloadLogs(req) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/getimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/getimporttaskhandler.go new file mode 100644 index 00000000..cb53642c --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/getimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func GetImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.GetImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewGetImportTaskLogic(r.Context(), svcCtx) + data, err := l.GetImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/getimporttasklognameshandler.go b/server-v2/api/studio/internal/handler/importtask/getimporttasklognameshandler.go new file mode 100644 index 00000000..84aa1e01 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/getimporttasklognameshandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func GetImportTaskLogNamesHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.GetImportTaskLogNamesRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewGetImportTaskLogNamesLogic(r.Context(), svcCtx) + data, err := l.GetImportTaskLogNames(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskhandler.go new file mode 100644 index 00000000..0937d48c --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func GetManyImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.GetManyImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewGetManyImportTaskLogic(r.Context(), svcCtx) + data, err := l.GetManyImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskloghandler.go b/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskloghandler.go new file mode 100644 index 00000000..b030ea08 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/getmanyimporttaskloghandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func GetManyImportTaskLogHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.GetManyImportTaskLogRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewGetManyImportTaskLogLogic(r.Context(), svcCtx) + data, err := l.GetManyImportTaskLog(req) + svcCtx.ResponseHandler.Handle(w, r, data, err) + } +} diff --git a/server-v2/api/studio/internal/handler/importtask/stopimporttaskhandler.go b/server-v2/api/studio/internal/handler/importtask/stopimporttaskhandler.go new file mode 100644 index 00000000..3c5d4928 --- /dev/null +++ b/server-v2/api/studio/internal/handler/importtask/stopimporttaskhandler.go @@ -0,0 +1,33 @@ +// Code generated by goctl. DO NOT EDIT. +package importtask + +import ( + "net/http" + + "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/logic/importtask" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/zeromicro/go-zero/rest/httpx" +) + +func StopImportTaskHandler(svcCtx *svc.ServiceContext) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var req types.StopImportTaskRequest + if err := httpx.Parse(r, &req); err != nil { + err = ecode.WithCode(ecode.ErrParam, err) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + if err := validator.Struct(req); err != nil { + svcCtx.ResponseHandler.Handle(w, r, nil, err) + return + } + + l := importtask.NewStopImportTaskLogic(r.Context(), svcCtx) + err := l.StopImportTask(req) + svcCtx.ResponseHandler.Handle(w, r, nil, err) + } +} diff --git a/server-v2/api/studio/internal/handler/routes.go b/server-v2/api/studio/internal/handler/routes.go index c6b5e76d..e2708fb7 100644 --- a/server-v2/api/studio/internal/handler/routes.go +++ b/server-v2/api/studio/internal/handler/routes.go @@ -7,6 +7,7 @@ import ( file "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler/file" gateway "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler/gateway" health "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler/health" + importtask "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler/importtask" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" "github.com/zeromicro/go-zero/rest" @@ -59,19 +60,69 @@ func RegisterHandlers(server *rest.Server, serverCtx *svc.ServiceContext) { []rest.Route{ { Method: http.MethodPost, - Path: "/api/file", + Path: "/api/files", Handler: file.FileUploadHandler(serverCtx), }, { Method: http.MethodDelete, - Path: "/api/file/:name", + Path: "/api/files/:name", Handler: file.FileDestroyHandler(serverCtx), }, { Method: http.MethodGet, - Path: "/api/file", + Path: "/api/files", Handler: file.FilesIndexHandler(serverCtx), }, }, ) + + server.AddRoutes( + []rest.Route{ + { + Method: http.MethodPost, + Path: "/api/import-tasks", + Handler: importtask.CreateImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id", + Handler: importtask.GetImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks", + Handler: importtask.GetManyImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id/logs", + Handler: importtask.GetManyImportTaskLogHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id/task-log-names", + Handler: importtask.GetImportTaskLogNamesHandler(serverCtx), + }, + { + Method: http.MethodDelete, + Path: "/api/import-tasks/:id", + Handler: importtask.DeleteImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id/stop", + Handler: importtask.StopImportTaskHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id/download-logs", + Handler: importtask.DownloadLogsHandler(serverCtx), + }, + { + Method: http.MethodGet, + Path: "/api/import-tasks/:id/download-config", + Handler: importtask.DownloadConfigHandler(serverCtx), + }, + }, + ) } diff --git a/server-v2/api/studio/internal/logic/file/filedestroylogic.go b/server-v2/api/studio/internal/logic/file/filedestroylogic.go index b70b97f3..4f551c77 100644 --- a/server-v2/api/studio/internal/logic/file/filedestroylogic.go +++ b/server-v2/api/studio/internal/logic/file/filedestroylogic.go @@ -2,6 +2,7 @@ package file import ( "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" @@ -25,5 +26,5 @@ func NewFileDestroyLogic(ctx context.Context, svcCtx *svc.ServiceContext) *FileD } func (l *FileDestroyLogic) FileDestroy(req types.FileDestroyRequest) error { - return service.NewFileService(nil, l.ctx, l.svcCtx).FileDestroy(req.Name) + return service.NewFileService(l.ctx, l.svcCtx).FileDestroy(req.Name) } diff --git a/server-v2/api/studio/internal/logic/file/filesindexlogic.go b/server-v2/api/studio/internal/logic/file/filesindexlogic.go index 21c5d3d5..6ca16117 100644 --- a/server-v2/api/studio/internal/logic/file/filesindexlogic.go +++ b/server-v2/api/studio/internal/logic/file/filesindexlogic.go @@ -2,6 +2,7 @@ package file import ( "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" @@ -25,5 +26,5 @@ func NewFilesIndexLogic(ctx context.Context, svcCtx *svc.ServiceContext) *FilesI } func (l *FilesIndexLogic) FilesIndex() (resp *types.FilesIndexData, err error) { - return service.NewFileService(nil, l.ctx, l.svcCtx).FilesIndex() + return service.NewFileService(l.ctx, l.svcCtx).FilesIndex() } diff --git a/server-v2/api/studio/internal/logic/file/fileuploadlogic.go b/server-v2/api/studio/internal/logic/file/fileuploadlogic.go index 81f5eaf6..f2460962 100644 --- a/server-v2/api/studio/internal/logic/file/fileuploadlogic.go +++ b/server-v2/api/studio/internal/logic/file/fileuploadlogic.go @@ -1,27 +1,27 @@ package file import ( - "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" - "net/http" + "context" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" "github.com/zeromicro/go-zero/core/logx" ) type FileUploadLogic struct { logx.Logger - r *http.Request + ctx context.Context svcCtx *svc.ServiceContext } -func NewFileUploadLogic(r *http.Request, svcCtx *svc.ServiceContext) *FileUploadLogic { +func NewFileUploadLogic(ctx context.Context, svcCtx *svc.ServiceContext) *FileUploadLogic { return &FileUploadLogic{ - Logger: logx.WithContext(r.Context()), - r: r, + Logger: logx.WithContext(ctx), + ctx: ctx, svcCtx: svcCtx, } } func (l *FileUploadLogic) FileUpload() error { - return service.NewFileService(l.r, nil, l.svcCtx).FileUpload() + return service.NewFileService(l.ctx, l.svcCtx).FileUpload() } diff --git a/server-v2/api/studio/internal/logic/importtask/createimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/createimporttasklogic.go new file mode 100644 index 00000000..1a99734a --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/createimporttasklogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type CreateImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewCreateImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *CreateImportTaskLogic { + return &CreateImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *CreateImportTaskLogic) CreateImportTask(req types.CreateImportTaskRequest) (resp *types.CreateImportTaskData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).CreateImportTask(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/deleteimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/deleteimporttasklogic.go new file mode 100644 index 00000000..989550fb --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/deleteimporttasklogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type DeleteImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewDeleteImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DeleteImportTaskLogic { + return &DeleteImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *DeleteImportTaskLogic) DeleteImportTask(req types.DeleteImportTaskRequest) error { + return service.NewImportService(l.ctx, l.svcCtx).DeleteImportTask(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/downloadconfiglogic.go b/server-v2/api/studio/internal/logic/importtask/downloadconfiglogic.go new file mode 100644 index 00000000..c0a95510 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/downloadconfiglogic.go @@ -0,0 +1,30 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type DownloadConfigLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewDownloadConfigLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DownloadConfigLogic { + return &DownloadConfigLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *DownloadConfigLogic) DownloadConfig(req types.DownloadConfigsRequest) error { + return service.NewImportService(l.ctx, l.svcCtx).DownloadConfig(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/downloadlogslogic.go b/server-v2/api/studio/internal/logic/importtask/downloadlogslogic.go new file mode 100644 index 00000000..56f31258 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/downloadlogslogic.go @@ -0,0 +1,30 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type DownloadLogsLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewDownloadLogsLogic(ctx context.Context, svcCtx *svc.ServiceContext) *DownloadLogsLogic { + return &DownloadLogsLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *DownloadLogsLogic) DownloadLogs(req types.DownloadLogsRequest) error { + return service.NewImportService(l.ctx, l.svcCtx).DownloadLogs(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/getimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/getimporttasklogic.go new file mode 100644 index 00000000..f0c51417 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/getimporttasklogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetImportTaskLogic { + return &GetImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetImportTaskLogic) GetImportTask(req types.GetImportTaskRequest) (resp *types.GetImportTaskData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).GetImportTask(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/getimporttasklognameslogic.go b/server-v2/api/studio/internal/logic/importtask/getimporttasklognameslogic.go new file mode 100644 index 00000000..0c5b409e --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/getimporttasklognameslogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetImportTaskLogNamesLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetImportTaskLogNamesLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetImportTaskLogNamesLogic { + return &GetImportTaskLogNamesLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetImportTaskLogNamesLogic) GetImportTaskLogNames(req types.GetImportTaskLogNamesRequest) (resp *types.GetImportTaskLogNamesData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).GetImportTaskLogNames(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/getmanyimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/getmanyimporttasklogic.go new file mode 100644 index 00000000..a85f3ce5 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/getmanyimporttasklogic.go @@ -0,0 +1,29 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetManyImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetManyImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetManyImportTaskLogic { + return &GetManyImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetManyImportTaskLogic) GetManyImportTask(req types.GetManyImportTaskRequest) (resp *types.GetManyImportTaskData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).GetManyImportTask(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/getmanyimporttaskloglogic.go b/server-v2/api/studio/internal/logic/importtask/getmanyimporttaskloglogic.go new file mode 100644 index 00000000..3d08d666 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/getmanyimporttaskloglogic.go @@ -0,0 +1,30 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type GetManyImportTaskLogLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewGetManyImportTaskLogLogic(ctx context.Context, svcCtx *svc.ServiceContext) *GetManyImportTaskLogLogic { + return &GetManyImportTaskLogLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *GetManyImportTaskLogLogic) GetManyImportTaskLog(req types.GetManyImportTaskLogRequest) (resp *types.GetManyImportTaskLogData, err error) { + return service.NewImportService(l.ctx, l.svcCtx).GetManyImportTaskLog(&req) +} diff --git a/server-v2/api/studio/internal/logic/importtask/stopimporttasklogic.go b/server-v2/api/studio/internal/logic/importtask/stopimporttasklogic.go new file mode 100644 index 00000000..3cc15853 --- /dev/null +++ b/server-v2/api/studio/internal/logic/importtask/stopimporttasklogic.go @@ -0,0 +1,30 @@ +package importtask + +import ( + "context" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service" + + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + + "github.com/zeromicro/go-zero/core/logx" +) + +type StopImportTaskLogic struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext +} + +func NewStopImportTaskLogic(ctx context.Context, svcCtx *svc.ServiceContext) *StopImportTaskLogic { + return &StopImportTaskLogic{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (l *StopImportTaskLogic) StopImportTask(req types.StopImportTaskRequest) error { + return service.NewImportService(l.ctx, l.svcCtx).StopImportTask(&req) +} diff --git a/server-v2/api/studio/internal/service/file.go b/server-v2/api/studio/internal/service/file.go index 9fd1a8a2..32969d9a 100644 --- a/server-v2/api/studio/internal/service/file.go +++ b/server-v2/api/studio/internal/service/file.go @@ -6,12 +6,6 @@ import ( "encoding/csv" "errors" "fmt" - "github.com/axgle/mahonia" - "github.com/saintfish/chardet" - "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" - "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" - "github.com/zeromicro/go-zero/core/logx" - "go.uber.org/zap" "io" "io/ioutil" "mime/multipart" @@ -19,6 +13,15 @@ import ( "os" "path/filepath" "strings" + + "github.com/axgle/mahonia" + "github.com/saintfish/chardet" + "github.com/vesoft-inc/go-pkg/middleware" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + "github.com/zeromicro/go-zero/core/logx" + "go.uber.org/zap" ) const ( @@ -45,19 +48,11 @@ type ( } ) -func NewFileService(r *http.Request, ctx context.Context, svcCtx *svc.ServiceContext) FileService { - if r != nil { - return &fileService{ - Logger: logx.WithContext(r.Context()), - r: r, - svcCtx: svcCtx, - } - } else { - return &fileService{ - Logger: logx.WithContext(ctx), - ctx: ctx, - svcCtx: svcCtx, - } +func NewFileService(ctx context.Context, svcCtx *svc.ServiceContext) FileService { + return &fileService{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, } } @@ -132,8 +127,13 @@ func (f *fileService) FileUpload() error { } } + httpReq, ok := middleware.GetRequest(f.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepRequest")) + } + logx.Infof("dir:", dir) - files, _, err := f.UploadFormFiles(dir) + files, _, err := UploadFormFiles(httpReq, dir) if err != nil { logx.Infof("upload file error:%v", err) return err @@ -157,20 +157,20 @@ func (f *fileService) FileUpload() error { return nil } -func (f *fileService) UploadFormFiles(destDirectory string) (uploaded []*multipart.FileHeader, n int64, err error) { - err = f.r.ParseMultipartForm(defaultMulipartMemory) +func UploadFormFiles(r *http.Request, destDirectory string) (uploaded []*multipart.FileHeader, n int64, err error) { + err = r.ParseMultipartForm(defaultMulipartMemory) if err != nil { return nil, 0, err } - if f.r.MultipartForm != nil { - if fhs := f.r.MultipartForm.File; fhs != nil { + if r.MultipartForm != nil { + if fhs := r.MultipartForm.File; fhs != nil { for _, files := range fhs { for _, file := range files { file.Filename = strings.ReplaceAll(file.Filename, "../", "") file.Filename = strings.ReplaceAll(file.Filename, "..\\", "") - n0, err0 := f.SaveFormFile(file, filepath.Join(destDirectory, file.Filename)) + n0, err0 := SaveFormFile(file, filepath.Join(destDirectory, file.Filename)) if err0 != nil { return nil, 0, err0 } @@ -185,7 +185,7 @@ func (f *fileService) UploadFormFiles(destDirectory string) (uploaded []*multipa return nil, 0, http.ErrMissingFile } -func (f *fileService) SaveFormFile(fh *multipart.FileHeader, dest string) (int64, error) { +func SaveFormFile(fh *multipart.FileHeader, dest string) (int64, error) { src, err := fh.Open() if err != nil { return 0, err @@ -222,7 +222,7 @@ func checkCharset(file *multipart.FileHeader) (string, error) { func changeFileCharset2UTF8(filePath string, charSet string) error { fileUTF8Path := filePath + "-copy" err := func() error { - file, err := os.OpenFile(filePath, os.O_RDONLY, 0666) + file, err := os.OpenFile(filePath, os.O_RDONLY, 0o666) if err != nil { zap.L().Warn("open file fail", zap.Error(err)) return err @@ -235,7 +235,7 @@ func changeFileCharset2UTF8(filePath string, charSet string) error { return noCharsetErr } decodeReader := decoder.NewReader(reader) - fileUTF8, err := os.OpenFile(fileUTF8Path, os.O_RDONLY|os.O_CREATE|os.O_WRONLY, 0666) + fileUTF8, err := os.OpenFile(fileUTF8Path, os.O_RDONLY|os.O_CREATE|os.O_WRONLY, 0o666) if err != nil { return err } diff --git a/server-v2/api/studio/internal/service/import.go b/server-v2/api/studio/internal/service/import.go new file mode 100644 index 00000000..ba3f25f6 --- /dev/null +++ b/server-v2/api/studio/internal/service/import.go @@ -0,0 +1,321 @@ +package service + +import ( + "bufio" + "context" + "encoding/json" + "errors" + "fmt" + "io/ioutil" + "net/http" + "os" + "path/filepath" + "sync" + + "github.com/vesoft-inc/go-pkg/middleware" + importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" + importererrors "github.com/vesoft-inc/nebula-importer/pkg/errors" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service/importer" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/utils" + "github.com/zeromicro/go-zero/core/logx" + "go.uber.org/zap" +) + +var ( + _ ImportService = (*importService)(nil) + muTaskId sync.RWMutex +) + +const ( + importLogName = "import.log" + errContentDir = "err" +) + +type ( + ImportService interface { + CreateImportTask(*types.CreateImportTaskRequest) (*types.CreateImportTaskData, error) + StopImportTask(request *types.StopImportTaskRequest) error + DownloadConfig(*types.DownloadConfigsRequest) error + DownloadLogs(request *types.DownloadLogsRequest) error + DeleteImportTask(*types.DeleteImportTaskRequest) error + GetImportTask(*types.GetImportTaskRequest) (*types.GetImportTaskData, error) + GetManyImportTask(request *types.GetManyImportTaskRequest) (*types.GetManyImportTaskData, error) + GetImportTaskLogNames(request *types.GetImportTaskLogNamesRequest) (*types.GetImportTaskLogNamesData, error) + GetManyImportTaskLog(request *types.GetManyImportTaskLogRequest) (*types.GetManyImportTaskLogData, error) + } + + importService struct { + logx.Logger + ctx context.Context + svcCtx *svc.ServiceContext + } +) + +func NewImportService(ctx context.Context, svcCtx *svc.ServiceContext) ImportService { + return &importService{ + Logger: logx.WithContext(ctx), + ctx: ctx, + svcCtx: svcCtx, + } +} + +func (i *importService) CreateImportTask(req *types.CreateImportTaskRequest) (*types.CreateImportTaskData, error) { + jsons, err := json.Marshal(req.Config) + if err != nil { + return nil, ecode.WithCode(ecode.ErrParam, nil) + } + + conf := importconfig.YAMLConfig{} + err = json.Unmarshal(jsons, &conf) + if err != nil { + return nil, err + } + + if err = validClientParams(&conf); err != nil { + err = importererrors.Wrap(importererrors.InvalidConfigPathOrFormat, err) + zap.L().Warn("client params is wrong", zap.Error(err)) + return nil, err + } + + taskDir, err := importer.GetNewTaskDir(i.svcCtx.Config.File.TasksDir) + if err != nil { + return nil, err + } + logPath := filepath.Join(taskDir, importLogName) + conf.LogPath = &logPath + + // create config file + if err := importer.CreateConfigFile(i.svcCtx.Config.File.UploadDir, taskDir, conf); err != nil { + return nil, err + } + + // create err dir + taskErrDir := filepath.Join(taskDir, "err") + if err = utils.CreateDir(taskErrDir); err != nil { + return nil, err + } + + // import + nebulaAddress := *conf.NebulaClientSettings.Connection.Address + user := *conf.NebulaClientSettings.Connection.User + name := req.Name + space := *conf.NebulaClientSettings.Space + task, taskID, err := importer.GetTaskMgr().NewTask(nebulaAddress, user, name, space) + if err != nil { + zap.L().Warn("init task fail", zap.Error(err)) + return nil, err + } + if err = importer.Import(taskID, &conf); err != nil { + // task err: import task not start err + task.TaskInfo.TaskStatus = importer.StatusAborted.String() + err1 := importer.GetTaskMgr().AbortTask(taskID) + if err != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } + zap.L().Error(fmt.Sprintf("Failed to start a import task: `%s`, task result: `%v`", taskID, err)) + return nil, err + } + + // write taskId to file + muTaskId.Lock() + taskIDBytes, err := ioutil.ReadFile(i.svcCtx.Config.File.TaskIdPath) + if err != nil { + zap.L().Warn("read taskId file error", zap.Error(err)) + return nil, err + } + taskIdJSON := make(map[string]bool) + if len(taskIDBytes) != 0 { + if err := json.Unmarshal(taskIDBytes, &taskIdJSON); err != nil { + zap.L().Warn("read taskId file error", zap.Error(err)) + return nil, err + } + } + taskIdJSON[taskID] = true + bytes, err := json.Marshal(taskIdJSON) + if err != nil { + zap.L().Warn("read taskId file error", zap.Error(err)) + } + err = ioutil.WriteFile(i.svcCtx.Config.File.TaskIdPath, bytes, 777) + if err != nil { + zap.L().Warn("write taskId file error", zap.Error(err)) + } + defer muTaskId.Unlock() + + return &types.CreateImportTaskData{ + Id: taskID, + }, nil +} + +func (i *importService) StopImportTask(req *types.StopImportTaskRequest) error { + return importer.StopImportTask(req.Id, req.Address+":"+req.Port, req.Username) +} + +func (i *importService) DownloadConfig(req *types.DownloadConfigsRequest) error { + httpReq, ok := middleware.GetRequest(i.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepRequest")) + } + + httpResp, ok := middleware.GetResponseWriter(i.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepResponse Writer")) + } + + configPath := filepath.Join(i.svcCtx.Config.File.TasksDir, req.Id, "config.yaml") + httpResp.Header().Set("Content-Type", "application/octet-stream") + httpResp.Header().Set("Content-Disposition", "attachment;filename="+filepath.Base(configPath)) + http.ServeFile(httpResp, httpReq, configPath) + + return nil +} + +func (i *importService) DownloadLogs(req *types.DownloadLogsRequest) error { + id := req.Id + + httpReq, ok := middleware.GetRequest(i.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepRequest")) + } + + httpResp, ok := middleware.GetResponseWriter(i.ctx) + if !ok { + return ecode.WithInternalServer(fmt.Errorf("unset KeepResponse Writer")) + } + + filename := req.Name + path := "" + if filename == importLogName { + path = filepath.Join(i.svcCtx.Config.File.TasksDir, id, filename) + } else { + path = filepath.Join(i.svcCtx.Config.File.TasksDir, id, "err", filename) + } + + httpResp.Header().Set("Content-Type", "application/octet-stream") + httpResp.Header().Set("Content-Disposition", "attachment;filename="+filepath.Base(path)) + http.ServeFile(httpResp, httpReq, path) + return nil +} + +func (i *importService) DeleteImportTask(req *types.DeleteImportTaskRequest) error { + return importer.DeleteImportTask(i.svcCtx.Config.File.TasksDir, req.Id, req.Address+":"+req.Port, req.Username) +} + +func (i *importService) GetImportTask(req *types.GetImportTaskRequest) (*types.GetImportTaskData, error) { + return importer.GetImportTask(i.svcCtx.Config.File.TasksDir, req.Id, req.Address+":"+req.Port, req.Username) +} + +func (i *importService) GetManyImportTask(req *types.GetManyImportTaskRequest) (*types.GetManyImportTaskData, error) { + return importer.GetManyImportTask(i.svcCtx.Config.File.TasksDir, req.Address+":"+req.Port, req.Username, req.Page, req.PageSize) +} + +// GetImportTaskLogNames :Get all log file's name of a task +func (i *importService) GetImportTaskLogNames(req *types.GetImportTaskLogNamesRequest) (*types.GetImportTaskLogNamesData, error) { + id := req.Id + + errLogDir := filepath.Join(i.svcCtx.Config.File.TasksDir, id, "err") + fileInfos, err := ioutil.ReadDir(errLogDir) + if err != nil { + return nil, err + } + + data := &types.GetImportTaskLogNamesData{ + Names: []string{}, + } + data.Names = append(data.Names, importLogName) + for _, fileInfo := range fileInfos { + name := fileInfo.Name() + data.Names = append(data.Names, name) + } + return data, nil +} + +func (i *importService) GetManyImportTaskLog(req *types.GetManyImportTaskLogRequest) (*types.GetManyImportTaskLogData, error) { + path := "" + if req.File == importLogName { + path = filepath.Join(i.svcCtx.Config.File.TasksDir, req.Id, req.File) + } else { + path = filepath.Join(i.svcCtx.Config.File.TasksDir, req.Id, errContentDir, req.File) + } + lines, err := readFileLines(path, req.Offset, req.Limit) + if err != nil { + return nil, err + } + + muTaskId.RLock() + taskIdBytes, err := ioutil.ReadFile(i.svcCtx.Config.File.TaskIdPath) + muTaskId.RUnlock() + if err != nil { + zap.L().Warn("read taskId file error", zap.Error(err)) + return nil, err + } + taskIdJSON := make(map[string]bool) + if len(taskIdBytes) != 0 { + err = json.Unmarshal(taskIdBytes, &taskIdJSON) + if err != nil { + zap.L().Warn("parse taskId file error", zap.Error(err)) + return nil, err + } + } + + if len(lines) == 0 && taskIdJSON[req.Id] { + return nil, nil + } + if len(lines) == 0 { + return nil, errors.New("no task") + } + + data := &types.GetManyImportTaskLogData{ + Logs: lines, + } + + return data, nil +} + +func validClientParams(conf *importconfig.YAMLConfig) error { + if conf.NebulaClientSettings.Connection == nil || + conf.NebulaClientSettings.Connection.Address == nil || + *conf.NebulaClientSettings.Connection.Address == "" || + conf.NebulaClientSettings.Connection.User == nil || + *conf.NebulaClientSettings.Connection.User == "" || + conf.NebulaClientSettings.Space == nil || + *conf.NebulaClientSettings.Space == "" { + return ecode.WithCode(ecode.ErrParam, nil) + } + + for _, fn := range conf.Files { + if fn.CSV.Delimiter == nil || *fn.CSV.Delimiter == "" { + delimiter := "," + fn.CSV.Delimiter = &delimiter + } + } + + return nil +} + +func readFileLines(path string, offset int64, limit int64) ([]string, error) { + file, err := os.Open(path) + if err != nil { + zap.L().Warn("open file error", zap.Error(err)) + return nil, err + } + defer file.Close() + scanner := bufio.NewScanner(file) + res := make([]string, 0) + if limit != -1 { + for lineIndex := int64(0); scanner.Scan() && lineIndex < offset+limit; lineIndex++ { + if lineIndex >= offset { + res = append(res, scanner.Text()) + } + } + } else { + for lineIndex := int64(0); scanner.Scan(); lineIndex++ { + if lineIndex >= offset { + res = append(res, scanner.Text()) + } + } + } + return res, nil +} diff --git a/server-v2/api/studio/internal/service/importer/importer.go b/server-v2/api/studio/internal/service/importer/importer.go new file mode 100644 index 00000000..3e3d34fb --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/importer.go @@ -0,0 +1,261 @@ +package importer + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "time" + + importconfig "github.com/vesoft-inc/nebula-importer/pkg/config" + importerErrors "github.com/vesoft-inc/nebula-importer/pkg/errors" + "github.com/vesoft-inc/nebula-importer/pkg/logger" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/types" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/utils" + "go.uber.org/zap" + + "gopkg.in/yaml.v2" +) + +type ImportResult struct { + TaskId string `json:"taskId"` + TimeCost string `json:"timeCost"` // Milliseconds + FailedRows int64 `json:"failedRows"` + ErrorResult struct { + ErrorCode int `json:"errorCode"` + ErrorMsg string `json:"errorMsg"` + } +} + +func GetNewTaskDir(tasksDir string) (string, error) { + taskId, err := GetTaskMgr().NewTaskID() + if err != nil { + return "", err + } + taskDir := filepath.Join(tasksDir, taskId) + return taskDir, nil +} + +func CreateConfigFile(uploadDir, taskdir string, config importconfig.YAMLConfig) error { + fileName := "config.yaml" + err := utils.CreateDir(taskdir) + if err := utils.CreateDir(taskdir); err != nil { + return err + } + path := filepath.Join(taskdir, fileName) + // erase user information + address := *config.NebulaClientSettings.Connection.Address + user := *config.NebulaClientSettings.Connection.User + password := *config.NebulaClientSettings.Connection.Password + *config.NebulaClientSettings.Connection.Address = "" + *config.NebulaClientSettings.Connection.User = "" + *config.NebulaClientSettings.Connection.Password = "" + + // erase path infomation + logPath := *config.LogPath + *config.LogPath = "import.log" + paths := make([]string, 0) + failDataPaths := make([]string, 0) + for _, file := range config.Files { + paths = append(paths, filepath.Join(uploadDir, *file.Path)) + failDataPaths = append(failDataPaths, filepath.Join(taskdir, "err", *file.FailDataPath)) + _, fileName := filepath.Split(*file.Path) + _, fileDataName := filepath.Split(*file.FailDataPath) + *file.Path = fileName + *file.FailDataPath = fileDataName + } + + outYaml, err := yaml.Marshal(config) + if err != nil { + return err + } + if err := os.WriteFile(path, outYaml, 0o644); err != nil { + zap.L().Warn("write"+path+"file error", zap.Error(err)) + return err + } + + *config.LogPath = logPath + *config.NebulaClientSettings.Connection.Address = address + *config.NebulaClientSettings.Connection.User = user + *config.NebulaClientSettings.Connection.Password = password + for i, file := range config.Files { + *file.Path = paths[i] + *file.FailDataPath = failDataPaths[i] + } + return nil +} + +func Import(taskID string, conf *importconfig.YAMLConfig) (err error) { + runnerLogger := logger.NewRunnerLogger(*conf.LogPath) + if err := conf.ValidateAndReset("", runnerLogger); err != nil { + return err + } + + task, _ := GetTaskMgr().GetTask(taskID) + go func() { + result := ImportResult{} + now := time.Now() + task.GetRunner().Run(conf) + timeCost := time.Since(now).Milliseconds() + result.TaskId = taskID + result.TimeCost = fmt.Sprintf("%dms", timeCost) + if rerrs := task.GetRunner().Errors(); len(rerrs) != 0 { + allErrIsNotCompleteError := true + for _, rerr := range rerrs { + err := rerr.(importerErrors.ImporterError) + if err.ErrCode != importerErrors.NotCompleteError { + allErrIsNotCompleteError = false + break + } + } + if allErrIsNotCompleteError { + task.TaskInfo.TaskStatus = StatusFinished.String() + result.FailedRows = task.GetRunner().NumFailed + err1 := GetTaskMgr().FinishTask(taskID) + if err1 != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } + zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) + return + } + // TODO: return all errors + task.TaskInfo.TaskStatus = StatusAborted.String() + err, _ := rerrs[0].(importerErrors.ImporterError) + result.ErrorResult.ErrorCode = err.ErrCode + result.ErrorResult.ErrorMsg = err.ErrMsg.Error() + task.TaskInfo.TaskMessage = err.ErrMsg.Error() + err1 := GetTaskMgr().AbortTask(taskID) + if err1 != nil { + zap.L().Warn("finish task fail", zap.Error(err1)) + } + zap.L().Warn(fmt.Sprintf("Failed to finish a import task: `%s`, task result: `%v`", taskID, result)) + } else { + task.TaskInfo.TaskStatus = StatusFinished.String() + result.FailedRows = task.GetRunner().NumFailed + err := GetTaskMgr().FinishTask(taskID) + if err != nil { + zap.L().Warn("finish task fail", zap.Error(err)) + } + zap.L().Debug(fmt.Sprintf("Success to finish a import task: `%s`, task result: `%v`", taskID, result)) + } + }() + return nil +} + +func ImportStatus(taskID string) (*TaskInfo, error) { + if t, ok := GetTaskMgr().GetTask(taskID); ok { + if t.GetRunner() != nil { + err := GetTaskMgr().UpdateTaskInfo(taskID) + if err != nil { + return nil, err + } + } + return t.TaskInfo, nil + } + return nil, errors.New("task is not exist") +} + +func DeleteImportTask(tasksDir, taskID, address, username string) error { + if id, err := strconv.Atoi(taskID); err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return errors.New("task not existed") + } else { + _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) + if err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return errors.New("task not existed") + } + } + err := GetTaskMgr().DelTask(tasksDir, taskID) + if err != nil { + return fmt.Errorf("task del fail, %s", err.Error()) + } + return nil +} + +func GetImportTask(tasksDir, taskID, address, username string) (*types.GetImportTaskData, error) { + task := Task{} + result := &types.GetImportTaskData{} + + if id, err := strconv.Atoi(taskID); err != nil { + zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + return nil, errors.New("task not existed") + } else { + _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) + if err != nil { + zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + return nil, errors.New("task not existed") + } + } + + err := GetTaskMgr().UpdateTaskInfo(taskID) + if err != nil { + zap.L().Warn(fmt.Sprintf("UpdateTaskInfo fail, id : %s", taskID), zap.Error(err)) + } + if t, ok := GetTaskMgr().GetTask(taskID); ok { + task = *t + result.Id = fmt.Sprintf("%d", task.TaskInfo.ID) + result.Status = task.TaskInfo.TaskStatus + result.CreateTime = task.TaskInfo.CreatedTime + result.UpdateTime = task.TaskInfo.UpdatedTime + result.Address = task.TaskInfo.NebulaAddress + result.User = task.TaskInfo.User + result.Name = task.TaskInfo.Name + result.Space = task.TaskInfo.Space + result.Stats = types.ImportTaskStats(task.TaskInfo.Stats) + } + + return result, nil +} + +func GetManyImportTask(tasksDir, address, username string, page, pageSize int) (*types.GetManyImportTaskData, error) { + result := &types.GetManyImportTaskData{ + Total: 0, + List: []types.GetImportTaskData{}, + } + + taskIDs, err := GetTaskMgr().GetAllTaskIDs(address, username) + if err != nil { + return nil, err + } + + start := (page - 1) * pageSize + stop := page * pageSize + if len(taskIDs) <= start { + return nil, errors.New("invalid parameter") + } else { + if stop >= len(taskIDs) { + stop = len(taskIDs) + } + result.Total = int64(stop - start) + + for i := start; i < stop; i++ { + data, _ := GetImportTask(tasksDir, taskIDs[i], address, username) + result.List = append(result.List, *data) + } + } + + return result, nil +} + +func StopImportTask(taskID, address, username string) error { + if id, err := strconv.Atoi(taskID); err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return errors.New("task not existed") + } else { + _, err := taskmgr.db.FindTaskInfoByIdAndAddresssAndUser(id, address, username) + if err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return errors.New("task not existed") + } + } + + err := GetTaskMgr().StopTask(taskID) + if err != nil { + zap.L().Warn(fmt.Sprintf("stop task fail, id : %s", taskID), zap.Error(err)) + return err + } else { + return nil + } +} diff --git a/server-v2/api/studio/internal/service/importer/task.go b/server-v2/api/studio/internal/service/importer/task.go new file mode 100644 index 00000000..58b9c1ee --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/task.go @@ -0,0 +1,21 @@ +package importer + +import ( + "github.com/vesoft-inc/nebula-importer/pkg/cmd" + "github.com/zeromicro/go-zero/core/logx" +) + +type Task struct { + runner *cmd.Runner `json:"runner,omitempty"` + TaskInfo *TaskInfo `json:"task_info,omitempty"` +} + +func (t *Task) UpdateQueryStats() error { + stats, err := t.runner.QueryStats() + if err != nil { + logx.Infof("query import stats fail: %s", err) + return err + } + t.TaskInfo.Stats = *stats + return nil +} diff --git a/server-v2/api/studio/internal/service/importer/taskInfo.go b/server-v2/api/studio/internal/service/importer/taskInfo.go new file mode 100644 index 00000000..dd393131 --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/taskInfo.go @@ -0,0 +1,16 @@ +package importer + +import "github.com/vesoft-inc/nebula-importer/pkg/stats" + +type TaskInfo struct { + ID int `json:"taskID" gorm:"primaryKey;autoIncrement"` + Name string `json:"name"` + Space string `json:"space"` + NebulaAddress string `json:"nebulaAddress"` + CreatedTime int64 `json:"createdTime"` + UpdatedTime int64 `json:"updatedTime"` + User string `json:"user"` + TaskStatus string `json:"taskStatus"` + TaskMessage string `json:"taskMessage"` + Stats stats.Stats `json:"stats" gorm:"embedded"` +} diff --git a/server-v2/api/studio/internal/service/importer/taskdb.go b/server-v2/api/studio/internal/service/importer/taskdb.go new file mode 100644 index 00000000..6619b2f8 --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/taskdb.go @@ -0,0 +1,87 @@ +package importer + +import ( + "github.com/zeromicro/go-zero/core/logx" + "gorm.io/driver/sqlite" + "gorm.io/gorm" + "gorm.io/gorm/logger" +) + +type TaskDb struct { + *gorm.DB +} + +func InitDB(sqlitedbFilePath string) { + dbFilePath := sqlitedbFilePath + db, err := gorm.Open(sqlite.Open(dbFilePath), &gorm.Config{ + Logger: logger.Default.LogMode(logger.Info), + }) + if err != nil { + logx.Errorf("init db fail: %s", err) + } + + err = db.AutoMigrate(&TaskInfo{}) + if err != nil { + logx.Errorf("init taskInfo table fail: %s", err) + panic(err) + } + GetTaskMgr().db = &TaskDb{ + DB: db, + } + if err := GetTaskMgr().db.UpdateProcessingTasks2Aborted(); err != nil { + logx.Errorf("update processing tasks to aborted failed: %s", err) + panic(err) + } +} + +// FindTaskInfoByIdAndAddresssAndUser used to check whether the task belongs to the user +func (t *TaskDb) FindTaskInfoByIdAndAddresssAndUser(id int, nebulaAddress, user string) (*TaskInfo, error) { + taskInfo := new(TaskInfo) + if err := t.Model(&TaskInfo{}).Where("id = ? AND nebula_address = ? And user = ?", id, nebulaAddress, + user).First(&taskInfo).Error; err != nil { + return nil, err + } + return taskInfo, nil +} + +func (t *TaskDb) InsertTaskInfo(info *TaskInfo) error { + return t.Create(info).Error +} + +func (t *TaskDb) UpdateTaskInfo(info *TaskInfo) error { + return t.Model(&TaskInfo{}).Where("id = ?", info.ID).Updates(info).Error +} + +func (t *TaskDb) DelTaskInfo(ID int) error { + return t.Delete(&TaskInfo{}, ID).Error +} + +func (t *TaskDb) LastId() (int, error) { + var id int + if err := t.Raw("SELECT MAX(id) FROM task_infos").Scan(&id).Error; err != nil { + if err.Error() == "sql: Scan error on column index 0, name \"MAX(id)\": converting NULL to int is unsupported" { + return 0, nil + } + return 0, err + } + return id, nil +} + +func (t *TaskDb) SelectAllIds(nebulaAddress, user string) ([]int, error) { + var taskInfos []TaskInfo + ids := make([]int, 0) + if err := t.Select("id").Where("nebula_address = ? And user = ?", nebulaAddress, user).Find(&taskInfos).Error; err != nil { + return nil, err + } + for _, taskInfo := range taskInfos { + ids = append(ids, taskInfo.ID) + } + return ids, nil +} + +func (t *TaskDb) UpdateProcessingTasks2Aborted() error { + if err := t.Model(&TaskInfo{}).Where("task_status = ?", StatusProcessing.String()).Update("task_status", StatusAborted.String()).Error; err != nil { + return err + } + return nil +} diff --git a/server-v2/api/studio/internal/service/importer/taskmgr.go b/server-v2/api/studio/internal/service/importer/taskmgr.go new file mode 100644 index 00000000..38f66533 --- /dev/null +++ b/server-v2/api/studio/internal/service/importer/taskmgr.go @@ -0,0 +1,269 @@ +package importer + +import ( + "errors" + "fmt" + "os" + "path/filepath" + "strconv" + "sync" + "time" + + "github.com/vesoft-inc/nebula-importer/pkg/cmd" + "github.com/zeromicro/go-zero/core/logx" + + _ "github.com/mattn/go-sqlite3" +) + +var ( + taskmgr *TaskMgr = &TaskMgr{ + tasks: sync.Map{}, + db: &TaskDb{}, + } + + mux sync.Mutex +) + +type TaskMgr struct { + tasks sync.Map + db *TaskDb +} + +func newTask(nebulaAddress string, user string, name string, space string) *Task { + timeUnix := time.Now().Unix() + return &Task{ + runner: &cmd.Runner{}, + TaskInfo: &TaskInfo{ + Name: name, + Space: space, + CreatedTime: timeUnix, + UpdatedTime: timeUnix, + TaskStatus: StatusProcessing.String(), + NebulaAddress: nebulaAddress, + User: user, + }, + } +} + +func (task *Task) GetRunner() *cmd.Runner { + return task.runner +} + +func (mgr *TaskMgr) NewTaskID() (string, error) { + tid, err := mgr.db.LastId() + if err != nil { + return "", err + } + taskID := fmt.Sprintf("%v", tid+1) + return taskID, nil +} + +func (mgr *TaskMgr) NewTask(nebulaAddress string, user string, name string, space string) (*Task, string, error) { + mux.Lock() + defer mux.Unlock() + task := newTask(nebulaAddress, user, name, space) + if err := mgr.db.InsertTaskInfo(task.TaskInfo); err != nil { + return nil, "", err + } + tid, err := mgr.db.LastId() + if err != nil { + return nil, "", err + } + task.TaskInfo.ID = tid + taskID := fmt.Sprintf("%v", tid) + mgr.PutTask(taskID, task) + return task, taskID, nil +} + +func GetTaskMgr() *TaskMgr { + return taskmgr +} + +/* + GetTask get task from map and local sql +*/ +func (mgr *TaskMgr) GetTask(taskID string) (*Task, bool) { + if task, ok := mgr.getTaskFromMap(taskID); ok { + return task, true + } + task := mgr.getTaskFromSQL(taskID) + // did not find task + if task.TaskInfo.ID == 0 { + return nil, false + } + return task, true +} + +/* + PutTask put task into tasks map +*/ +func (mgr *TaskMgr) PutTask(taskID string, task *Task) { + mgr.tasks.Store(taskID, task) +} + +/* + FinishTask will query task stats, delete task in the map + and update the taskInfo in local sql +*/ +func (mgr *TaskMgr) FinishTask(taskID string) (err error) { + task, ok := mgr.getTaskFromMap(taskID) + if !ok { + return + } + if err := task.UpdateQueryStats(); err != nil { + return err + } + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + err = mgr.db.UpdateTaskInfo(task.TaskInfo) + if err != nil { + return err + } + mgr.tasks.Delete(taskID) + return +} + +func (mgr *TaskMgr) AbortTask(taskID string) (err error) { + task, ok := mgr.getTaskFromMap(taskID) + if !ok { + return + } + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + err = mgr.db.UpdateTaskInfo(task.TaskInfo) + if err != nil { + return err + } + mgr.tasks.Delete(taskID) + return +} + +func (mgr *TaskMgr) DelTask(tasksDir, taskID string) error { + _, ok := mgr.getTaskFromMap(taskID) + if ok { + mgr.tasks.Delete(taskID) + } + id, err := strconv.Atoi(taskID) + if err != nil { + return errors.New("taskID is wrong") + } + if err = mgr.db.DelTaskInfo(id); err != nil { + return err + } + taskDir := filepath.Join(tasksDir, taskID) + return os.RemoveAll(taskDir) +} + +/* + UpdateTaskInfo will query task stats, update task in the map + and update the taskInfo in local sql +*/ +func (mgr *TaskMgr) UpdateTaskInfo(taskID string) error { + task, ok := mgr.getTaskFromMap(taskID) + if !ok { + return nil + } + if err := task.UpdateQueryStats(); err != nil { + return err + } + timeUnix := time.Now().Unix() + task.TaskInfo.UpdatedTime = timeUnix + return mgr.db.UpdateTaskInfo(task.TaskInfo) +} + +/* + StopTask will change the task status to `StatusStoped`, + and then call FinishTask +*/ +func (mgr *TaskMgr) StopTask(taskID string) error { + if task, ok := mgr.getTaskFromMap(taskID); ok { + if task.GetRunner().Readers == nil { + return errors.New("task is not initialized") + } + for _, r := range task.GetRunner().Readers { + r.Stop() + } + task.TaskInfo.TaskStatus = StatusStoped.String() + if err := mgr.FinishTask(taskID); err != nil { + logx.Alert(fmt.Sprintf("finish task fail: %s", err)) + return err + } + return nil + } + return errors.New("task is finished or not exist") +} + +/* + `GetAllTaskIDs` will return all task ids in map +*/ +func (mgr *TaskMgr) GetAllTaskIDs(nebulaAddress, username string) ([]string, error) { + ids := make([]string, 0) + allIds, err := mgr.db.SelectAllIds(nebulaAddress, username) + if err != nil { + return nil, err + } + for _, id := range allIds { + ids = append(ids, strconv.Itoa(id)) + } + return ids, nil +} + +func (mgr *TaskMgr) getTaskFromMap(taskID string) (*Task, bool) { + if task, ok := mgr.tasks.Load(taskID); ok { + return task.(*Task), true + } + return nil, false +} + +func (mgr *TaskMgr) getTaskFromSQL(taskID string) *Task { + taskInfo := new(TaskInfo) + mgr.db.First(taskInfo, taskID) + task := new(Task) + task.TaskInfo = taskInfo + return task +} + +type TaskStatus int + +/* + the task in memory (map) has 2 status: processing, aborted; + and the task in local sql has 2 status: finished, stoped; +*/ +const ( + StatusUnknown TaskStatus = iota + StatusFinished + StatusStoped + StatusProcessing + StatusNotExisted + StatusAborted +) + +var taskStatusMap = map[TaskStatus]string{ + StatusFinished: "statusFinished", + StatusStoped: "statusStoped", + StatusProcessing: "statusProcessing", + StatusNotExisted: "statusNotExisted", + StatusAborted: "statusAborted", +} + +var taskStatusRevMap = map[string]TaskStatus{ + "statusFinished": StatusFinished, + "statusStoped": StatusStoped, + "statusProcessing": StatusProcessing, + "statusNotExisted": StatusNotExisted, + "statusAborted": StatusAborted, +} + +func NewTaskStatus(status string) TaskStatus { + if v, ok := taskStatusRevMap[status]; ok { + return v + } + return StatusUnknown +} + +func (status TaskStatus) String() string { + if v, ok := taskStatusMap[status]; ok { + return v + } + return "statusUnknown" +} diff --git a/server-v2/api/studio/internal/svc/servicecontext.go b/server-v2/api/studio/internal/svc/servicecontext.go index 6564ff82..bfaad934 100644 --- a/server-v2/api/studio/internal/svc/servicecontext.go +++ b/server-v2/api/studio/internal/svc/servicecontext.go @@ -3,13 +3,14 @@ package svc import ( "database/sql" "errors" + "net/http" "github.com/vesoft-inc/go-pkg/httpclient" "github.com/vesoft-inc/go-pkg/response" "github.com/vesoft-inc/go-pkg/validator" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/common" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/config" "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/ecode" - "github.com/zeromicro/go-zero/core/logx" ) @@ -49,5 +50,11 @@ func createResponseHandler(c config.Config) response.Handler { // nolint:gocriti }, Errorf: logx.Errorf, DebugInfo: c.Debug.Enable, + CheckBodyType: func(r *http.Request) response.StandardHandlerBodyType { + if common.IgnoreHandlerBody(r) { + return response.StandardHandlerBodyNone + } + return response.StandardHandlerBodyJson + }, }) } diff --git a/server-v2/api/studio/internal/types/types.go b/server-v2/api/studio/internal/types/types.go index ee697dba..3ed1ca25 100644 --- a/server-v2/api/studio/internal/types/types.go +++ b/server-v2/api/studio/internal/types/types.go @@ -51,3 +51,201 @@ type FileStat struct { type FilesIndexData struct { List []FileStat `json:"list"` } + +type ImportTaskConnection struct { + User string `json:"user" validate:"required"` + Password string `json:"password" validate:"required"` + Address string `json:"address" validate:"required"` +} + +type ImportTaskClientSettings struct { + Retry int `json:"retry,optional"` + Concurrency int `json:"concurrency,optional"` + ChannelBufferSize int `json:"channelBufferSize,optional"` + Space string `json:"space" validate:"required"` + Connection ImportTaskConnection `json:"connection" validate:"required"` + PostStart ImportTaskPostStart `json:"postStart,optional"` + PreStop ImportTaskPreStop `json:"preStop,optional"` +} + +type ImportTaskPostStart struct { + Commands string `json:"commands" validate:"required"` + AfterPeriod string `json:"afterPeriod" validate:"required"` +} + +type ImportTaskPreStop struct { + Commands string `json:"commands,optional"` +} + +type ImportTaskCSV struct { + WithHeader bool `json:"withHeader,optional"` + WithLabel bool `json:"withLabel,optional"` + Delimiter string `json:"delimiter,optional" default:","` +} + +type ImportTaskVID struct { + Index int64 `json:"index" validate:"required"` + Type string `json:"type" validate:"required"` + Function string `json:"function,optional"` + Prefix string `json:"prefix,optional"` +} + +type ImportTaskTagProp struct { + Name string `json:"name" validate:"required"` + Type string `json:"type" validate:"required"` + Index int64 `json:"index" validate:"required"` +} + +type ImportTaskTag struct { + Name string `json:"name" validate:"required"` + Props []ImportTaskTagProp `json:"props" validate:"required"` +} + +type ImportTaskVertex struct { + VID ImportTaskVID `json:"vid" validate:"required"` + Tags []ImportTaskTag `json:"tags" validate:"required"` +} + +type ImportTaskEdgeID struct { + Index int64 `json:"index" validate:"required"` + Function string `json:"function,optional"` + Type string `json:"type" validate:"required"` + Prefix string `json:"prefix,optional"` +} + +type ImportTaskEdgeRank struct { + Index int64 `json:"index"` +} + +type ImportTaskEdgeProp struct { + Name string `json:"name"` + Type string `json:"type"` + Index int64 `json:"index"` +} + +type ImportTaskEdge struct { + Name string `json:"name" validate:"required"` + SrcVID ImportTaskEdgeID `json:"srcVID" validate:"required"` + DstVID ImportTaskEdgeID `json:"dstVID" validate:"required"` + Rank ImportTaskEdgeRank `json:"rank, optional"` + Props []ImportTaskEdgeProp `json:"props" validate:"required"` +} + +type ImportTaskSchema struct { + Type string `json:"type" validate:"required"` + Edge ImportTaskEdge `json:"edge,optional"` + Vertex ImportTaskVertex `json:"vertex,optional"` +} + +type ImportTaskFile struct { + Path string `json:"path" validate:"required"` + FailDataPath string `json:"failDataPath" validate:"required"` + BatchSize int `json:"batchSize,optional"` + Limit int `json:"limit, optional"` + InOrder bool `json:"inOrder, optional"` + Type string `json:"type" validate:"required"` + CSV ImportTaskCSV `json:"csv" validate:"required"` + Schema ImportTaskSchema `json:"schema" validate:"required"` +} + +type ImportTaskConfig struct { + Version string `json:"version" validate:"required"` + Description string `json:"description,optional"` + RemoveTempFiles bool `json:"removeTempFiles,optional"` + ClientSettings ImportTaskClientSettings `json:"clientSettings" validate:"required"` + Files []ImportTaskFile `json:"files" validate:"required"` +} + +type CreateImportTaskRequest struct { + Name string `json:"name" validate:"required"` + Config ImportTaskConfig `json:"config" validate:"required"` +} + +type CreateImportTaskData struct { + Id string `json:"id"` +} + +type GetImportTaskRequest struct { + Id string `path:"id" validate:"required"` + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` +} + +type GetImportTaskData struct { + Id string `json:"id"` + Name string `json:"name"` + User string `json:"user"` + Address string `json:"address"` + Space string `json:"space"` + Status string `json:"status"` + CreateTime int64 `json:"createTime"` + UpdateTime int64 `json:"updateTime"` + Stats ImportTaskStats `json:"stats"` +} + +type ImportTaskStats struct { + NumFailed int64 `json:"numFailed"` + NumReadFailed int64 `json:"numReadFailed"` + TotalCount int64 `json:"totalCount"` + TotalBatches int64 `json:"totalBatches"` + TotalLatency int64 `json:"totalLatency"` + TotalReqTime int64 `json:"totalReqTime"` + TotalBytes int64 `json:"totalBytes"` + TotalImportedBytes int64 `json:"totalImportedBytes"` +} + +type GetManyImportTaskRequest struct { + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` + Page int `form:"page,default=1"` + PageSize int `form:"pageSize,default=100"` +} + +type GetManyImportTaskData struct { + Total int64 `json:"total"` + List []GetImportTaskData `json:"list"` +} + +type GetManyImportTaskLogRequest struct { + Id string `path:"id" validate:"required"` + File string `form:"file" validate:"required"` + Offset int64 `form:"offset" validate:"min=0"` + Limit int64 `form:"limit" validate:"min=1"` +} + +type GetManyImportTaskLogData struct { + Logs []string `json:"logs"` +} + +type GetImportTaskLogNamesRequest struct { + Id string `path:"id" validate:"required""` +} + +type GetImportTaskLogNamesData struct { + Names []string `json:"names"` +} + +type DeleteImportTaskRequest struct { + Id string `path:"id"` + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` +} + +type StopImportTaskRequest struct { + Id string `path:"id"` + Address string `form:"address"` + Port string `form:"port"` + Username string `form:"username"` +} + +type DownloadLogsRequest struct { + Id string `path:"id" validate:"required"` + Name string `form:"name" validate:"required"` +} + +type DownloadConfigsRequest struct { + Id string `path:"id" validate:"required"` +} diff --git a/server-v2/api/studio/pkg/auth/authorize.go b/server-v2/api/studio/pkg/auth/authorize.go index 6cc6bced..62d4710c 100644 --- a/server-v2/api/studio/pkg/auth/authorize.go +++ b/server-v2/api/studio/pkg/auth/authorize.go @@ -64,7 +64,6 @@ func Decode(tokenString, secret string) (*AuthData, error) { } return []byte(secret), nil }) - if err != nil { if ve, ok := err.(*jwt.ValidationError); ok { if ve.Errors&jwt.ValidationErrorMalformed != 0 { diff --git a/server-v2/api/studio/pkg/config/config.go b/server-v2/api/studio/pkg/config/config.go deleted file mode 100644 index 4c3a99fc..00000000 --- a/server-v2/api/studio/pkg/config/config.go +++ /dev/null @@ -1,115 +0,0 @@ -package config - -import ( - "io/ioutil" - "os" - "path/filepath" - - "go.uber.org/zap" - "gopkg.in/yaml.v2" -) - -var Cfg = new(Config) - -type ( - Config struct { - Web Web `yaml:"web"` - } - - Web struct { - TaskIdPath string `yaml:"task_id_path"` - UploadDir string `yaml:"upload_dir"` - TasksDir string `yaml:"tasks_dir"` - SqlitedbFilePath string `yaml:"sqlitedb_file_path"` - Address string `yaml:"address"` - Port int `yaml:"port"` - } -) - -const ( - DefaultFilesDataDir = "data" - DefaultTaskIdPath = "data/taskId.data" - DefaultUploadDir = "data/upload" - DefaultTasksDir = "data/tasks" - DefaultSqlitedbFilePath = "data/tasks.db" - DefaultAddress = "0.0.0.0" - DefaultPort = 9000 -) - -func (c *Config) Validate() error { - return nil -} - -func (w *Web) Validate() error { - return nil -} - -func (c *Config) Complete() { - c.Web.Complete() -} - -func (w *Web) Complete() { - if w.TaskIdPath == "" { - _, err := os.Stat(DefaultFilesDataDir) - if os.IsNotExist(err) { - os.MkdirAll(DefaultFilesDataDir, 0o766) - } - abs, _ := filepath.Abs(DefaultTaskIdPath) - _, err = ioutil.ReadFile(abs) - if err != nil { - if os.IsNotExist(err) { - _, err := os.Create(abs) - if err != nil { - zap.L().Fatal("DefaultTaskIdPath Init fail", zap.Error(err)) - } - } else { - zap.L().Fatal("DefaultTaskIdPath Init fail", zap.Error(err)) - } - } - w.TaskIdPath = abs - } - if w.UploadDir == "" { - abs, _ := filepath.Abs(DefaultUploadDir) - w.UploadDir = abs - _, err := os.Stat(abs) - if os.IsNotExist(err) { - os.MkdirAll(abs, 0o766) - } - } - if w.TasksDir == "" { - abs, _ := filepath.Abs(DefaultTasksDir) - w.TasksDir = abs - _, err := os.Stat(abs) - if os.IsNotExist(err) { - os.MkdirAll(abs, 0o766) - } - } - if w.SqlitedbFilePath == "" { - _, err := os.Stat(DefaultFilesDataDir) - if os.IsNotExist(err) { - os.MkdirAll(DefaultFilesDataDir, 0o766) - } - abs, _ := filepath.Abs(DefaultSqlitedbFilePath) - w.SqlitedbFilePath = abs - } - if w.Address == "" { - w.Address = DefaultAddress - } - if w.Port == 0 { - w.Port = DefaultPort - } -} - -func InitConfig(path string) error { - yamlFile, err := ioutil.ReadFile(path) - if err != nil { - return err - } - if err := yaml.Unmarshal(yamlFile, Cfg); err != nil { - return err - } - - Cfg.Complete() - - return Cfg.Validate() -} diff --git a/server-v2/api/studio/restapi/file.api b/server-v2/api/studio/restapi/file.api index f4ae3b71..f0f1778c 100644 --- a/server-v2/api/studio/restapi/file.api +++ b/server-v2/api/studio/restapi/file.api @@ -25,11 +25,11 @@ type ( service studio-api { @doc "Upload File" @handler FileUpload - post /api/file + post /api/files @doc "delete file" @handler FileDestroy - delete /api/file/:name returns(FileDestroyRequest) + delete /api/files/:name(FileDestroyRequest) @doc "preview file" @handler FilesIndex - get /api/file returns(FilesIndexData) + get /api/files returns(FilesIndexData) } \ No newline at end of file diff --git a/server-v2/api/studio/restapi/import.api b/server-v2/api/studio/restapi/import.api new file mode 100644 index 00000000..2013b8fc --- /dev/null +++ b/server-v2/api/studio/restapi/import.api @@ -0,0 +1,235 @@ +syntax = "v1" + +type ( + ImportTaskConnection { + User string `json:"user" validate:"required"` + Password string `json:"password" validate:"required"` + Address string `json:"address" validate:"required"` + } + + ImportTaskClientSettings { + Retry int `json:"retry,optional"` + Concurrency int `json:"concurrency,optional"` + ChannelBufferSize int `json:"channelBufferSize,optional"` + Space string `json:"space" validate:"required"` + Connection ImportTaskConnection `json:"connection" validate:"required"` + PostStart ImportTaskPostStart `json:"postStart,optional"` + PreStop ImportTaskPreStop `json:"preStop,optional"` + } + + ImportTaskPostStart { + Commands string `json:"commands" validate:"required"` + AfterPeriod string `json:"afterPeriod" validate:"required"` + } + + ImportTaskPreStop { + Commands string `json:"commands,optional"` + } + + ImportTaskCSV { + WithHeader bool `json:"withHeader,optional"` + WithLabel bool `json:"withLabel,optional"` + Delimiter string `json:"delimiter,optional" default:","` + } + + ImportTaskVID { + Index int64 `json:"index" validate:"required"` + Type string `json:"type" validate:"required"` + Function string `json:"function,optional"` + Prefix string `json:"prefix,optional"` + } + + ImportTaskTagProp { + Name string `json:"name" validate:"required"` + Type string `json:"type" validate:"required"` + Index int64 `json:"index" validate:"required"` + } + + ImportTaskTag { + Name string `json:"name" validate:"required"` + Props []ImportTaskTagProp `json:"props" validate:"required"` + } + + ImportTaskVertex { + VID ImportTaskVID `json:"vid" validate:"required"` + Tags []ImportTaskTag `json:"tags" validate:"required"` + } + + ImportTaskEdgeID { + Index int64 `json:"index" validate:"required"` + Function string `json:"function,optional"` + Type string `json:"type" validate:"required"` + Prefix string `json:"prefix,optional"` + } + + ImportTaskEdgeRank { + Index int64 `json:"index"` + } + + ImportTaskEdgeProp { + Name string `json:"name"` + Type string `json:"type"` + Index int64 `json:"index"` + } + + ImportTaskEdge { + Name string `json:"name" validate:"required"` + SrcVID ImportTaskEdgeID `json:"srcVID" validate:"required"` + DstVID ImportTaskEdgeID `json:"dstVID" validate:"required"` + Rank ImportTaskEdgeRank `json:"rank, optional"` + Props []ImportTaskEdgeProp `json:"props" validate:"required"` + } + + ImportTaskSchema { + Type string `json:"type" validate:"required"` + Edge ImportTaskEdge `json:"edge,optional"` + Vertex ImportTaskVertex `json:"vertex,optional"` + } + + ImportTaskFile { + Path string `json:"path" validate:"required"` + FailDataPath string `json:"failDataPath" validate:"required"` + BatchSize int `json:"batchSize,optional"` + Limit int `json:"limit, optional"` + InOrder bool `json:"inOrder, optional"` + Type string `json:"type" validate:"required"` + CSV ImportTaskCSV `json:"csv" validate:"required"` + Schema ImportTaskSchema `json:"schema" validate:"required"` + } + + ImportTaskConfig { + Version string `json:"version" validate:"required"` + Description string `json:"description,optional"` + RemoveTempFiles bool `json:"removeTempFiles,optional"` + ClientSettings ImportTaskClientSettings `json:"clientSettings" validate:"required"` + Files []ImportTaskFile `json:"files" validate:"required"` + } + + CreateImportTaskRequest { + Name string `json:"name" validate:"required"` + Config ImportTaskConfig `json:"config" validate:"required"` + } + + CreateImportTaskData { + Id string `json:"id"` + } + + GetImportTaskRequest { + Id string `path:"id" validate:"required"` + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` + } + + GetImportTaskData { + Id string `json:"id"` + Name string `json:"name"` + User string `json:"user"` + Address string `json:"address"` + Space string `json:"space"` + Status string `json:"status"` + CreateTime int64 `json:"createTime"` + UpdateTime int64 `json:"updateTime"` + Stats ImportTaskStats `json:"stats"` + } + + ImportTaskStats { + NumFailed int64 `json:"numFailed"` + NumReadFailed int64 `json:"numReadFailed"` + TotalCount int64 `json:"totalCount"` + TotalBatches int64 `json:"totalBatches"` + TotalLatency int64 `json:"totalLatency"` + TotalReqTime int64 `json:"totalReqTime"` + TotalBytes int64 `json:"totalBytes"` + TotalImportedBytes int64 `json:"totalImportedBytes"` + } + + GetManyImportTaskRequest { + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` + Page int `form:"page,default=1"` + PageSize int `form:"pageSize,default=100"` + } + + GetManyImportTaskData { + Total int64 `json:"total"` + List []GetImportTaskData `json:"data"` + } + + GetManyImportTaskLogRequest { + Id string `path:"id" validate:"required"` + File string `form:"file" validate:"required"` + Offset int64 `form:"offset" validate:"min=0"` + Limit int64 `form:"limit" validate:"min=1"` + } + + GetManyImportTaskLogData { + Logs []string `json:"data"` + } + + GetImportTaskLogNamesRequest { + Id string `path:"id" validate:"required""` + } + + GetImportTaskLogNamesData { + Names []string `json:"data"` + } + + DeleteImportTaskRequest { + Id string `path:"id"` + Address string `form:"address"` + Username string `form:"username"` + Port string `form:"port"` + } + + StopImportTaskRequest { + Id string `path:"id"` + Address string `form:"address"` + Port string `form:"port"` + Username string `form:"username"` + } + + DownloadLogsRequest { + Id string `path:"id" validate:"required"` + Name string `form:"name" validate:"required"` + } + + DownloadConfigsRequest { + Id string `path:"id" validate:"required"` + } +) + +@server( + group: importtask +) + +service studio-api { + @doc "Create Import Task" + @handler CreateImportTask + post /api/import-tasks(CreateImportTaskRequest) returns(CreateImportTaskData) + @doc "Get Import Task" + @handler GetImportTask + get /api/import-tasks/:id(GetImportTaskRequest) returns(GetImportTaskData) + @doc "Get Many Import Task" + @handler GetManyImportTask + get /api/import-tasks(GetManyImportTaskRequest) returns(GetManyImportTaskData) + @doc "Get Many Import Task Log" + @handler GetManyImportTaskLog + get /api/import-tasks/:id/logs(GetManyImportTaskLogRequest) returns(GetManyImportTaskLogData) + @doc "Get all logs file name of a Task" + @handler GetImportTaskLogNames + get /api/import-tasks/:id/task-log-names(GetImportTaskLogNamesRequest) returns(GetImportTaskLogNamesData) + @doc "Delete Import Task" + @handler DeleteImportTask + delete /api/import-tasks/:id(DeleteImportTaskRequest) + @doc "Stop Import Task" + @handler StopImportTask + get /api/import-tasks/:id/stop(StopImportTaskRequest) + @doc "Download logs" + @handler DownloadLogs + get /api/import-tasks/:id/download-logs(DownloadLogsRequest) + @doc "Download Config" + @handler DownloadConfig + get /api/import-tasks/:id/download-config(DownloadConfigsRequest) +} \ No newline at end of file diff --git a/server-v2/api/studio/restapi/studio.api b/server-v2/api/studio/restapi/studio.api index 9c362460..0b6cf67a 100644 --- a/server-v2/api/studio/restapi/studio.api +++ b/server-v2/api/studio/restapi/studio.api @@ -9,4 +9,5 @@ import ( "health.api" "gateway.api" "file.api" + "import.api" ) \ No newline at end of file diff --git a/server-v2/api/studio/studio.go b/server-v2/api/studio/studio.go index 55a1258e..a6973419 100644 --- a/server-v2/api/studio/studio.go +++ b/server-v2/api/studio/studio.go @@ -7,10 +7,14 @@ import ( "net/http" "github.com/vesoft-inc/go-pkg/middleware" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/common" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/config" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/handler" + "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/service/importer" "github.com/vesoft-inc/nebula-studio/server/api/studio/internal/svc" "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/auth" + "github.com/vesoft-inc/nebula-studio/server/api/studio/pkg/logging" + "go.uber.org/zap" "github.com/zeromicro/go-zero/core/conf" "github.com/zeromicro/go-zero/rest" @@ -27,6 +31,18 @@ func main() { var c config.Config conf.MustLoad(*configFile, &c, conf.UseEnv()) + // init logger + loggingOptions := logging.NewOptions() + if err := loggingOptions.InitGlobals(); err != nil { + panic(err) + } + + if err := c.InitConfig(); err != nil { + zap.L().Fatal("init config failed", zap.Error(err)) + } + + importer.InitDB(c.File.SqliteDbFilePath) + svcCtx := svc.NewServiceContext(c) server := rest.MustNewServer(c.RestConf, rest.WithNotFoundHandler(middleware.NewAssetsHandler(middleware.AssetsConfig{ Root: "assets", @@ -38,6 +54,16 @@ func main() { // global middleware server.Use(auth.AuthMiddlewareWithCtx(svcCtx)) + server.Use(rest.ToMiddleware(middleware.ReserveRequest(middleware.ReserveRequestConfig{ + Skipper: func(r *http.Request) bool { + return !common.ReserveRequest(r) + }, + }))) + server.Use(rest.ToMiddleware(middleware.ReserveResponseWriter(middleware.ReserveResponseWriterConfig{ + Skipper: func(r *http.Request) bool { + return !common.ReserveResponse(r) + }, + }))) // api handlers handler.RegisterHandlers(server, svcCtx) diff --git a/server-v2/go.mod b/server-v2/go.mod index 1bafbe9b..7808475b 100644 --- a/server-v2/go.mod +++ b/server-v2/go.mod @@ -3,10 +3,20 @@ module github.com/vesoft-inc/nebula-studio/server go 1.17 require ( - github.com/vesoft-inc/go-pkg v0.0.0-20220511092334-a180a9379d8d + github.com/vesoft-inc/go-pkg v0.0.0-20220516090733-5ce93ad3254b + github.com/vesoft-inc/nebula-importer v1.0.1-0.20220505095506-93febd41c2be github.com/zeromicro/go-zero v1.3.3 ) +require github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a // indirect + +require ( + github.com/jinzhu/inflection v1.0.0 // indirect + github.com/jinzhu/now v1.1.5 // indirect + github.com/mattn/go-sqlite3 v1.14.12 + gorm.io/gorm v1.23.4 +) + require ( github.com/facebook/fbthrift v0.31.1-0.20211129061412-801ed7f9f295 // indirect github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b // indirect @@ -53,4 +63,5 @@ require ( google.golang.org/grpc v1.46.0 // indirect google.golang.org/protobuf v1.28.0 // indirect gopkg.in/yaml.v2 v2.4.0 + gorm.io/driver/sqlite v1.3.2 ) diff --git a/server-v2/go.sum b/server-v2/go.sum index f6c28bf6..c513edad 100644 --- a/server-v2/go.sum +++ b/server-v2/go.sum @@ -250,6 +250,11 @@ github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/U github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= +github.com/jinzhu/inflection v1.0.0 h1:K317FqzuhWc8YvSVlFMCCUb36O/S9MCKRDI7QkRKD/E= +github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= +github.com/jinzhu/now v1.1.4/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= +github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= +github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= @@ -289,6 +294,8 @@ github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Ky github.com/mattn/go-runewidth v0.0.9/go.mod h1:H031xJmbD/WCDINGzjvQ9THkh0rPKHF+m2gUSrubnMI= github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= +github.com/mattn/go-sqlite3 v1.14.12 h1:TJ1bhYJPV44phC+IMu1u2K/i5RriLTPe+yc68XDJ1Z0= +github.com/mattn/go-sqlite3 v1.14.12/go.mod h1:NyWgC/yNuGj7Q9rpYnZvas74GogHl5/Z4A/KQRfk6bU= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369 h1:I0XW9+e1XWDxdcEniV4rQAIOPUGDq67JSCiRCgGCZLI= github.com/matttproud/golang_protobuf_extensions v1.0.2-0.20181231171920-c182affec369/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -391,10 +398,14 @@ github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMT github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= -github.com/vesoft-inc/go-pkg v0.0.0-20220511092334-a180a9379d8d h1:Q/eVc0H8CuQNmviD43vAxtJKyB3aFd9R8Z7pOdxokoA= -github.com/vesoft-inc/go-pkg v0.0.0-20220511092334-a180a9379d8d/go.mod h1:HCAXRhF2io+nPLQnl+RQ6XyVcp1Xdv6NgslXRBBCiEU= +github.com/vesoft-inc/go-pkg v0.0.0-20220516090733-5ce93ad3254b h1:MVAkGU2YH1p3PhWN0T+r0bkv+gn5b33J2tEdnXXZAUE= +github.com/vesoft-inc/go-pkg v0.0.0-20220516090733-5ce93ad3254b/go.mod h1:HCAXRhF2io+nPLQnl+RQ6XyVcp1Xdv6NgslXRBBCiEU= +github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a h1:/8l9RT6gU0cUS1Cgzqv3A9dKto19VQBjVk1BqAAqqvM= +github.com/vesoft-inc/nebula-go/v3 v3.0.0-20220425030225-cdb52399b40a/go.mod h1:+sXv05jYQBARdTbTcIEsWVXCnF/6ttOlDK35xQ6m54s= github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220413113447-a3f4c56287d8 h1:iL92Uk6hAe4vUBK/L99wf5295HYOtnD4plctVA5xek0= github.com/vesoft-inc/nebula-http-gateway/ccore v0.0.0-20220413113447-a3f4c56287d8/go.mod h1:sFEvE+cY4TgwqWx6H6msOqAUzRhsEHHKaaMgIZENHuQ= +github.com/vesoft-inc/nebula-importer v1.0.1-0.20220505095506-93febd41c2be h1:7YGSREZ6uS1WjCdKTlHEvhSeYURJFY7UXQZ008+EB2Y= +github.com/vesoft-inc/nebula-importer v1.0.1-0.20220505095506-93febd41c2be/go.mod h1:8xAQi6KI2qe40Dop/GqDXmBEurt7qGp5Pjd1MESAVNA= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= github.com/xdg-go/scram v1.0.2/go.mod h1:1WAq6h33pAW+iRreB34OORO2Nf7qel3VV3fjBj+hCSs= github.com/xdg-go/stringprep v1.0.2/go.mod h1:8F9zXuvzgwmyT5DUm4GUfZGDdT3W+LCvS6+da4O5kxM= @@ -790,6 +801,10 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b h1:h8qDotaEPuJATrMmW04NCwg7v22aHH28wwpauUhK9Oo= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gorm.io/driver/sqlite v1.3.2 h1:nWTy4cE52K6nnMhv23wLmur9Y3qWbZvOBz+V4PrGAxg= +gorm.io/driver/sqlite v1.3.2/go.mod h1:B+8GyC9K7VgzJAcrcXMRPdnMcck+8FgJynEehEPM16U= +gorm.io/gorm v1.23.4 h1:1BKWM67O6CflSLcwGQR7ccfmC4ebOxQrTfOQGRE9wjg= +gorm.io/gorm v1.23.4/go.mod h1:l2lP/RyAtc1ynaTjFksBde/O8v9oOGIApu2/xRitmZk= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=