Skip to content

Commit

Permalink
fix(task): make /admin{export, backup} endpoints async
Browse files Browse the repository at this point in the history
This commit also removes the /admin/export and /admin/backup
endpoints for creating export and backup respectively.

This also removes the exportedFiles field (#7835) (#7836)
  • Loading branch information
ajeetdsouza authored and mangalaman93 committed Jan 20, 2023
1 parent cbec230 commit 45a5ec1
Show file tree
Hide file tree
Showing 9 changed files with 56 additions and 160 deletions.
42 changes: 0 additions & 42 deletions dgraph/cmd/alpha/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,6 @@ func getAdminMux() *http.ServeMux {
http.MethodPut: true,
http.MethodPost: true,
}, adminAuthHandler(http.HandlerFunc(drainingHandler))))
adminMux.Handle("/admin/export", allowedMethodsHandler(allowedMethods{http.MethodGet: true},
adminAuthHandler(http.HandlerFunc(exportHandler))))
adminMux.Handle("/admin/config/cache_mb", allowedMethodsHandler(allowedMethods{
http.MethodGet: true,
http.MethodPut: true,
Expand Down Expand Up @@ -159,46 +157,6 @@ func shutDownHandler(w http.ResponseWriter, r *http.Request) {
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Server is shutting down"}`)))
}

func exportHandler(w http.ResponseWriter, r *http.Request) {
if err := r.ParseForm(); err != nil {
x.SetHttpStatus(w, http.StatusBadRequest, "Parse of export request failed.")
return
}

format := worker.DefaultExportFormat
if vals, ok := r.Form["format"]; ok {
if len(vals) > 1 {
x.SetHttpStatus(w, http.StatusBadRequest,
"Only one export format may be specified.")
return
}
format = worker.NormalizeExportFormat(vals[0])
if format == "" {
x.SetHttpStatus(w, http.StatusBadRequest, "Invalid export format.")
return
}
}

gqlReq := &schema.Request{
Query: `
mutation export($format: String) {
export(input: {format: $format}) {
response {
code
}
}
}`,
Variables: map[string]interface{}{"format": format},
}

if resp := resolveWithAdminServer(gqlReq, r, adminServer); len(resp.Errors) != 0 {
x.SetStatus(w, resp.Errors[0].Message, "Export failed.")
return
}
w.Header().Set("Content-Type", "application/json")
x.Check2(w.Write([]byte(`{"code": "Success", "message": "Export completed."}`)))
}

func memoryLimitHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case http.MethodGet:
Expand Down
69 changes: 0 additions & 69 deletions dgraph/cmd/alpha/admin_backup.go

This file was deleted.

5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ require (
github.com/dgraph-io/dgo/v210 v210.0.0-20210407152819-261d1c2a6987
github.com/dgraph-io/gqlgen v0.13.2
github.com/dgraph-io/gqlparser/v2 v2.2.1
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed
github.com/dgraph-io/ristretto v0.1.1
github.com/dgraph-io/simdjson-go v0.3.0
github.com/dgrijalva/jwt-go v3.2.0+incompatible
Expand All @@ -40,6 +40,7 @@ require (
github.com/google/uuid v1.0.0
github.com/gorilla/websocket v1.4.2
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9
github.com/graph-gophers/graphql-transport-ws v0.0.2 // indirect
github.com/hashicorp/vault/api v1.0.4
github.com/minio/minio-go/v6 v6.0.55
github.com/mitchellh/panicwrap v1.0.0
Expand All @@ -53,7 +54,7 @@ require (
github.com/spf13/cobra v0.0.5
github.com/spf13/pflag v1.0.3
github.com/spf13/viper v1.7.1
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
github.com/twpayne/go-geom v1.0.5
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c
go.etcd.io/etcd v0.5.0-alpha.5.0.20190108173120-83c051b701d3
Expand Down
10 changes: 7 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -130,8 +130,8 @@ github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDws
github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
github.com/dgraph-io/gqlparser/v2 v2.2.1 h1:15msK9XEHOSrRqQO48UU+2ZTf1R1U8+tfL9H5D5/eQQ=
github.com/dgraph-io/gqlparser/v2 v2.2.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15 h1:X2NRsgAtVUAp2nmTPCq+x+wTcRRrj74CEpy7E0Unsl4=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210511143556-2cef522f1f15/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed h1:pgGMBoTtFhR+xkyzINaToLYRurHn+6pxMYffIGmmEPc=
github.com/dgraph-io/graphql-transport-ws v0.0.0-20210223074046-e5b8b80bb4ed/go.mod h1:7z3c/5w0sMYYZF5bHsrh8IH4fKwG5O5Y70cPH1ZLLRQ=
github.com/dgraph-io/ristretto v0.1.1 h1:6CWw5tJNgpegArSHpNHJKldNeq03FQCwYvfMVWajOK8=
github.com/dgraph-io/ristretto v0.1.1/go.mod h1:S1GPSBCYCIhmVNfcth17y2zZtQT6wzkzgwUve0VDWWA=
github.com/dgraph-io/simdjson-go v0.3.0 h1:h71LO7vR4LHMPUhuoGN8bqGm1VNfGOlAG8BI6iDUKw0=
Expand Down Expand Up @@ -281,10 +281,13 @@ github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2z
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9 h1:kLnsdud6Fl1/7ZX/5oD23cqYAzBfuZBhNkGr2NvuEsU=
github.com/graph-gophers/graphql-go v0.0.0-20200309224638-dae41bde9ef9/go.mod h1:9CQHMSxwO4MprSdzoIEobiHpoLtHm77vfxsvsIN5Vuc=
github.com/graph-gophers/graphql-transport-ws v0.0.2 h1:DbmSkbIGzj8SvHei6n8Mh9eLQin8PtA8xY9eCzjRpvo=
github.com/graph-gophers/graphql-transport-ws v0.0.2/go.mod h1:5BVKvFzOd2BalVIBFfnfmHjpJi/MZ5rOj8G55mXvZ8g=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.4.1/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
Expand Down Expand Up @@ -589,8 +592,9 @@ github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXf
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
Expand Down
2 changes: 1 addition & 1 deletion graphql/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,8 +259,8 @@ const (
}
type ExportPayload {
exportedFiles: [String]
response: Response
taskId: String
}
type DrainingPayload {
Expand Down
18 changes: 5 additions & 13 deletions graphql/admin/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package admin
import (
"context"
"encoding/json"
"fmt"
"math"

"github.com/golang/glog"
Expand Down Expand Up @@ -92,30 +93,21 @@ func resolveExport(ctx context.Context, m schema.Mutation) (*resolve.Resolved, b
SessionToken: input.SessionToken,
Anonymous: input.Anonymous,
}

files, err := worker.ExportOverNetwork(context.Background(), req)
taskId, err := worker.Tasks.Enqueue(req)
if err != nil {
return resolve.EmptyResult(m, err), false
}

data := response("Success", "Export completed.")
data["exportedFiles"] = toInterfaceSlice(files)
msg := fmt.Sprintf("Export queued with ID %#x", taskId)
data := response("Success", msg)
data["taskId"] = fmt.Sprintf("%#x", taskId)
return resolve.DataResult(
m,
map[string]interface{}{m.Name(): data},
nil,
), true
}

// toInterfaceSlice converts []string to []interface{}
func toInterfaceSlice(in []string) []interface{} {
out := make([]interface{}, 0, len(in))
for _, s := range in {
out = append(out, s)
}
return out
}

func getExportInput(m schema.Mutation) (*exportInput, error) {
inputArg := m.ArgValue(schema.InputArgName)
inputByts, err := json.Marshal(inputArg)
Expand Down
4 changes: 4 additions & 0 deletions graphql/e2e/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ func TestDeleteSchemaAndExport(t *testing.T) {
Query: `mutation {
export(input: {format: "rdf"}) {
response { code }
taskId
}
}`,
}
Expand All @@ -682,7 +683,10 @@ func TestDeleteSchemaAndExport(t *testing.T) {

var data interface{}
require.NoError(t, json.Unmarshal(exportGqlResp.Data, &data))

require.Equal(t, "Success", testutil.JsonGet(data, "export", "response", "code").(string))
taskId := testutil.JsonGet(data, "export", "taskId").(string)
testutil.WaitForTask(t, taskId, false)

// applying a new schema should still work
newSchemaResp := common.AssertUpdateGQLSchemaSuccess(t, groupOneHTTP, schema, nil)
Expand Down
63 changes: 33 additions & 30 deletions systest/export/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"testing"

minio "github.com/minio/minio-go/v6"
Expand Down Expand Up @@ -52,19 +53,17 @@ func TestExportSchemaToMinio(t *testing.T) {
mc.MakeBucket(bucketName, "")

setupDgraph(t, moviesData, movieSchema)
result := requestExport(t, minioDest, "rdf")
require.Equal(t, "Success", testutil.JsonGet(result, "data", "export", "response", "code").(string))
require.Equal(
t, "Export completed.", testutil.JsonGet(result, "data", "export", "response", "message").(string))

var files []string
for _, f := range testutil.JsonGet(result, "data", "export", "exportedFiles").([]interface{}) {
files = append(files, f.(string))
requestExport(t, minioDest, "rdf")

schemaFile := ""
doneCh := make(chan struct{})
defer close(doneCh)
for obj := range mc.ListObjectsV2(bucketName, "dgraph.", true, doneCh) {
if strings.Contains(obj.Key, ".schema.gz") {
schemaFile = obj.Key
}
}
require.Equal(t, 3, len(files))

schemaFile := files[1]
require.Contains(t, schemaFile, ".schema.gz")
require.NotEmpty(t, schemaFile)

object, err := mc.GetObject(bucketName, schemaFile, minio.GetObjectOptions{})
require.NoError(t, err)
Expand Down Expand Up @@ -112,14 +111,11 @@ func TestExportAndLoadJson(t *testing.T) {
setupDgraph(t, moviesData, movieSchema)

// Run export
const destination = "/data/export-data"
requestExport(t, destination, "json")

requestExport(t, "/data/export-data", "json")
copyToLocalFs(t)
files, err := ioutil.ReadDir(copyExportDir)
require.NoError(t, err)
require.Len(t, files, 1)
exportDir := filepath.Join(copyExportDir, files[0].Name())

q := `{ q(func:has(movie)) { count(uid) } }`

Expand All @@ -136,7 +132,12 @@ func TestExportAndLoadJson(t *testing.T) {
require.JSONEq(t, `{"data": {"q": [{"count":0}]}}`, res)

// Live load the exported data
loadData(t, exportDir, "json")
files, err = ioutil.ReadDir(copyExportDir)
require.NoError(t, err)
require.Len(t, files, 1)
exportName := files[0].Name()
dir := filepath.Join(copyExportDir, exportName)
loadData(t, dir, "json")

res = runQuery(t, q)
require.JSONEq(t, `{"data":{"q":[{"count": 5}]}}`, res)
Expand Down Expand Up @@ -170,12 +171,10 @@ func TestExportAndLoadJsonFacets(t *testing.T) {

// Run export
requestExport(t, "/data/export-data", "json")

copyToLocalFs(t)
files, err := ioutil.ReadDir(copyExportDir)
require.NoError(t, err)
require.Len(t, files, 1)
exportDir := filepath.Join(copyExportDir, files[0].Name())

checkRes := func() {
// Check value posting.
Expand Down Expand Up @@ -215,7 +214,11 @@ func TestExportAndLoadJsonFacets(t *testing.T) {
require.JSONEq(t, `{"data": {"q": []}}`, res)

// Live load the exported data and verify that exported data is loaded correctly.
dir := filepath.Join(exportDir)
files, err = ioutil.ReadDir(copyExportDir)
require.NoError(t, err)
require.Len(t, files, 1)
exportName := files[0].Name()
dir := filepath.Join(copyExportDir, exportName)
loadData(t, dir, "json")

// verify that the state after loading the exported data as same.
Expand Down Expand Up @@ -288,11 +291,13 @@ func setupDgraph(t *testing.T, nquads, schema string) {
require.NoError(t, err)
}

func requestExport(t *testing.T, dest string, format string) map[string]interface{} {
func requestExport(t *testing.T, dest string, format string) {
exportRequest := `mutation export($dst: String!, $f: String!) {
export(input: {destination: $dst, format: $f}) {
response { code message }
exportedFiles
response {
code
}
taskId
}
}`

Expand All @@ -310,11 +315,9 @@ func requestExport(t *testing.T, dest string, format string) map[string]interfac
resp, err := http.Post(adminUrl, "application/json", bytes.NewBuffer(b))
require.NoError(t, err)

buf, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)

var result map[string]interface{}
require.NoError(t, json.Unmarshal(buf, &result))

return result
var data interface{}
require.NoError(t, json.NewDecoder(resp.Body).Decode(&data))
require.Equal(t, "Success", testutil.JsonGet(data, "data", "export", "response", "code").(string))
taskId := testutil.JsonGet(data, "data", "export", "taskId").(string)
testutil.WaitForTask(t, taskId, false)
}
Loading

0 comments on commit 45a5ec1

Please sign in to comment.