From a8c253b6509c56bc6d58b22aae20addd3f55c54d Mon Sep 17 00:00:00 2001 From: Keenan Nemetz Date: Fri, 8 Sep 2023 09:46:11 -0700 Subject: [PATCH] refactor: HTTP client interface (#1776) ## Relevant issue(s) ## Description This PR is split into two to make reviewing easier: #1839 This PR adds an HTTP client and server implementation that implements the `client.DB` interface and runs through the existing integration test suite. ## Tasks - [x] I made sure the code is well commented, particularly hard-to-understand areas. - [x] I made sure the repository-held documentation is changed accordingly. - [x] I made sure the pull request title adheres to the conventional commit style (the subset used in the project can be found in [tools/configs/chglog/config.yml](tools/configs/chglog/config.yml)). - [x] I made sure to discuss its limitations such as threats to validity, vulnerability to mistake and misuse, robustness to invalidation of assumptions, resource requirements, ... ## How has this been tested? `make test` Specify the platform(s) on which this was tested: - MacOS --- Makefile | 20 +- datastore/memory/txn.go | 8 +- go.mod | 1 + go.sum | 5 + http/client.go | 418 ++++++++++++++++++ http/client_collection.go | 405 +++++++++++++++++ http/client_lens.go | 147 ++++++ http/client_tx.go | 84 ++++ http/errors.go | 51 +++ http/handler_collection.go | 328 ++++++++++++++ http/handler_lens.go | 107 +++++ http/handler_store.go | 332 ++++++++++++++ http/handler_tx.go | 95 ++++ http/http_client.go | 86 ++++ http/logger.go | 52 +++ http/middleware.go | 145 ++++++ http/server.go | 111 +++++ http/utils.go | 65 +++ http/wrapper.go | 177 ++++++++ http/wrapper_tx.go | 70 +++ tests/integration/explain.go | 135 +++--- tests/integration/lens.go | 30 +- tests/integration/p2p.go | 5 - .../query/one_to_many/simple_test.go | 2 +- tests/integration/results.go | 180 ++++++++ tests/integration/state.go | 5 + tests/integration/utils2.go | 210 +++++---- 27 files changed, 3106 insertions(+), 168 deletions(-) create mode 100644 http/client.go create mode 100644 http/client_collection.go create mode 100644 http/client_lens.go create mode 100644 http/client_tx.go create mode 100644 http/errors.go create mode 100644 http/handler_collection.go create mode 100644 http/handler_lens.go create mode 100644 http/handler_store.go create mode 100644 http/handler_tx.go create mode 100644 http/http_client.go create mode 100644 http/logger.go create mode 100644 http/middleware.go create mode 100644 http/server.go create mode 100644 http/utils.go create mode 100644 http/wrapper.go create mode 100644 http/wrapper_tx.go create mode 100644 tests/integration/results.go diff --git a/Makefile b/Makefile index 7102f0dacf..6eb3456fcc 100644 --- a/Makefile +++ b/Makefile @@ -29,7 +29,7 @@ ifdef BUILD_TAGS BUILD_FLAGS+=-tags $(BUILD_TAGS) endif -TEST_FLAGS=-race -shuffle=on -timeout 210s +TEST_FLAGS=-race -shuffle=on -timeout 300s PLAYGROUND_DIRECTORY=playground LENS_TEST_DIRECTORY=tests/integration/schema/migrations @@ -187,11 +187,15 @@ test\:build: .PHONY: test\:ci test\:ci: - DEFRA_BADGER_MEMORY=true DEFRA_BADGER_FILE=true $(MAKE) test:all + DEFRA_BADGER_MEMORY=true DEFRA_BADGER_FILE=true \ + DEFRA_CLIENT_GO=true DEFRA_CLIENT_HTTP=true \ + $(MAKE) test:all .PHONY: test\:ci-gql-mutations test\:ci-gql-mutations: - DEFRA_MUTATION_TYPE=gql DEFRA_BADGER_MEMORY=true $(MAKE) test:all + DEFRA_MUTATION_TYPE=gql DEFRA_BADGER_MEMORY=true \ + DEFRA_CLIENT_GO=true DEFRA_CLIENT_HTTP=true \ + $(MAKE) test:all .PHONY: test\:gql-mutations test\:gql-mutations: @@ 
-204,7 +208,9 @@ test\:gql-mutations: # UpdateDoc will call [Collection.Update]. .PHONY: test\:ci-col-named-mutations test\:ci-col-named-mutations: - DEFRA_MUTATION_TYPE=collection-named DEFRA_BADGER_MEMORY=true $(MAKE) test:all + DEFRA_MUTATION_TYPE=collection-named DEFRA_BADGER_MEMORY=true \ + DEFRA_CLIENT_GO=true DEFRA_CLIENT_HTTP=true \ + $(MAKE) test:all .PHONY: test\:col-named-mutations test\:col-named-mutations: @@ -214,6 +220,10 @@ test\:col-named-mutations: test\:go: go test $(DEFAULT_TEST_DIRECTORIES) $(TEST_FLAGS) +.PHONY: test\:http +test\:http: + DEFRA_CLIENT_HTTP=true go test $(DEFAULT_TEST_DIRECTORIES) $(TEST_FLAGS) + .PHONY: test\:names test\:names: gotestsum --format testname -- $(DEFAULT_TEST_DIRECTORIES) $(TEST_FLAGS) @@ -285,7 +295,7 @@ test\:coverage-html: .PHONY: test\:changes test\:changes: @$(MAKE) deps:lens - env DEFRA_DETECT_DATABASE_CHANGES=true gotestsum -- ./... -shuffle=on -p 1 + env DEFRA_DETECT_DATABASE_CHANGES=true DEFRA_CLIENT_GO=true gotestsum -- ./... -shuffle=on -p 1 .PHONY: validate\:codecov validate\:codecov: diff --git a/datastore/memory/txn.go b/datastore/memory/txn.go index 3cd7ab2bf9..7430077e46 100644 --- a/datastore/memory/txn.go +++ b/datastore/memory/txn.go @@ -123,9 +123,9 @@ func (t *basicTxn) GetSize(ctx context.Context, key ds.Key) (size int, err error // Has implements ds.Has. func (t *basicTxn) Has(ctx context.Context, key ds.Key) (exists bool, err error) { - t.ds.closeLk.RLock() - defer t.ds.closeLk.RUnlock() - if t.ds.closed { + t.closeLk.RLock() + defer t.closeLk.RUnlock() + if t.closed { return false, ErrClosed } @@ -162,7 +162,7 @@ func (t *basicTxn) Put(ctx context.Context, key ds.Key, value []byte) error { func (t *basicTxn) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) { t.closeLk.RLock() defer t.closeLk.RUnlock() - if t.ds.closed { + if t.closed { return nil, ErrClosed } diff --git a/go.mod b/go.mod index 4c0753b885..d74ab4b87f 100644 --- a/go.mod +++ b/go.mod @@ -44,6 +44,7 @@ require ( github.com/tidwall/btree v1.6.0 github.com/ugorji/go/codec v1.2.11 github.com/valyala/fastjson v1.6.4 + github.com/vito/go-sse v1.0.0 go.opentelemetry.io/otel/metric v1.17.0 go.opentelemetry.io/otel/sdk/metric v0.40.0 go.uber.org/zap v1.25.0 diff --git a/go.sum b/go.sum index 258ee383d7..8596b06304 100644 --- a/go.sum +++ b/go.sum @@ -1035,6 +1035,7 @@ github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OS github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo= github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM= github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A= +github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs= github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA= github.com/olekukonko/tablewriter v0.0.0-20170122224234-a0225b3f23b5/go.mod h1:vsDQFd/mU46D+Z4whnwzcISnGGzXWMclvtLoiIKAKIo= @@ -1044,6 +1045,7 @@ github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+W github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg= github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk= github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY= +github.com/onsi/ginkgo v1.16.5 
h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/ginkgo/v2 v2.11.0 h1:WgqUCUt/lT6yXoQ8Wef0fsNn5cAuMK7+KT9UFRz2tcU= github.com/onsi/ginkgo/v2 v2.11.0/go.mod h1:ZhrRA5XmEE3x3rhlzamx/JJvujdZoJ2uvgI7kR0iZvM= github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= @@ -1272,6 +1274,8 @@ github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXV github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= +github.com/vito/go-sse v1.0.0 h1:e6/iTrrvy8BRrOwJwmQmlndlil+TLdxXvHi55ZDzH6M= +github.com/vito/go-sse v1.0.0/go.mod h1:2wkcaQ+jtlZ94Uve8gYZjFpL68luAjssTINA2hpgcZs= github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= @@ -1818,6 +1822,7 @@ gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8= gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= diff --git a/http/client.go b/http/client.go new file mode 100644 index 0000000000..16a8924a65 --- /dev/null +++ b/http/client.go @@ -0,0 +1,418 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "bytes" + "context" + "encoding/json" + "io" + "net/http" + "net/url" + "strings" + + blockstore "github.com/ipfs/boxo/blockstore" + sse "github.com/vito/go-sse/sse" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/events" +) + +var _ client.DB = (*Client)(nil) + +// Client implements the client.DB interface over HTTP. 
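+//
+// Calls are translated into HTTP requests against the /api/v0 routes exposed
+// by the server side of this change. A minimal, illustrative sketch follows;
+// the address, schema, and query strings are placeholders, not part of this
+// patch:
+//
+//	db, err := NewClient("http://localhost:9181")
+//	if err != nil {
+//		return err
+//	}
+//	cols, err := db.AddSchema(ctx, `type User { name: String }`)
+//	result := db.ExecRequest(ctx, `query { User { name } }`)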
+type Client struct { + http *httpClient +} + +func NewClient(rawURL string) (*Client, error) { + baseURL, err := url.Parse(rawURL) + if err != nil { + return nil, err + } + httpClient := newHttpClient(baseURL.JoinPath("/api/v0")) + return &Client{httpClient}, nil +} + +func (c *Client) NewTxn(ctx context.Context, readOnly bool) (datastore.Txn, error) { + query := url.Values{} + if readOnly { + query.Add("read_only", "true") + } + + methodURL := c.http.baseURL.JoinPath("tx") + methodURL.RawQuery = query.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), nil) + if err != nil { + return nil, err + } + var txRes CreateTxResponse + if err := c.http.requestJson(req, &txRes); err != nil { + return nil, err + } + return &Transaction{txRes.ID, c.http}, nil +} + +func (c *Client) NewConcurrentTxn(ctx context.Context, readOnly bool) (datastore.Txn, error) { + query := url.Values{} + if readOnly { + query.Add("read_only", "true") + } + + methodURL := c.http.baseURL.JoinPath("tx", "concurrent") + methodURL.RawQuery = query.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), nil) + if err != nil { + return nil, err + } + var txRes CreateTxResponse + if err := c.http.requestJson(req, &txRes); err != nil { + return nil, err + } + return &Transaction{txRes.ID, c.http}, nil +} + +func (c *Client) WithTxn(tx datastore.Txn) client.Store { + client := c.http.withTxn(tx.ID()) + return &Client{client} +} + +func (c *Client) SetReplicator(ctx context.Context, rep client.Replicator) error { + methodURL := c.http.baseURL.JoinPath("p2p", "replicators") + + body, err := json.Marshal(rep) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *Client) DeleteReplicator(ctx context.Context, rep client.Replicator) error { + methodURL := c.http.baseURL.JoinPath("p2p", "replicators") + + body, err := json.Marshal(rep) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *Client) GetAllReplicators(ctx context.Context) ([]client.Replicator, error) { + methodURL := c.http.baseURL.JoinPath("p2p", "replicators") + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var reps []client.Replicator + if err := c.http.requestJson(req, &reps); err != nil { + return nil, err + } + return reps, nil +} + +func (c *Client) AddP2PCollection(ctx context.Context, collectionID string) error { + methodURL := c.http.baseURL.JoinPath("p2p", "collections", collectionID) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), nil) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *Client) RemoveP2PCollection(ctx context.Context, collectionID string) error { + methodURL := c.http.baseURL.JoinPath("p2p", "collections", collectionID) + + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, methodURL.String(), nil) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *Client) GetAllP2PCollections(ctx context.Context) ([]string, error) { + methodURL := c.http.baseURL.JoinPath("p2p", "collections") + + req, err := 
http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var cols []string + if err := c.http.requestJson(req, &cols); err != nil { + return nil, err + } + return cols, nil +} + +func (c *Client) BasicImport(ctx context.Context, filepath string) error { + methodURL := c.http.baseURL.JoinPath("backup", "import") + + body, err := json.Marshal(&client.BackupConfig{Filepath: filepath}) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *Client) BasicExport(ctx context.Context, config *client.BackupConfig) error { + methodURL := c.http.baseURL.JoinPath("backup", "export") + + body, err := json.Marshal(config) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *Client) AddSchema(ctx context.Context, schema string) ([]client.CollectionDescription, error) { + methodURL := c.http.baseURL.JoinPath("schema") + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), strings.NewReader(schema)) + if err != nil { + return nil, err + } + var cols []client.CollectionDescription + if err := c.http.requestJson(req, &cols); err != nil { + return nil, err + } + return cols, nil +} + +func (c *Client) PatchSchema(ctx context.Context, patch string) error { + methodURL := c.http.baseURL.JoinPath("schema") + + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, methodURL.String(), strings.NewReader(patch)) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *Client) SetMigration(ctx context.Context, config client.LensConfig) error { + return c.LensRegistry().SetMigration(ctx, config) +} + +func (c *Client) LensRegistry() client.LensRegistry { + return &LensRegistry{c.http} +} + +func (c *Client) GetCollectionByName(ctx context.Context, name client.CollectionName) (client.Collection, error) { + methodURL := c.http.baseURL.JoinPath("collections") + methodURL.RawQuery = url.Values{"name": []string{name}}.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var description client.CollectionDescription + if err := c.http.requestJson(req, &description); err != nil { + return nil, err + } + return &Collection{c.http, description}, nil +} + +func (c *Client) GetCollectionBySchemaID(ctx context.Context, schemaId string) (client.Collection, error) { + methodURL := c.http.baseURL.JoinPath("collections") + methodURL.RawQuery = url.Values{"schema_id": []string{schemaId}}.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var description client.CollectionDescription + if err := c.http.requestJson(req, &description); err != nil { + return nil, err + } + return &Collection{c.http, description}, nil +} + +func (c *Client) GetCollectionByVersionID(ctx context.Context, versionId string) (client.Collection, error) { + methodURL := c.http.baseURL.JoinPath("collections") + methodURL.RawQuery = url.Values{"version_id": []string{versionId}}.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err 
+ } + var description client.CollectionDescription + if err := c.http.requestJson(req, &description); err != nil { + return nil, err + } + return &Collection{c.http, description}, nil +} + +func (c *Client) GetAllCollections(ctx context.Context) ([]client.Collection, error) { + methodURL := c.http.baseURL.JoinPath("collections") + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var descriptions []client.CollectionDescription + if err := c.http.requestJson(req, &descriptions); err != nil { + return nil, err + } + collections := make([]client.Collection, len(descriptions)) + for i, d := range descriptions { + collections[i] = &Collection{c.http, d} + } + return collections, nil +} + +func (c *Client) GetAllIndexes(ctx context.Context) (map[client.CollectionName][]client.IndexDescription, error) { + methodURL := c.http.baseURL.JoinPath("indexes") + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var indexes map[client.CollectionName][]client.IndexDescription + if err := c.http.requestJson(req, &indexes); err != nil { + return nil, err + } + return indexes, nil +} + +func (c *Client) ExecRequest(ctx context.Context, query string) *client.RequestResult { + methodURL := c.http.baseURL.JoinPath("graphql") + result := &client.RequestResult{} + + body, err := json.Marshal(&GraphQLRequest{query}) + if err != nil { + result.GQL.Errors = []error{err} + return result + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + result.GQL.Errors = []error{err} + return result + } + c.http.setDefaultHeaders(req) + + res, err := c.http.client.Do(req) + if err != nil { + result.GQL.Errors = []error{err} + return result + } + if res.Header.Get("Content-Type") == "text/event-stream" { + result.Pub = c.execRequestSubscription(ctx, res.Body) + return result + } + // ignore close errors because they have + // no perceivable effect on the end user + // and cannot be reconciled easily + defer res.Body.Close() //nolint:errcheck + + data, err := io.ReadAll(res.Body) + if err != nil { + result.GQL.Errors = []error{err} + return result + } + var response GraphQLResponse + if err = json.Unmarshal(data, &response); err != nil { + result.GQL.Errors = []error{err} + return result + } + result.GQL.Data = response.Data + result.GQL.Errors = response.Errors + return result +} + +func (c *Client) execRequestSubscription(ctx context.Context, r io.ReadCloser) *events.Publisher[events.Update] { + pubCh := events.New[events.Update](0, 0) + pub, err := events.NewPublisher[events.Update](pubCh, 0) + if err != nil { + return nil + } + + go func() { + eventReader := sse.NewReadCloser(r) + // ignore close errors because the status + // and body of the request are already + // checked and it cannot be handled properly + defer eventReader.Close() //nolint:errcheck + + for { + evt, err := eventReader.Next() + if err != nil { + return + } + var response GraphQLResponse + if err := json.Unmarshal(evt.Data, &response); err != nil { + return + } + pub.Publish(client.GQLResult{ + Errors: response.Errors, + Data: response.Data, + }) + } + }() + + return pub +} + +func (c *Client) PrintDump(ctx context.Context) error { + methodURL := c.http.baseURL.JoinPath("debug", "dump") + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return err + } + _, err = 
c.http.request(req) + return err +} + +func (c *Client) Close(ctx context.Context) { + // do nothing +} + +func (c *Client) Root() datastore.RootStore { + panic("client side database") +} + +func (c *Client) Blockstore() blockstore.Blockstore { + panic("client side database") +} + +func (c *Client) Events() events.Events { + panic("client side database") +} + +func (c *Client) MaxTxnRetries() int { + panic("client side database") +} diff --git a/http/client_collection.go b/http/client_collection.go new file mode 100644 index 0000000000..713d45d2bf --- /dev/null +++ b/http/client_collection.go @@ -0,0 +1,405 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "bytes" + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "net/url" + "strings" + + sse "github.com/vito/go-sse/sse" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/client/request" + "github.com/sourcenetwork/defradb/datastore" +) + +var _ client.Collection = (*Collection)(nil) + +// Collection implements the client.Collection interface over HTTP. +type Collection struct { + http *httpClient + desc client.CollectionDescription +} + +func (c *Collection) Description() client.CollectionDescription { + return c.desc +} + +func (c *Collection) Name() string { + return c.desc.Name +} + +func (c *Collection) Schema() client.SchemaDescription { + return c.desc.Schema +} + +func (c *Collection) ID() uint32 { + return c.desc.ID +} + +func (c *Collection) SchemaID() string { + return c.desc.Schema.SchemaID +} + +func (c *Collection) Create(ctx context.Context, doc *client.Document) error { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name) + + body, err := doc.String() + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), strings.NewReader(body)) + if err != nil { + return err + } + _, err = c.http.request(req) + if err != nil { + return err + } + doc.Clean() + return nil +} + +func (c *Collection) CreateMany(ctx context.Context, docs []*client.Document) error { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name) + + var docMapList []json.RawMessage + for _, doc := range docs { + docMap, err := documentJSON(doc) + if err != nil { + return err + } + docMapList = append(docMapList, docMap) + } + body, err := json.Marshal(docMapList) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + _, err = c.http.request(req) + if err != nil { + return err + } + for _, doc := range docs { + doc.Clean() + } + return nil +} + +func (c *Collection) Update(ctx context.Context, doc *client.Document) error { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name, doc.Key().String()) + + body, err := documentJSON(doc) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + _, err = c.http.request(req) + if err != nil { + return err + } + doc.Clean() + return nil +} + +func (c *Collection) Save(ctx 
context.Context, doc *client.Document) error { + _, err := c.Get(ctx, doc.Key(), true) + if err == nil { + return c.Update(ctx, doc) + } + if errors.Is(err, client.ErrDocumentNotFound) { + return c.Create(ctx, doc) + } + return err +} + +func (c *Collection) Delete(ctx context.Context, docKey client.DocKey) (bool, error) { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name, docKey.String()) + + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, methodURL.String(), nil) + if err != nil { + return false, err + } + _, err = c.http.request(req) + if err != nil { + return false, err + } + return true, nil +} + +func (c *Collection) Exists(ctx context.Context, docKey client.DocKey) (bool, error) { + _, err := c.Get(ctx, docKey, false) + if err != nil { + return false, err + } + return true, nil +} + +func (c *Collection) UpdateWith(ctx context.Context, target any, updater string) (*client.UpdateResult, error) { + switch t := target.(type) { + case string, map[string]any, *request.Filter: + return c.UpdateWithFilter(ctx, t, updater) + case client.DocKey: + return c.UpdateWithKey(ctx, t, updater) + case []client.DocKey: + return c.UpdateWithKeys(ctx, t, updater) + default: + return nil, client.ErrInvalidUpdateTarget + } +} + +func (c *Collection) updateWith( + ctx context.Context, + request CollectionUpdateRequest, +) (*client.UpdateResult, error) { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name) + + body, err := json.Marshal(request) + if err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPatch, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + var result client.UpdateResult + if err := c.http.requestJson(req, &result); err != nil { + return nil, err + } + return &result, nil +} + +func (c *Collection) UpdateWithFilter( + ctx context.Context, + filter any, + updater string, +) (*client.UpdateResult, error) { + return c.updateWith(ctx, CollectionUpdateRequest{ + Filter: filter, + Updater: updater, + }) +} + +func (c *Collection) UpdateWithKey( + ctx context.Context, + key client.DocKey, + updater string, +) (*client.UpdateResult, error) { + return c.updateWith(ctx, CollectionUpdateRequest{ + Key: key.String(), + Updater: updater, + }) +} + +func (c *Collection) UpdateWithKeys( + ctx context.Context, + docKeys []client.DocKey, + updater string, +) (*client.UpdateResult, error) { + var keys []string + for _, key := range docKeys { + keys = append(keys, key.String()) + } + return c.updateWith(ctx, CollectionUpdateRequest{ + Keys: keys, + Updater: updater, + }) +} + +func (c *Collection) DeleteWith(ctx context.Context, target any) (*client.DeleteResult, error) { + switch t := target.(type) { + case string, map[string]any, *request.Filter: + return c.DeleteWithFilter(ctx, t) + case client.DocKey: + return c.DeleteWithKey(ctx, t) + case []client.DocKey: + return c.DeleteWithKeys(ctx, t) + default: + return nil, client.ErrInvalidDeleteTarget + } +} + +func (c *Collection) deleteWith( + ctx context.Context, + request CollectionDeleteRequest, +) (*client.DeleteResult, error) { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name) + + body, err := json.Marshal(request) + if err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + var result client.DeleteResult + if err := c.http.requestJson(req, &result); err != nil { + return nil, err + } + return 
&result, nil +} + +func (c *Collection) DeleteWithFilter(ctx context.Context, filter any) (*client.DeleteResult, error) { + return c.deleteWith(ctx, CollectionDeleteRequest{ + Filter: filter, + }) +} + +func (c *Collection) DeleteWithKey(ctx context.Context, docKey client.DocKey) (*client.DeleteResult, error) { + return c.deleteWith(ctx, CollectionDeleteRequest{ + Key: docKey.String(), + }) +} + +func (c *Collection) DeleteWithKeys(ctx context.Context, docKeys []client.DocKey) (*client.DeleteResult, error) { + var keys []string + for _, key := range docKeys { + keys = append(keys, key.String()) + } + return c.deleteWith(ctx, CollectionDeleteRequest{ + Keys: keys, + }) +} + +func (c *Collection) Get(ctx context.Context, key client.DocKey, showDeleted bool) (*client.Document, error) { + query := url.Values{} + if showDeleted { + query.Add("show_deleted", "true") + } + + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name, key.String()) + methodURL.RawQuery = query.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var docMap map[string]any + if err := c.http.requestJson(req, &docMap); err != nil { + return nil, err + } + return client.NewDocFromMap(docMap) +} + +func (c *Collection) WithTxn(tx datastore.Txn) client.Collection { + return &Collection{ + http: c.http.withTxn(tx.ID()), + desc: c.desc, + } +} + +func (c *Collection) GetAllDocKeys(ctx context.Context) (<-chan client.DocKeysResult, error) { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + c.http.setDefaultHeaders(req) + + res, err := c.http.client.Do(req) + if err != nil { + return nil, err + } + docKeyCh := make(chan client.DocKeysResult) + + go func() { + eventReader := sse.NewReadCloser(res.Body) + // ignore close errors because the status + // and body of the request are already + // checked and it cannot be handled properly + defer eventReader.Close() //nolint:errcheck + defer close(docKeyCh) + + for { + evt, err := eventReader.Next() + if err != nil { + return + } + var res DocKeyResult + if err := json.Unmarshal(evt.Data, &res); err != nil { + return + } + key, err := client.NewDocKeyFromString(res.Key) + if err != nil { + return + } + docKey := client.DocKeysResult{ + Key: key, + } + if res.Error != "" { + docKey.Err = fmt.Errorf(res.Error) + } + docKeyCh <- docKey + } + }() + + return docKeyCh, nil +} + +func (c *Collection) CreateIndex( + ctx context.Context, + indexDesc client.IndexDescription, +) (client.IndexDescription, error) { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name, "indexes") + + body, err := json.Marshal(&indexDesc) + if err != nil { + return client.IndexDescription{}, err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return client.IndexDescription{}, err + } + var index client.IndexDescription + if err := c.http.requestJson(req, &index); err != nil { + return client.IndexDescription{}, err + } + return index, nil +} + +func (c *Collection) DropIndex(ctx context.Context, indexName string) error { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name, "indexes", indexName) + + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, methodURL.String(), nil) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + 
+func (c *Collection) GetIndexes(ctx context.Context) ([]client.IndexDescription, error) { + methodURL := c.http.baseURL.JoinPath("collections", c.desc.Name, "indexes") + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var indexes []client.IndexDescription + if err := c.http.requestJson(req, &indexes); err != nil { + return nil, err + } + return c.desc.Indexes, nil +} diff --git a/http/client_lens.go b/http/client_lens.go new file mode 100644 index 0000000000..3c8c2fc903 --- /dev/null +++ b/http/client_lens.go @@ -0,0 +1,147 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "bytes" + "context" + "encoding/json" + "net/http" + + "github.com/sourcenetwork/immutable/enumerable" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" +) + +var _ client.LensRegistry = (*LensRegistry)(nil) + +// LensRegistry implements the client.LensRegistry interface over HTTP. +type LensRegistry struct { + http *httpClient +} + +func (c *LensRegistry) WithTxn(tx datastore.Txn) client.LensRegistry { + http := c.http.withTxn(tx.ID()) + return &LensRegistry{http} +} + +func (c *LensRegistry) SetMigration(ctx context.Context, config client.LensConfig) error { + methodURL := c.http.baseURL.JoinPath("lens") + + body, err := json.Marshal(config) + if err != nil { + return err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *LensRegistry) ReloadLenses(ctx context.Context) error { + methodURL := c.http.baseURL.JoinPath("lens", "reload") + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), nil) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *LensRegistry) MigrateUp( + ctx context.Context, + src enumerable.Enumerable[map[string]any], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + methodURL := c.http.baseURL.JoinPath("lens", schemaVersionID, "up") + + var data []map[string]any + err := enumerable.ForEach(src, func(item map[string]any) { + data = append(data, item) + }) + if err != nil { + return nil, err + } + body, err := json.Marshal(data) + if err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + var result []map[string]any + if err := c.http.requestJson(req, &result); err != nil { + return nil, err + } + return enumerable.New(result), nil +} + +func (c *LensRegistry) MigrateDown( + ctx context.Context, + src enumerable.Enumerable[map[string]any], + schemaVersionID string, +) (enumerable.Enumerable[map[string]any], error) { + methodURL := c.http.baseURL.JoinPath("lens", schemaVersionID, "down") + + var data []map[string]any + err := enumerable.ForEach(src, func(item map[string]any) { + data = append(data, item) + }) + if err != nil { + return nil, err + } + body, err := json.Marshal(data) + if err != nil { + return nil, err + } + req, err := 
http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + var result []map[string]any + if err := c.http.requestJson(req, &result); err != nil { + return nil, err + } + return enumerable.New(result), nil +} + +func (c *LensRegistry) Config(ctx context.Context) ([]client.LensConfig, error) { + methodURL := c.http.baseURL.JoinPath("lens") + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return nil, err + } + var cfgs []client.LensConfig + if err := c.http.requestJson(req, &cfgs); err != nil { + return nil, err + } + return cfgs, nil +} + +func (c *LensRegistry) HasMigration(ctx context.Context, schemaVersionID string) (bool, error) { + methodURL := c.http.baseURL.JoinPath("lens", schemaVersionID) + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, methodURL.String(), nil) + if err != nil { + return false, err + } + _, err = c.http.request(req) + if err != nil { + return false, err + } + return true, nil +} diff --git a/http/client_tx.go b/http/client_tx.go new file mode 100644 index 0000000000..8df82007a6 --- /dev/null +++ b/http/client_tx.go @@ -0,0 +1,84 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "context" + "fmt" + "net/http" + + "github.com/sourcenetwork/defradb/datastore" +) + +var _ datastore.Txn = (*Transaction)(nil) + +// Transaction implements the datastore.Txn interface over HTTP. 
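+//
+// It only tracks the ID of a transaction created on the server: Commit and
+// Discard map to POST and DELETE requests on the tx/{id} route, while the
+// datastore accessors and lifecycle callbacks panic because the transaction
+// state lives on the server.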
+type Transaction struct { + id uint64 + http *httpClient +} + +func (c *Transaction) ID() uint64 { + return c.id +} + +func (c *Transaction) Commit(ctx context.Context) error { + methodURL := c.http.baseURL.JoinPath("tx", fmt.Sprintf("%d", c.id)) + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, methodURL.String(), nil) + if err != nil { + return err + } + _, err = c.http.request(req) + return err +} + +func (c *Transaction) Discard(ctx context.Context) { + methodURL := c.http.baseURL.JoinPath("tx", fmt.Sprintf("%d", c.id)) + + req, err := http.NewRequestWithContext(ctx, http.MethodDelete, methodURL.String(), nil) + if err != nil { + return + } + c.http.request(req) //nolint:errcheck +} + +func (c *Transaction) OnSuccess(fn func()) { + panic("client side transaction") +} + +func (c *Transaction) OnError(fn func()) { + panic("client side transaction") +} + +func (c *Transaction) OnDiscard(fn func()) { + panic("client side transaction") +} + +func (c *Transaction) Rootstore() datastore.DSReaderWriter { + panic("client side transaction") +} + +func (c *Transaction) Datastore() datastore.DSReaderWriter { + panic("client side transaction") +} + +func (c *Transaction) Headstore() datastore.DSReaderWriter { + panic("client side transaction") +} + +func (c *Transaction) DAGstore() datastore.DAGStore { + panic("client side transaction") +} + +func (c *Transaction) Systemstore() datastore.DSReaderWriter { + panic("client side transaction") +} diff --git a/http/errors.go b/http/errors.go new file mode 100644 index 0000000000..c2808603cf --- /dev/null +++ b/http/errors.go @@ -0,0 +1,51 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "encoding/json" + "errors" +) + +const ( + errInvalidRequestBody = "invalid request body" + errDocKeyDoesNotMatch = "document key does not match" + errStreamingNotSupported = "streaming not supported" + errMigrationNotFound = "migration not found" + errMissingRequest = "missing request" + errInvalidTransactionId = "invalid transaction id" +) + +var ( + ErrInvalidRequestBody = errors.New(errInvalidRequestBody) + ErrDocKeyDoesNotMatch = errors.New(errDocKeyDoesNotMatch) + ErrStreamingNotSupported = errors.New(errStreamingNotSupported) + ErrMigrationNotFound = errors.New(errMigrationNotFound) + ErrMissingRequest = errors.New(errMissingRequest) + ErrInvalidTransactionId = errors.New(errInvalidTransactionId) +) + +type errorResponse struct { + Error error `json:"error"` +} + +func (e errorResponse) MarshalJSON() ([]byte, error) { + return json.Marshal(map[string]any{"error": e.Error.Error()}) +} + +func (e *errorResponse) UnmarshalJSON(data []byte) error { + var out map[string]any + if err := json.Unmarshal(data, &out); err != nil { + return err + } + e.Error = parseError(out["error"]) + return nil +} diff --git a/http/handler_collection.go b/http/handler_collection.go new file mode 100644 index 0000000000..8f8ff8423b --- /dev/null +++ b/http/handler_collection.go @@ -0,0 +1,328 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + + "github.com/go-chi/chi/v5" + + "github.com/sourcenetwork/defradb/client" +) + +type collectionHandler struct{} + +type CollectionDeleteRequest struct { + Key string `json:"key"` + Keys []string `json:"keys"` + Filter any `json:"filter"` +} + +type CollectionUpdateRequest struct { + Key string `json:"key"` + Keys []string `json:"keys"` + Filter any `json:"filter"` + Updater string `json:"updater"` +} + +func (s *collectionHandler) Create(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + var body any + if err := requestJSON(req, &body); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + + switch t := body.(type) { + case []map[string]any: + var docList []*client.Document + for _, docMap := range t { + doc, err := client.NewDocFromMap(docMap) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + docList = append(docList, doc) + } + if err := col.CreateMany(req.Context(), docList); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) + case map[string]any: + doc, err := client.NewDocFromMap(t) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + if err := col.Create(req.Context(), doc); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) + default: + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrInvalidRequestBody}) + } +} + +func (s *collectionHandler) DeleteWith(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + var request CollectionDeleteRequest + if err := requestJSON(req, &request); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + + switch { + case request.Filter != nil: + result, err := col.DeleteWith(req.Context(), request.Filter) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, result) + case request.Key != "": + docKey, err := client.NewDocKeyFromString(request.Key) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + result, err := col.DeleteWith(req.Context(), docKey) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, result) + case request.Keys != nil: + var docKeys []client.DocKey + for _, key := range request.Keys { + docKey, err := client.NewDocKeyFromString(key) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + docKeys = append(docKeys, docKey) + } + result, err := col.DeleteWith(req.Context(), docKeys) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, result) + default: + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrInvalidRequestBody}) + } +} + +func (s *collectionHandler) UpdateWith(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + var request 
CollectionUpdateRequest + if err := requestJSON(req, &request); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + + switch { + case request.Filter != nil: + result, err := col.UpdateWith(req.Context(), request.Filter, request.Updater) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, result) + case request.Key != "": + docKey, err := client.NewDocKeyFromString(request.Key) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + result, err := col.UpdateWith(req.Context(), docKey, request.Updater) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, result) + case request.Keys != nil: + var docKeys []client.DocKey + for _, key := range request.Keys { + docKey, err := client.NewDocKeyFromString(key) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + docKeys = append(docKeys, docKey) + } + result, err := col.UpdateWith(req.Context(), docKeys, request.Updater) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, result) + default: + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrInvalidRequestBody}) + } +} + +func (s *collectionHandler) Update(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + docKey, err := client.NewDocKeyFromString(chi.URLParam(req, "key")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + doc, err := col.Get(req.Context(), docKey, true) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + patch, err := io.ReadAll(req.Body) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + if err := doc.SetWithJSON(patch); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + err = col.Update(req.Context(), doc) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *collectionHandler) Delete(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + docKey, err := client.NewDocKeyFromString(chi.URLParam(req, "key")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + _, err = col.Delete(req.Context(), docKey) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *collectionHandler) Get(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + showDeleted, _ := strconv.ParseBool(req.URL.Query().Get("show_deleted")) + + docKey, err := client.NewDocKeyFromString(chi.URLParam(req, "key")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + doc, err := col.Get(req.Context(), docKey, showDeleted) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + docMap, err := doc.ToMap() + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, docMap) +} + +type DocKeyResult struct { + Key string `json:"key"` + Error string `json:"error"` +} + +func (s 
*collectionHandler) GetAllDocKeys(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + flusher, ok := rw.(http.Flusher) + if !ok { + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrStreamingNotSupported}) + return + } + + docKeyCh, err := col.GetAllDocKeys(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + + rw.Header().Set("Content-Type", "text/event-stream") + rw.Header().Set("Cache-Control", "no-cache") + rw.Header().Set("Connection", "keep-alive") + + rw.WriteHeader(http.StatusOK) + flusher.Flush() + + for docKey := range docKeyCh { + results := &DocKeyResult{ + Key: docKey.Key.String(), + } + if docKey.Err != nil { + results.Error = docKey.Err.Error() + } + data, err := json.Marshal(results) + if err != nil { + return + } + fmt.Fprintf(rw, "data: %s\n\n", data) + flusher.Flush() + } +} + +func (s *collectionHandler) CreateIndex(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + var indexDesc client.IndexDescription + if err := requestJSON(req, &indexDesc); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + index, err := col.CreateIndex(req.Context(), indexDesc) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, index) +} + +func (s *collectionHandler) GetIndexes(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + indexes, err := col.GetIndexes(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, indexes) +} + +func (s *collectionHandler) DropIndex(rw http.ResponseWriter, req *http.Request) { + col := req.Context().Value(colContextKey).(client.Collection) + + err := col.DropIndex(req.Context(), chi.URLParam(req, "index")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} diff --git a/http/handler_lens.go b/http/handler_lens.go new file mode 100644 index 0000000000..ccf8dd01a8 --- /dev/null +++ b/http/handler_lens.go @@ -0,0 +1,107 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package http + +import ( + "net/http" + + "github.com/go-chi/chi/v5" + "github.com/sourcenetwork/immutable/enumerable" + + "github.com/sourcenetwork/defradb/client" +) + +type lensHandler struct{} + +func (s *lensHandler) ReloadLenses(rw http.ResponseWriter, req *http.Request) { + lens := req.Context().Value(lensContextKey).(client.LensRegistry) + + err := lens.ReloadLenses(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *lensHandler) SetMigration(rw http.ResponseWriter, req *http.Request) { + lens := req.Context().Value(lensContextKey).(client.LensRegistry) + + var cfg client.LensConfig + if err := requestJSON(req, &cfg); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + err := lens.SetMigration(req.Context(), cfg) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *lensHandler) MigrateUp(rw http.ResponseWriter, req *http.Request) { + lens := req.Context().Value(lensContextKey).(client.LensRegistry) + + var src []map[string]any + if err := requestJSON(req, &src); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + result, err := lens.MigrateUp(req.Context(), enumerable.New(src), chi.URLParam(req, "version")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, result) +} + +func (s *lensHandler) MigrateDown(rw http.ResponseWriter, req *http.Request) { + lens := req.Context().Value(lensContextKey).(client.LensRegistry) + + var src []map[string]any + if err := requestJSON(req, &src); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + result, err := lens.MigrateDown(req.Context(), enumerable.New(src), chi.URLParam(req, "version")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, result) +} + +func (s *lensHandler) Config(rw http.ResponseWriter, req *http.Request) { + lens := req.Context().Value(lensContextKey).(client.LensRegistry) + + cfgs, err := lens.Config(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, cfgs) +} + +func (s *lensHandler) HasMigration(rw http.ResponseWriter, req *http.Request) { + lens := req.Context().Value(lensContextKey).(client.LensRegistry) + + exists, err := lens.HasMigration(req.Context(), chi.URLParam(req, "version")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + if !exists { + responseJSON(rw, http.StatusNotFound, errorResponse{ErrMigrationNotFound}) + return + } + rw.WriteHeader(http.StatusOK) +} diff --git a/http/handler_store.go b/http/handler_store.go new file mode 100644 index 0000000000..d0cbdf42d2 --- /dev/null +++ b/http/handler_store.go @@ -0,0 +1,332 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
+ +package http + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/go-chi/chi/v5" + + "github.com/sourcenetwork/defradb/client" +) + +type storeHandler struct{} + +func (s *storeHandler) SetReplicator(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + var rep client.Replicator + if err := requestJSON(req, &rep); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + err := store.SetReplicator(req.Context(), rep) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *storeHandler) DeleteReplicator(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + var rep client.Replicator + if err := requestJSON(req, &rep); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + err := store.DeleteReplicator(req.Context(), rep) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *storeHandler) GetAllReplicators(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + reps, err := store.GetAllReplicators(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, reps) +} + +func (s *storeHandler) AddP2PCollection(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + err := store.AddP2PCollection(req.Context(), chi.URLParam(req, "id")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *storeHandler) RemoveP2PCollection(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + err := store.RemoveP2PCollection(req.Context(), chi.URLParam(req, "id")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *storeHandler) GetAllP2PCollections(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + cols, err := store.GetAllP2PCollections(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, cols) +} + +func (s *storeHandler) BasicImport(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + var config client.BackupConfig + if err := requestJSON(req, &config); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + err := store.BasicImport(req.Context(), config.Filepath) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *storeHandler) BasicExport(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + var config client.BackupConfig + if err := requestJSON(req, &config); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + err := store.BasicExport(req.Context(), &config) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func 
(s *storeHandler) AddSchema(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + schema, err := io.ReadAll(req.Body) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + cols, err := store.AddSchema(req.Context(), string(schema)) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, cols) +} + +func (s *storeHandler) PatchSchema(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + patch, err := io.ReadAll(req.Body) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + err = store.PatchSchema(req.Context(), string(patch)) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +func (s *storeHandler) GetCollection(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + switch { + case req.URL.Query().Has("name"): + col, err := store.GetCollectionByName(req.Context(), req.URL.Query().Get("name")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, col.Description()) + case req.URL.Query().Has("schema_id"): + col, err := store.GetCollectionBySchemaID(req.Context(), req.URL.Query().Get("schema_id")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, col.Description()) + case req.URL.Query().Has("version_id"): + col, err := store.GetCollectionByVersionID(req.Context(), req.URL.Query().Get("version_id")) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, col.Description()) + default: + cols, err := store.GetAllCollections(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + colDesc := make([]client.CollectionDescription, len(cols)) + for i, col := range cols { + colDesc[i] = col.Description() + } + responseJSON(rw, http.StatusOK, colDesc) + } +} + +func (s *storeHandler) GetAllIndexes(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + indexes, err := store.GetAllIndexes(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + responseJSON(rw, http.StatusOK, indexes) +} + +func (s *storeHandler) PrintDump(rw http.ResponseWriter, req *http.Request) { + db := req.Context().Value(dbContextKey).(client.DB) + + if err := db.PrintDump(req.Context()); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + rw.WriteHeader(http.StatusOK) +} + +type GraphQLRequest struct { + Query string `json:"query"` +} + +type GraphQLResponse struct { + Data any `json:"data"` + Errors []error `json:"errors,omitempty"` +} + +func (res GraphQLResponse) MarshalJSON() ([]byte, error) { + var errors []string + for _, err := range res.Errors { + errors = append(errors, err.Error()) + } + return json.Marshal(map[string]any{"data": res.Data, "errors": errors}) +} + +func (res *GraphQLResponse) UnmarshalJSON(data []byte) error { + // decode numbers to json.Number + dec := json.NewDecoder(bytes.NewBuffer(data)) + dec.UseNumber() + + var out map[string]any + if err := dec.Decode(&out); err != nil { + return err 
+ } + + // fix errors type to match tests + switch t := out["errors"].(type) { + case []any: + for _, v := range t { + res.Errors = append(res.Errors, parseError(v)) + } + default: + res.Errors = nil + } + + // fix data type to match tests + switch t := out["data"].(type) { + case []any: + var fixed []map[string]any + for _, v := range t { + fixed = append(fixed, v.(map[string]any)) + } + res.Data = fixed + case map[string]any: + res.Data = t + default: + res.Data = []map[string]any{} + } + + return nil +} + +func (s *storeHandler) ExecRequest(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + var request GraphQLRequest + switch { + case req.URL.Query().Get("query") != "": + request.Query = req.URL.Query().Get("query") + case req.Body != nil: + if err := requestJSON(req, &request); err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + default: + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrMissingRequest}) + return + } + result := store.ExecRequest(req.Context(), request.Query) + + if result.Pub == nil { + responseJSON(rw, http.StatusOK, GraphQLResponse{result.GQL.Data, result.GQL.Errors}) + return + } + flusher, ok := rw.(http.Flusher) + if !ok { + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrStreamingNotSupported}) + return + } + + rw.Header().Add("Content-Type", "text/event-stream") + rw.Header().Add("Cache-Control", "no-cache") + rw.Header().Add("Connection", "keep-alive") + + rw.WriteHeader(http.StatusOK) + flusher.Flush() + + for { + select { + case <-req.Context().Done(): + return + case item, open := <-result.Pub.Stream(): + if !open { + return + } + data, err := json.Marshal(item) + if err != nil { + return + } + fmt.Fprintf(rw, "data: %s\n\n", data) + flusher.Flush() + } + } +} diff --git a/http/handler_tx.go b/http/handler_tx.go new file mode 100644 index 0000000000..b7f1c82545 --- /dev/null +++ b/http/handler_tx.go @@ -0,0 +1,95 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
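The `ExecRequest` handler above streams subscription results as server-sent events, one `data: <json>` frame per published item. A minimal sketch of a consumer, not part of this patch, assuming only that framing plus a locally running node; the address and the `User` subscription are placeholders:

```go
package main

import (
	"bufio"
	"bytes"
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Placeholder address and subscription; the GET route is registered
	// under /api/v0/graphql in http/server.go.
	query := url.QueryEscape(`subscription { User { name } }`)
	res, err := http.Get("http://localhost:9181/api/v0/graphql?query=" + query)
	if err != nil {
		panic(err)
	}
	defer res.Body.Close()

	// Read the "data: <json>\n\n" frames exactly as written by ExecRequest.
	scanner := bufio.NewScanner(res.Body)
	for scanner.Scan() {
		line := scanner.Bytes()
		if !bytes.HasPrefix(line, []byte("data: ")) {
			continue // blank separator lines between events
		}
		fmt.Printf("event: %s\n", bytes.TrimPrefix(line, []byte("data: ")))
	}
}
```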
+ +package http + +import ( + "net/http" + "strconv" + "sync" + + "github.com/go-chi/chi/v5" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" +) + +type txHandler struct{} + +type CreateTxResponse struct { + ID uint64 `json:"id"` +} + +func (h *txHandler) NewTxn(rw http.ResponseWriter, req *http.Request) { + db := req.Context().Value(dbContextKey).(client.DB) + txs := req.Context().Value(txsContextKey).(*sync.Map) + readOnly, _ := strconv.ParseBool(req.URL.Query().Get("read_only")) + + tx, err := db.NewTxn(req.Context(), readOnly) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + txs.Store(tx.ID(), tx) + responseJSON(rw, http.StatusOK, &CreateTxResponse{tx.ID()}) +} + +func (h *txHandler) NewConcurrentTxn(rw http.ResponseWriter, req *http.Request) { + db := req.Context().Value(dbContextKey).(client.DB) + txs := req.Context().Value(txsContextKey).(*sync.Map) + readOnly, _ := strconv.ParseBool(req.URL.Query().Get("read_only")) + + tx, err := db.NewConcurrentTxn(req.Context(), readOnly) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + txs.Store(tx.ID(), tx) + responseJSON(rw, http.StatusOK, &CreateTxResponse{tx.ID()}) +} + +func (h *txHandler) Commit(rw http.ResponseWriter, req *http.Request) { + txs := req.Context().Value(txsContextKey).(*sync.Map) + + txId, err := strconv.ParseUint(chi.URLParam(req, "id"), 10, 64) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrInvalidTransactionId}) + return + } + txVal, ok := txs.Load(txId) + if !ok { + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrInvalidTransactionId}) + return + } + err = txVal.(datastore.Txn).Commit(req.Context()) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{err}) + return + } + txs.Delete(txId) + rw.WriteHeader(http.StatusOK) +} + +func (h *txHandler) Discard(rw http.ResponseWriter, req *http.Request) { + txs := req.Context().Value(txsContextKey).(*sync.Map) + + txId, err := strconv.ParseUint(chi.URLParam(req, "id"), 10, 64) + if err != nil { + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrInvalidTransactionId}) + return + } + txVal, ok := txs.LoadAndDelete(txId) + if !ok { + responseJSON(rw, http.StatusBadRequest, errorResponse{ErrInvalidTransactionId}) + return + } + txVal.(datastore.Txn).Discard(req.Context()) + rw.WriteHeader(http.StatusOK) +} diff --git a/http/http_client.go b/http/http_client.go new file mode 100644 index 0000000000..48323607ab --- /dev/null +++ b/http/http_client.go @@ -0,0 +1,86 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
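The transaction handler above is exposed under `/api/v0/tx` (see http/server.go later in this patch) and pairs with the `x-defradb-tx` header defined in http/middleware.go. A minimal sketch of the lifecycle, not part of this patch; the base address and the `User` collection are placeholders:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	base := "http://localhost:9181/api/v0" // placeholder address

	// 1. Create a transaction (POST /tx) and decode the returned id.
	res, err := http.Post(base+"/tx", "application/json", nil)
	if err != nil {
		panic(err)
	}
	var tx struct {
		ID uint64 `json:"id"`
	}
	if err := json.NewDecoder(res.Body).Decode(&tx); err != nil {
		panic(err)
	}
	res.Body.Close()

	// 2. Execute a request inside the transaction by setting the tx header.
	body := bytes.NewBufferString(`{"query": "{ User { name } }"}`)
	req, _ := http.NewRequest(http.MethodPost, base+"/graphql", body)
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("x-defradb-tx", fmt.Sprintf("%d", tx.ID))
	if res, err = http.DefaultClient.Do(req); err != nil {
		panic(err)
	}
	res.Body.Close()

	// 3. Commit the transaction (POST /tx/{id}); DELETE /tx/{id} would discard it.
	req, _ = http.NewRequest(http.MethodPost, fmt.Sprintf("%s/tx/%d", base, tx.ID), nil)
	if res, err = http.DefaultClient.Do(req); err != nil {
		panic(err)
	}
	res.Body.Close()
}
```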
+ +package http + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" +) + +type httpClient struct { + client *http.Client + baseURL *url.URL + txValue string +} + +func newHttpClient(baseURL *url.URL) *httpClient { + client := httpClient{ + client: http.DefaultClient, + baseURL: baseURL, + } + return &client +} + +func (c *httpClient) withTxn(value uint64) *httpClient { + return &httpClient{ + client: c.client, + baseURL: c.baseURL, + txValue: fmt.Sprintf("%d", value), + } +} + +func (c *httpClient) setDefaultHeaders(req *http.Request) { + req.Header.Set("Accept", "application/json") + req.Header.Set("Content-Type", "application/json") + + if c.txValue != "" { + req.Header.Set(TX_HEADER_NAME, c.txValue) + } +} + +func (c *httpClient) request(req *http.Request) ([]byte, error) { + c.setDefaultHeaders(req) + + res, err := c.client.Do(req) + if err != nil { + return nil, err + } + // ignore close errors because they have + // no perceivable effect on the end user + // and cannot be reconciled easily + defer res.Body.Close() //nolint:errcheck + + data, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + // request was successful + if res.StatusCode == http.StatusOK { + return data, nil + } + // attempt to parse json error + var errRes errorResponse + if err := json.Unmarshal(data, &errRes); err != nil { + return nil, fmt.Errorf("%s", data) + } + return nil, errRes.Error +} + +func (c *httpClient) requestJson(req *http.Request, out any) error { + data, err := c.request(req) + if err != nil { + return err + } + return json.Unmarshal(data, out) +} diff --git a/http/logger.go b/http/logger.go new file mode 100644 index 0000000000..d23f65e94a --- /dev/null +++ b/http/logger.go @@ -0,0 +1,52 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "net/http" + "time" + + "github.com/go-chi/chi/v5/middleware" + + "github.com/sourcenetwork/defradb/logging" +) + +var log = logging.MustNewLogger("http") + +type logEntry struct { + req *http.Request +} + +var _ middleware.LogEntry = (*logEntry)(nil) + +func (e *logEntry) Write(status, bytes int, header http.Header, elapsed time.Duration, extra any) { + log.Info( + e.req.Context(), + "Request", + logging.NewKV("Method", e.req.Method), + logging.NewKV("Path", e.req.URL.Path), + logging.NewKV("Status", status), + logging.NewKV("LengthBytes", bytes), + logging.NewKV("ElapsedTime", elapsed.String()), + ) +} + +func (e *logEntry) Panic(v any, stack []byte) { + middleware.PrintPrettyStack(v) +} + +type logFormatter struct{} + +var _ middleware.LogFormatter = (*logFormatter)(nil) + +func (f *logFormatter) NewLogEntry(req *http.Request) middleware.LogEntry { + return &logEntry{req} +} diff --git a/http/middleware.go b/http/middleware.go new file mode 100644 index 0000000000..28f1e0ff1e --- /dev/null +++ b/http/middleware.go @@ -0,0 +1,145 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "context" + "net/http" + "strconv" + "sync" + + "github.com/go-chi/chi/v5" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" +) + +const TX_HEADER_NAME = "x-defradb-tx" + +type contextKey string + +var ( + // txsContextKey is the context key for the transaction *sync.Map + txsContextKey = contextKey("txs") + // dbContextKey is the context key for the client.DB + dbContextKey = contextKey("db") + // txContextKey is the context key for the datastore.Txn + // + // This will only be set if a transaction id is specified. + txContextKey = contextKey("tx") + // storeContextKey is the context key for the client.Store + // + // If a transaction exists, all operations will be executed + // in the current transaction context. + storeContextKey = contextKey("store") + // lensContextKey is the context key for the client.LensRegistry + // + // If a transaction exists, all operations will be executed + // in the current transaction context. + lensContextKey = contextKey("lens") + // colContextKey is the context key for the client.Collection + // + // If a transaction exists, all operations will be executed + // in the current transaction context. + colContextKey = contextKey("col") +) + +// ApiMiddleware sets the required context values for all API requests. +func ApiMiddleware(db client.DB, txs *sync.Map) func(http.Handler) http.Handler { + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + ctx := req.Context() + ctx = context.WithValue(ctx, dbContextKey, db) + ctx = context.WithValue(ctx, txsContextKey, txs) + next.ServeHTTP(rw, req.WithContext(ctx)) + }) + } +} + +// TransactionMiddleware sets the transaction context for the current request. +func TransactionMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + txs := req.Context().Value(txsContextKey).(*sync.Map) + + txValue := req.Header.Get(TX_HEADER_NAME) + if txValue == "" { + next.ServeHTTP(rw, req) + return + } + id, err := strconv.ParseUint(txValue, 10, 64) + if err != nil { + next.ServeHTTP(rw, req) + return + } + tx, ok := txs.Load(id) + if !ok { + next.ServeHTTP(rw, req) + return + } + + ctx := context.WithValue(req.Context(), txContextKey, tx) + next.ServeHTTP(rw, req.WithContext(ctx)) + }) +} + +// StoreMiddleware sets the db context for the current request. +func StoreMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + db := req.Context().Value(dbContextKey).(client.DB) + + var store client.Store + if tx, ok := req.Context().Value(txContextKey).(datastore.Txn); ok { + store = db.WithTxn(tx) + } else { + store = db + } + + ctx := context.WithValue(req.Context(), storeContextKey, store) + next.ServeHTTP(rw, req.WithContext(ctx)) + }) +} + +// LensMiddleware sets the lens context for the current request. 
+func LensMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + var lens client.LensRegistry + if tx, ok := req.Context().Value(txContextKey).(datastore.Txn); ok { + lens = store.LensRegistry().WithTxn(tx) + } else { + lens = store.LensRegistry() + } + + ctx := context.WithValue(req.Context(), lensContextKey, lens) + next.ServeHTTP(rw, req.WithContext(ctx)) + }) +} + +// CollectionMiddleware sets the collection context for the current request. +func CollectionMiddleware(next http.Handler) http.Handler { + return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { + store := req.Context().Value(storeContextKey).(client.Store) + + col, err := store.GetCollectionByName(req.Context(), chi.URLParam(req, "name")) + if err != nil { + rw.WriteHeader(http.StatusNotFound) + return + } + + if tx, ok := req.Context().Value(txContextKey).(datastore.Txn); ok { + col = col.WithTxn(tx) + } + + ctx := context.WithValue(req.Context(), colContextKey, col) + next.ServeHTTP(rw, req.WithContext(ctx)) + }) +} diff --git a/http/server.go b/http/server.go new file mode 100644 index 0000000000..afee4b9217 --- /dev/null +++ b/http/server.go @@ -0,0 +1,111 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "net/http" + "sync" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/chi/v5/middleware" + + "github.com/sourcenetwork/defradb/client" +) + +type Server struct { + db client.DB + router *chi.Mux + txs *sync.Map +} + +func NewServer(db client.DB) *Server { + txs := &sync.Map{} + + tx_handler := &txHandler{} + store_handler := &storeHandler{} + collection_handler := &collectionHandler{} + lens_handler := &lensHandler{} + + router := chi.NewRouter() + router.Use(middleware.RequestLogger(&logFormatter{})) + router.Use(middleware.Recoverer) + + router.Route("/api/v0", func(api chi.Router) { + api.Use(ApiMiddleware(db, txs), TransactionMiddleware, StoreMiddleware) + api.Route("/tx", func(tx chi.Router) { + tx.Post("/", tx_handler.NewTxn) + tx.Post("/concurrent", tx_handler.NewConcurrentTxn) + tx.Post("/{id}", tx_handler.Commit) + tx.Delete("/{id}", tx_handler.Discard) + }) + api.Route("/backup", func(backup chi.Router) { + backup.Post("/export", store_handler.BasicExport) + backup.Post("/import", store_handler.BasicImport) + }) + api.Route("/schema", func(schema chi.Router) { + schema.Post("/", store_handler.AddSchema) + schema.Patch("/", store_handler.PatchSchema) + }) + api.Route("/collections", func(collections chi.Router) { + collections.Get("/", store_handler.GetCollection) + // with collection middleware + collections_tx := collections.With(CollectionMiddleware) + collections_tx.Get("/{name}", collection_handler.GetAllDocKeys) + collections_tx.Post("/{name}", collection_handler.Create) + collections_tx.Patch("/{name}", collection_handler.UpdateWith) + collections_tx.Delete("/{name}", collection_handler.DeleteWith) + collections_tx.Post("/{name}/indexes", collection_handler.CreateIndex) + collections_tx.Get("/{name}/indexes", collection_handler.GetIndexes) + 
collections_tx.Delete("/{name}/indexes/{index}", collection_handler.DropIndex) + collections_tx.Get("/{name}/{key}", collection_handler.Get) + collections_tx.Patch("/{name}/{key}", collection_handler.Update) + collections_tx.Delete("/{name}/{key}", collection_handler.Delete) + }) + api.Route("/lens", func(lens chi.Router) { + lens.Use(LensMiddleware) + lens.Get("/", lens_handler.Config) + lens.Post("/", lens_handler.SetMigration) + lens.Post("/reload", lens_handler.ReloadLenses) + lens.Get("/{version}", lens_handler.HasMigration) + lens.Post("/{version}/up", lens_handler.MigrateUp) + lens.Post("/{version}/down", lens_handler.MigrateDown) + }) + api.Route("/graphql", func(graphQL chi.Router) { + graphQL.Get("/", store_handler.ExecRequest) + graphQL.Post("/", store_handler.ExecRequest) + }) + api.Route("/p2p", func(p2p chi.Router) { + p2p.Route("/replicators", func(p2p_replicators chi.Router) { + p2p_replicators.Get("/", store_handler.GetAllReplicators) + p2p_replicators.Post("/", store_handler.SetReplicator) + p2p_replicators.Delete("/", store_handler.DeleteReplicator) + }) + p2p.Route("/collections", func(p2p_collections chi.Router) { + p2p_collections.Get("/", store_handler.GetAllP2PCollections) + p2p_collections.Post("/{id}", store_handler.AddP2PCollection) + p2p_collections.Delete("/{id}", store_handler.RemoveP2PCollection) + }) + }) + api.Route("/debug", func(debug chi.Router) { + debug.Get("/dump", store_handler.PrintDump) + }) + }) + + return &Server{ + db: db, + router: router, + txs: txs, + } +} + +func (s *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) { + s.router.ServeHTTP(w, req) +} diff --git a/http/utils.go b/http/utils.go new file mode 100644 index 0000000000..a171e0ed38 --- /dev/null +++ b/http/utils.go @@ -0,0 +1,65 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. 
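`NewServer` above returns a value that satisfies `http.Handler` through its `ServeHTTP` method, so it can be mounted on any standard listener (or wrapped in an `httptest.Server`, as the Wrapper later in this patch does). A minimal sketch, not part of this patch; the listen address is an assumption and constructing the `client.DB` is out of scope here:

```go
package example

import (
	"net/http"

	"github.com/sourcenetwork/defradb/client"
	defrahttp "github.com/sourcenetwork/defradb/http"
)

// serve mounts the chi router built by NewServer on a plain net/http listener.
func serve(db client.DB) error {
	return http.ListenAndServe("localhost:9181", defrahttp.NewServer(db))
}
```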
+ +package http + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore/badger/v4" +) + +func requestJSON(req *http.Request, out any) error { + data, err := io.ReadAll(req.Body) + if err != nil { + return err + } + return json.Unmarshal(data, out) +} + +func responseJSON(rw http.ResponseWriter, status int, out any) { + rw.Header().Add("Content-Type", "application/json") + rw.WriteHeader(status) + json.NewEncoder(rw).Encode(out) //nolint:errcheck +} + +func documentJSON(doc *client.Document) ([]byte, error) { + docMap, err := doc.ToMap() + if err != nil { + return nil, err + } + delete(docMap, "_key") + + for field, value := range doc.Values() { + if !value.IsDirty() { + delete(docMap, field.Name()) + } + if value.IsDelete() { + docMap[field.Name()] = nil + } + } + + return json.Marshal(docMap) +} + +func parseError(msg any) error { + switch msg { + case client.ErrDocumentNotFound.Error(): + return client.ErrDocumentNotFound + case badger.ErrTxnConflict.Error(): + return badger.ErrTxnConflict + default: + return fmt.Errorf("%s", msg) + } +} diff --git a/http/wrapper.go b/http/wrapper.go new file mode 100644 index 0000000000..558dc79474 --- /dev/null +++ b/http/wrapper.go @@ -0,0 +1,177 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "context" + "fmt" + "net/http/httptest" + + blockstore "github.com/ipfs/boxo/blockstore" + + "github.com/sourcenetwork/defradb/client" + "github.com/sourcenetwork/defradb/datastore" + "github.com/sourcenetwork/defradb/events" +) + +var _ client.DB = (*Wrapper)(nil) + +// Wrapper combines an HTTP client and server into a +// single struct that implements the client.DB interface. 
+type Wrapper struct { + db client.DB + server *Server + client *Client + httpServer *httptest.Server +} + +func NewWrapper(db client.DB) (*Wrapper, error) { + server := NewServer(db) + httpServer := httptest.NewServer(server) + + client, err := NewClient(httpServer.URL) + if err != nil { + return nil, err + } + + return &Wrapper{ + db, + server, + client, + httpServer, + }, nil +} + +func (w *Wrapper) SetReplicator(ctx context.Context, rep client.Replicator) error { + return w.client.SetReplicator(ctx, rep) +} + +func (w *Wrapper) DeleteReplicator(ctx context.Context, rep client.Replicator) error { + return w.client.DeleteReplicator(ctx, rep) +} + +func (w *Wrapper) GetAllReplicators(ctx context.Context) ([]client.Replicator, error) { + return w.client.GetAllReplicators(ctx) +} + +func (w *Wrapper) AddP2PCollection(ctx context.Context, collectionID string) error { + return w.client.AddP2PCollection(ctx, collectionID) +} + +func (w *Wrapper) RemoveP2PCollection(ctx context.Context, collectionID string) error { + return w.client.RemoveP2PCollection(ctx, collectionID) +} + +func (w *Wrapper) GetAllP2PCollections(ctx context.Context) ([]string, error) { + return w.client.GetAllP2PCollections(ctx) +} + +func (w *Wrapper) BasicImport(ctx context.Context, filepath string) error { + return w.client.BasicImport(ctx, filepath) +} + +func (w *Wrapper) BasicExport(ctx context.Context, config *client.BackupConfig) error { + return w.client.BasicExport(ctx, config) +} + +func (w *Wrapper) AddSchema(ctx context.Context, schema string) ([]client.CollectionDescription, error) { + return w.client.AddSchema(ctx, schema) +} + +func (w *Wrapper) PatchSchema(ctx context.Context, patch string) error { + return w.client.PatchSchema(ctx, patch) +} + +func (w *Wrapper) SetMigration(ctx context.Context, config client.LensConfig) error { + return w.client.SetMigration(ctx, config) +} + +func (w *Wrapper) LensRegistry() client.LensRegistry { + return w.client.LensRegistry() +} + +func (w *Wrapper) GetCollectionByName(ctx context.Context, name client.CollectionName) (client.Collection, error) { + return w.client.GetCollectionByName(ctx, name) +} + +func (w *Wrapper) GetCollectionBySchemaID(ctx context.Context, schemaId string) (client.Collection, error) { + return w.client.GetCollectionBySchemaID(ctx, schemaId) +} + +func (w *Wrapper) GetCollectionByVersionID(ctx context.Context, versionId string) (client.Collection, error) { + return w.client.GetCollectionByVersionID(ctx, versionId) +} + +func (w *Wrapper) GetAllCollections(ctx context.Context) ([]client.Collection, error) { + return w.client.GetAllCollections(ctx) +} + +func (w *Wrapper) GetAllIndexes(ctx context.Context) (map[client.CollectionName][]client.IndexDescription, error) { + return w.client.GetAllIndexes(ctx) +} + +func (w *Wrapper) ExecRequest(ctx context.Context, query string) *client.RequestResult { + return w.client.ExecRequest(ctx, query) +} + +func (w *Wrapper) NewTxn(ctx context.Context, readOnly bool) (datastore.Txn, error) { + client, err := w.client.NewTxn(ctx, readOnly) + if err != nil { + return nil, err + } + server, ok := w.server.txs.Load(client.ID()) + if !ok { + return nil, fmt.Errorf("failed to get server transaction") + } + return &TxWrapper{server.(datastore.Txn), client}, nil +} + +func (w *Wrapper) NewConcurrentTxn(ctx context.Context, readOnly bool) (datastore.Txn, error) { + client, err := w.client.NewConcurrentTxn(ctx, readOnly) + if err != nil { + return nil, err + } + server, ok := w.server.txs.Load(client.ID()) + if !ok { + 
return nil, fmt.Errorf("failed to get server transaction") + } + return &TxWrapper{server.(datastore.Txn), client}, nil +} + +func (w *Wrapper) WithTxn(tx datastore.Txn) client.Store { + return w.client.WithTxn(tx) +} + +func (w *Wrapper) Root() datastore.RootStore { + return w.db.Root() +} + +func (w *Wrapper) Blockstore() blockstore.Blockstore { + return w.db.Blockstore() +} + +func (w *Wrapper) Close(ctx context.Context) { + w.httpServer.CloseClientConnections() + w.httpServer.Close() + w.db.Close(ctx) +} + +func (w *Wrapper) Events() events.Events { + return w.db.Events() +} + +func (w *Wrapper) MaxTxnRetries() int { + return w.db.MaxTxnRetries() +} + +func (w *Wrapper) PrintDump(ctx context.Context) error { + return w.db.PrintDump(ctx) +} diff --git a/http/wrapper_tx.go b/http/wrapper_tx.go new file mode 100644 index 0000000000..7c77b938f5 --- /dev/null +++ b/http/wrapper_tx.go @@ -0,0 +1,70 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package http + +import ( + "context" + + "github.com/sourcenetwork/defradb/datastore" +) + +var _ datastore.Txn = (*TxWrapper)(nil) + +// TxWrapper combines a client and server transaction into +// a single struct that implements the datastore.Txn interface. +type TxWrapper struct { + server datastore.Txn + client datastore.Txn +} + +func (w *TxWrapper) ID() uint64 { + return w.client.ID() +} + +func (w *TxWrapper) Commit(ctx context.Context) error { + return w.client.Commit(ctx) +} + +func (w *TxWrapper) Discard(ctx context.Context) { + w.client.Discard(ctx) +} + +func (w *TxWrapper) OnSuccess(fn func()) { + w.server.OnSuccess(fn) +} + +func (w *TxWrapper) OnError(fn func()) { + w.server.OnError(fn) +} + +func (w *TxWrapper) OnDiscard(fn func()) { + w.server.OnDiscard(fn) +} + +func (w *TxWrapper) Rootstore() datastore.DSReaderWriter { + return w.server.Rootstore() +} + +func (w *TxWrapper) Datastore() datastore.DSReaderWriter { + return w.server.Datastore() +} + +func (w *TxWrapper) Headstore() datastore.DSReaderWriter { + return w.server.Headstore() +} + +func (w *TxWrapper) DAGstore() datastore.DAGStore { + return w.server.DAGstore() +} + +func (w *TxWrapper) Systemstore() datastore.DSReaderWriter { + return w.server.Systemstore() +} diff --git a/tests/integration/explain.go b/tests/integration/explain.go index e4221ea76b..44c457c0f8 100644 --- a/tests/integration/explain.go +++ b/tests/integration/explain.go @@ -11,7 +11,6 @@ package tests import ( - "context" "reflect" "sort" "testing" @@ -127,50 +126,43 @@ func executeExplainRequest( for _, node := range getNodes(action.NodeID, s.nodes) { result := node.DB.ExecRequest(s.ctx, action.Request) - assertExplainRequestResults( - s.ctx, - s.t, - s.testCase.Description, - &result.GQL, - action, - ) + assertExplainRequestResults(s, &result.GQL, action) } } func assertExplainRequestResults( - ctx context.Context, - t *testing.T, - description string, + s *state, actualResult *client.GQLResult, action ExplainRequest, ) { // Check expected error matches actual error. If it does we are done. 
if AssertErrors( - t, - description, + s.t, + s.testCase.Description, actualResult.Errors, action.ExpectedError, ) { return } else if action.ExpectedError != "" { // If didn't find a match but did expected an error, then fail. - assert.Fail(t, "Expected an error however none was raised.", description) + assert.Fail(s.t, "Expected an error however none was raised.", s.testCase.Description) } // Note: if returned gql result is `nil` this panics (the panic seems useful while testing). resultantData := actualResult.Data.([]map[string]any) - log.Info(ctx, "", logging.NewKV("FullExplainGraphResult", actualResult.Data)) + log.Info(s.ctx, "", logging.NewKV("FullExplainGraphResult", actualResult.Data)) // Check if the expected full explain graph (if provided) matches the actual full explain graph // that is returned, if doesn't match we would like to still see a diff comparison (handy while debugging). if lengthOfExpectedFullGraph := len(action.ExpectedFullGraph); action.ExpectedFullGraph != nil { - require.Equal(t, lengthOfExpectedFullGraph, len(resultantData), description) + require.Equal(s.t, lengthOfExpectedFullGraph, len(resultantData), s.testCase.Description) for index, actualResult := range resultantData { if lengthOfExpectedFullGraph > index { - assert.Equal( - t, + assertResultsEqual( + s.t, + s.clientType, action.ExpectedFullGraph[index], actualResult, - description, + s.testCase.Description, ) } } @@ -179,15 +171,17 @@ func assertExplainRequestResults( // Ensure the complete high-level pattern matches, inother words check that all the // explain graph nodes are in the correct expected ordering. if action.ExpectedPatterns != nil { - require.Equal(t, len(action.ExpectedPatterns), len(resultantData), description) + require.Equal(s.t, len(action.ExpectedPatterns), len(resultantData), s.testCase.Description) + for index, actualResult := range resultantData { // Trim away all attributes (non-plan nodes) from the returned full explain graph result. - actualResultWithoutAttributes := trimExplainAttributes(t, description, actualResult) - assert.Equal( - t, + actualResultWithoutAttributes := trimExplainAttributes(s.t, s.testCase.Description, actualResult) + assertResultsEqual( + s.t, + s.clientType, action.ExpectedPatterns[index], actualResultWithoutAttributes, - description, + s.testCase.Description, ) } } @@ -196,14 +190,13 @@ func assertExplainRequestResults( // Note: This does not check if the node is in correct location or not. 
if action.ExpectedTargets != nil { for _, target := range action.ExpectedTargets { - assertExplainTargetCase(t, description, target, resultantData) + assertExplainTargetCase(s, target, resultantData) } } } func assertExplainTargetCase( - t *testing.T, - description string, + s *state, targetCase PlanNodeTargetCase, actualResults []map[string]any, ) { @@ -217,17 +210,18 @@ func assertExplainTargetCase( if !isFound { assert.Fail( - t, + s.t, "Expected target ["+targetCase.TargetNodeName+"], was not found in the explain graph.", - description, + s.testCase.Description, ) } - assert.Equal( - t, + assertResultsEqual( + s.t, + s.clientType, targetCase.ExpectedAttributes, foundActualTarget, - description, + s.testCase.Description, ) } } @@ -312,24 +306,41 @@ func findTargetNode( } } + case []any: + return findTargetNodeFromArray(targetName, toSkip, includeChildNodes, r) + case []map[string]any: - for _, item := range r { - target, matches, found := findTargetNode( - targetName, - toSkip, - includeChildNodes, - item, - ) + return findTargetNodeFromArray(targetName, toSkip, includeChildNodes, r) + } - totalMatchedSoFar = totalMatchedSoFar + matches - toSkip -= matches + return nil, totalMatchedSoFar, false +} - if found { - if includeChildNodes { - return target, totalMatchedSoFar, true - } - return trimSubNodes(target), totalMatchedSoFar, true +// findTargetNodeFromArray is a helper that runs findTargetNode for each item in an array. +func findTargetNodeFromArray[T any]( + targetName string, + toSkip uint, + includeChildNodes bool, + actualResult []T, +) (any, uint, bool) { + var totalMatchedSoFar uint = 0 + + for _, item := range actualResult { + target, matches, found := findTargetNode( + targetName, + toSkip, + includeChildNodes, + item, + ) + + totalMatchedSoFar = totalMatchedSoFar + matches + toSkip -= matches + + if found { + if includeChildNodes { + return target, totalMatchedSoFar, true } + return trimSubNodes(target), totalMatchedSoFar, true } } @@ -358,9 +369,9 @@ func trimSubNodes(graph any) any { func trimExplainAttributes( t *testing.T, description string, - actualResult map[string]any, + actualResult any, ) map[string]any { - trimmedMap := copyMap(actualResult) + trimmedMap := copyMap(actualResult.(map[string]any)) for key, value := range trimmedMap { if !isPlanNode(key) { @@ -373,14 +384,10 @@ func trimExplainAttributes( trimmedMap[key] = trimExplainAttributes(t, description, v) case []map[string]any: - trimmedArrayElements := []map[string]any{} - for _, valueItem := range v { - trimmedArrayElements = append( - trimmedArrayElements, - trimExplainAttributes(t, description, valueItem), - ) - } - trimmedMap[key] = trimmedArrayElements + trimmedMap[key] = trimExplainAttributesArray(t, description, v) + + case []any: + trimmedMap[key] = trimExplainAttributesArray(t, description, v) default: assert.Fail( @@ -394,6 +401,22 @@ func trimExplainAttributes( return trimmedMap } +// trimExplainAttributesArray is a helper that runs trimExplainAttributes for each item in an array. +func trimExplainAttributesArray[T any]( + t *testing.T, + description string, + actualResult []T, +) []map[string]any { + trimmedArrayElements := []map[string]any{} + for _, valueItem := range actualResult { + trimmedArrayElements = append( + trimmedArrayElements, + trimExplainAttributes(t, description, valueItem), + ) + } + return trimmedArrayElements +} + // isPlanNode returns true if someName matches a plan node name, retruns false otherwise. 
func isPlanNode(someName string) bool { _, isPlanNode := allPlanNodeNames[someName] diff --git a/tests/integration/lens.go b/tests/integration/lens.go index 2959867c1a..317864ab3e 100644 --- a/tests/integration/lens.go +++ b/tests/integration/lens.go @@ -75,9 +75,35 @@ func getMigrations( configs, err := db.LensRegistry().Config(s.ctx) require.NoError(s.t, err) + require.Equal(s.t, len(configs), len(action.ExpectedResults)) // The order of the results is not deterministic, so do not assert on the element - // locations. - assert.ElementsMatch(s.t, configs, action.ExpectedResults) + for _, expected := range action.ExpectedResults { + var actual client.LensConfig + var actualFound bool + + for _, config := range configs { + if config.SourceSchemaVersionID != expected.SourceSchemaVersionID { + continue + } + if config.DestinationSchemaVersionID != expected.DestinationSchemaVersionID { + continue + } + actual = config + actualFound = true + } + + require.True(s.t, actualFound, "matching lens config not found") + require.Equal(s.t, len(expected.Lenses), len(actual.Lenses)) + + for j, actualLens := range actual.Lenses { + expectedLens := expected.Lenses[j] + + assert.Equal(s.t, expectedLens.Inverse, actualLens.Inverse) + assert.Equal(s.t, expectedLens.Path, actualLens.Path) + + assertResultsEqual(s.t, s.clientType, expectedLens.Arguments, actualLens.Arguments) + } + } } } diff --git a/tests/integration/p2p.go b/tests/integration/p2p.go index 24d20d8c31..311a088c86 100644 --- a/tests/integration/p2p.go +++ b/tests/integration/p2p.go @@ -121,11 +121,6 @@ type GetAllP2PCollections struct { // node 1 to see if it has been replicated. type WaitForSync struct{} -// AnyOf may be used as `Results` field where the value may -// be one of several values, yet the value of that field must be the same -// across all nodes due to strong eventual consistency. -type AnyOf []any - // connectPeers connects two existing, started, nodes as peers. It returns a channel // that will receive an empty struct upon sync completion of all expected peer-sync events. // diff --git a/tests/integration/query/one_to_many/simple_test.go b/tests/integration/query/one_to_many/simple_test.go index d7e886907e..9e4ad72fd5 100644 --- a/tests/integration/query/one_to_many/simple_test.go +++ b/tests/integration/query/one_to_many/simple_test.go @@ -167,7 +167,7 @@ func TestQueryOneToManyWithNonExistantParent(t *testing.T) { { "name": "Painted House", "rating": 4.9, - "Author": nil, + "author": nil, }, }, } diff --git a/tests/integration/results.go b/tests/integration/results.go new file mode 100644 index 0000000000..052de310c5 --- /dev/null +++ b/tests/integration/results.go @@ -0,0 +1,180 @@ +// Copyright 2023 Democratized Data Foundation +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package tests + +import ( + "encoding/json" + "testing" + + "github.com/sourcenetwork/immutable" + "github.com/stretchr/testify/assert" +) + +// AnyOf may be used as `Results` field where the value may +// be one of several values, yet the value of that field must be the same +// across all nodes due to strong eventual consistency. +type AnyOf []any + +// assertResultsAnyOf asserts that actual result is equal to at least one of the expected results. 
+// +// The comparison is relaxed when using client types other than goClientType. +func assertResultsAnyOf(t *testing.T, client ClientType, expected AnyOf, actual any, msgAndArgs ...any) { + switch client { + case httpClientType: + if !areResultsAnyOf(expected, actual) { + assert.Contains(t, expected, actual, msgAndArgs...) + } + default: + assert.Contains(t, expected, actual, msgAndArgs...) + } +} + +// assertResultsEqual asserts that actual result is equal to the expected result. +// +// The comparison is relaxed when using client types other than goClientType. +func assertResultsEqual(t *testing.T, client ClientType, expected any, actual any, msgAndArgs ...any) { + switch client { + case httpClientType: + if !areResultsEqual(expected, actual) { + assert.EqualValues(t, expected, actual, msgAndArgs...) + } + default: + assert.EqualValues(t, expected, actual, msgAndArgs...) + } +} + +// areResultsAnyOf returns true if any of the expected results are of equal value. +// +// Values of type json.Number and immutable.Option will be reduced to their underlying types. +func areResultsAnyOf(expected AnyOf, actual any) bool { + for _, v := range expected { + if areResultsEqual(v, actual) { + return true + } + } + return false +} + +// areResultsEqual returns true if the expected and actual results are of equal value. +// +// Values of type json.Number and immutable.Option will be reduced to their underlying types. +func areResultsEqual(expected any, actual any) bool { + switch expectedVal := expected.(type) { + case map[string]any: + if len(expectedVal) == 0 && actual == nil { + return true + } + actualVal, ok := actual.(map[string]any) + if !ok { + return assert.ObjectsAreEqualValues(expected, actual) + } + if len(expectedVal) != len(actualVal) { + return false + } + for k, v := range expectedVal { + if !areResultsEqual(v, actualVal[k]) { + return false + } + } + return true + case uint64, uint32, uint16, uint8, uint, int64, int32, int16, int8, int: + jsonNum, ok := actual.(json.Number) + if !ok { + return assert.ObjectsAreEqualValues(expected, actual) + } + actualVal, err := jsonNum.Int64() + if err != nil { + return false + } + return assert.ObjectsAreEqualValues(expected, actualVal) + case float32, float64: + jsonNum, ok := actual.(json.Number) + if !ok { + return assert.ObjectsAreEqualValues(expected, actual) + } + actualVal, err := jsonNum.Float64() + if err != nil { + return false + } + return assert.ObjectsAreEqualValues(expected, actualVal) + case immutable.Option[float64]: + return areResultOptionsEqual(expectedVal, actual) + case immutable.Option[uint64]: + return areResultOptionsEqual(expectedVal, actual) + case immutable.Option[int64]: + return areResultOptionsEqual(expectedVal, actual) + case immutable.Option[bool]: + return areResultOptionsEqual(expectedVal, actual) + case immutable.Option[string]: + return areResultOptionsEqual(expectedVal, actual) + case []int64: + return areResultArraysEqual(expectedVal, actual) + case []uint64: + return areResultArraysEqual(expectedVal, actual) + case []float64: + return areResultArraysEqual(expectedVal, actual) + case []string: + return areResultArraysEqual(expectedVal, actual) + case []bool: + return areResultArraysEqual(expectedVal, actual) + case []any: + return areResultArraysEqual(expectedVal, actual) + case []map[string]any: + return areResultArraysEqual(expectedVal, actual) + case []immutable.Option[float64]: + return areResultArraysEqual(expectedVal, actual) + case []immutable.Option[uint64]: + return areResultArraysEqual(expectedVal, 
actual) + case []immutable.Option[int64]: + return areResultArraysEqual(expectedVal, actual) + case []immutable.Option[bool]: + return areResultArraysEqual(expectedVal, actual) + case []immutable.Option[string]: + return areResultArraysEqual(expectedVal, actual) + default: + return assert.ObjectsAreEqualValues(expected, actual) + } +} + +// areResultOptionsEqual returns true if the value of the expected immutable.Option +// and actual result are of equal value. +// +// Values of type json.Number and immutable.Option will be reduced to their underlying types. +func areResultOptionsEqual[S any](expected immutable.Option[S], actual any) bool { + var expectedVal any + if expected.HasValue() { + expectedVal = expected.Value() + } + return areResultsEqual(expectedVal, actual) +} + +// areResultArraysEqual returns true if the array of expected results and actual results +// are of equal value. +// +// Values of type json.Number and immutable.Option will be reduced to their underlying types. +func areResultArraysEqual[S any](expected []S, actual any) bool { + if len(expected) == 0 && actual == nil { + return true + } + actualVal, ok := actual.([]any) + if !ok { + return assert.ObjectsAreEqualValues(expected, actual) + } + if len(expected) != len(actualVal) { + return false + } + for i, v := range expected { + if !areResultsEqual(v, actualVal[i]) { + return false + } + } + return true +} diff --git a/tests/integration/state.go b/tests/integration/state.go index f7d4dd45a0..69bd65e2b5 100644 --- a/tests/integration/state.go +++ b/tests/integration/state.go @@ -33,6 +33,9 @@ type state struct { // The type of database currently being tested. dbt DatabaseType + // The type of client currently being tested. + clientType ClientType + // Any explicit transactions active in this test. // // This is order dependent and the property is accessed by index. @@ -83,6 +86,7 @@ func newState( t *testing.T, testCase TestCase, dbt DatabaseType, + clientType ClientType, collectionNames []string, ) *state { return &state{ @@ -90,6 +94,7 @@ func newState( t: t, testCase: testCase, dbt: dbt, + clientType: clientType, txns: []datastore.Txn{}, allActionsDone: make(chan struct{}), subscriptionResultsChans: []chan func(){}, diff --git a/tests/integration/utils2.go b/tests/integration/utils2.go index 5c42f7da12..ab2bea2898 100644 --- a/tests/integration/utils2.go +++ b/tests/integration/utils2.go @@ -17,6 +17,7 @@ import ( "os" "path" "reflect" + "strconv" "strings" "testing" "time" @@ -32,11 +33,14 @@ import ( "github.com/sourcenetwork/defradb/datastore/memory" "github.com/sourcenetwork/defradb/db" "github.com/sourcenetwork/defradb/errors" + "github.com/sourcenetwork/defradb/http" "github.com/sourcenetwork/defradb/logging" "github.com/sourcenetwork/defradb/net" ) const ( + clientGoEnvName = "DEFRA_CLIENT_GO" + clientHttpEnvName = "DEFRA_CLIENT_HTTP" memoryBadgerEnvName = "DEFRA_BADGER_MEMORY" fileBadgerEnvName = "DEFRA_BADGER_FILE" fileBadgerPathEnvName = "DEFRA_BADGER_FILE_PATH" @@ -58,6 +62,17 @@ const ( badgerFileType DatabaseType = "badger-file-system" ) +type ClientType string + +const ( + // goClientType enables running the test suite using + // the go implementation of the client.DB interface. + goClientType ClientType = "go" + // httpClientType enables running the test suite using + // the http implementation of the client.DB interface. + httpClientType ClientType = "http" +) + // The MutationType that tests will run using. 
// // For example if set to [CollectionSaveMutationType], all supporting @@ -90,6 +105,8 @@ var ( badgerInMemory bool badgerFile bool inMemoryStore bool + httpClient bool + goClient bool mutationType MutationType ) @@ -131,33 +148,30 @@ var previousTestCaseTestName string func init() { // We use environment variables instead of flags `go test ./...` throws for all packages // that don't have the flag defined - badgerFileValue, _ := os.LookupEnv(fileBadgerEnvName) - badgerInMemoryValue, _ := os.LookupEnv(memoryBadgerEnvName) - databaseDir, _ = os.LookupEnv(fileBadgerPathEnvName) - rootDatabaseDir, _ = os.LookupEnv(rootDBFilePathEnvName) - detectDbChangesValue, _ := os.LookupEnv(detectDbChangesEnvName) - inMemoryStoreValue, _ := os.LookupEnv(inMemoryEnvName) - repositoryValue, repositorySpecified := os.LookupEnv(repositoryEnvName) - setupOnlyValue, _ := os.LookupEnv(setupOnlyEnvName) - targetBranchValue, targetBranchSpecified := os.LookupEnv(targetBranchEnvName) - mutType, mutationTypeSpecified := os.LookupEnv(mutationTypeEnvName) - - badgerFile = getBool(badgerFileValue) - badgerInMemory = getBool(badgerInMemoryValue) - inMemoryStore = getBool(inMemoryStoreValue) - DetectDbChanges = getBool(detectDbChangesValue) - SetupOnly = getBool(setupOnlyValue) - - if !repositorySpecified { + httpClient, _ = strconv.ParseBool(os.Getenv(clientHttpEnvName)) + goClient, _ = strconv.ParseBool(os.Getenv(clientGoEnvName)) + badgerFile, _ = strconv.ParseBool(os.Getenv(fileBadgerEnvName)) + badgerInMemory, _ = strconv.ParseBool(os.Getenv(memoryBadgerEnvName)) + inMemoryStore, _ = strconv.ParseBool(os.Getenv(inMemoryEnvName)) + DetectDbChanges, _ = strconv.ParseBool(os.Getenv(detectDbChangesEnvName)) + SetupOnly, _ = strconv.ParseBool(os.Getenv(setupOnlyEnvName)) + + var repositoryValue string + if value, ok := os.LookupEnv(repositoryEnvName); ok { + repositoryValue = value + } else { repositoryValue = "https://github.com/sourcenetwork/defradb.git" } - if !targetBranchSpecified { + var targetBranchValue string + if value, ok := os.LookupEnv(targetBranchEnvName); ok { + targetBranchValue = value + } else { targetBranchValue = "develop" } - if mutationTypeSpecified { - mutationType = MutationType(mutType) + if value, ok := os.LookupEnv(mutationTypeEnvName); ok { + mutationType = MutationType(value) } else { // Default to testing mutations via Collection.Save - it should be simpler and // faster. We assume this is desirable when not explicitly testing any particular @@ -165,11 +179,15 @@ func init() { mutationType = CollectionSaveMutationType } - // default is to run against all + // Default is to test go client type. + if !goClient && !httpClient { + goClient = true + } + + // Default is to test all but filesystem db types. if !badgerInMemory && !badgerFile && !inMemoryStore && !DetectDbChanges { - badgerInMemory = true - // Testing against the file system is off by default badgerFile = false + badgerInMemory = true inMemoryStore = true } @@ -178,15 +196,6 @@ func init() { } } -func getBool(val string) bool { - switch strings.ToLower(val) { - case "true": - return true - default: - return false - } -} - // AssertPanicAndSkipChangeDetection asserts that the code of function actually panics, // // also ensures the change detection is skipped so no false fails happen. 
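A small sketch, not part of this patch, of the behaviour the `strconv.ParseBool` calls in the rewritten `init()` above rely on (and why the old `getBool` helper could be dropped): an unset variable yields an empty string, which `ParseBool` rejects, so the corresponding flag simply stays `false`:

```go
package main

import (
	"fmt"
	"os"
	"strconv"
)

func main() {
	os.Unsetenv("DEFRA_CLIENT_HTTP")
	enabled, err := strconv.ParseBool(os.Getenv("DEFRA_CLIENT_HTTP"))
	fmt.Println(enabled, err != nil) // false true: parse error ignored, flag stays false

	os.Setenv("DEFRA_CLIENT_HTTP", "true")
	enabled, _ = strconv.ParseBool(os.Getenv("DEFRA_CLIENT_HTTP"))
	fmt.Println(enabled) // true
}
```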
@@ -257,49 +266,44 @@ func newBadgerFileDB(ctx context.Context, t testing.TB, path string) (client.DB, return db, nil } -func GetDatabaseTypes() []DatabaseType { - databases := []DatabaseType{} +// GetDatabase returns the database implementation for the current +// testing state. The database type and client type on the test state +// are used to select the datastore and client implementation to use. +func GetDatabase(s *state) (cdb client.DB, path string, err error) { + switch s.dbt { + case badgerIMType: + cdb, err = NewBadgerMemoryDB(s.ctx, db.WithUpdateEvents()) - if badgerInMemory { - databases = append(databases, badgerIMType) - } + case badgerFileType: + cdb, path, err = NewBadgerFileDB(s.ctx, s.t) - if badgerFile { - databases = append(databases, badgerFileType) + case defraIMType: + cdb, err = NewInMemoryDB(s.ctx) + + default: + err = fmt.Errorf("invalid database type: %v", s.dbt) } - if inMemoryStore { - databases = append(databases, defraIMType) + if err != nil { + return nil, "", err } - return databases -} + switch s.clientType { + case httpClientType: + cdb, err = http.NewWrapper(cdb) -func GetDatabase(ctx context.Context, t *testing.T, dbt DatabaseType) (client.DB, string, error) { - switch dbt { - case badgerIMType: - db, err := NewBadgerMemoryDB(ctx, db.WithUpdateEvents()) - if err != nil { - return nil, "", err - } - return db, "", nil + case goClientType: + return - case badgerFileType: - db, path, err := NewBadgerFileDB(ctx, t) - if err != nil { - return nil, "", err - } - return db, path, nil + default: + err = fmt.Errorf("invalid client type: %v", s.dbt) + } - case defraIMType: - db, err := NewInMemoryDB(ctx) - if err != nil { - return nil, "", err - } - return db, "", nil + if err != nil { + return nil, "", err } - return nil, "", nil + return } // ExecuteTestCase executes the given TestCase against the configured database @@ -319,14 +323,35 @@ func ExecuteTestCase( skipIfMutationTypeUnsupported(t, testCase.SupportedMutationTypes) - ctx := context.Background() - dbts := GetDatabaseTypes() - // Assert that this is not empty to protect against accidental mis-configurations, + var clients []ClientType + if httpClient { + clients = append(clients, httpClientType) + } + if goClient { + clients = append(clients, goClientType) + } + + var databases []DatabaseType + if badgerInMemory { + databases = append(databases, badgerIMType) + } + if badgerFile { + databases = append(databases, badgerFileType) + } + if inMemoryStore { + databases = append(databases, defraIMType) + } + + // Assert that these are not empty to protect against accidental mis-configurations, // otherwise an empty set would silently pass all the tests. 
- require.NotEmpty(t, dbts) + require.NotEmpty(t, databases) + require.NotEmpty(t, clients) - for _, dbt := range dbts { - executeTestCase(ctx, t, collectionNames, testCase, dbt) + ctx := context.Background() + for _, ct := range clients { + for _, dbt := range databases { + executeTestCase(ctx, t, collectionNames, testCase, dbt, ct) + } } } @@ -336,13 +361,14 @@ func executeTestCase( collectionNames []string, testCase TestCase, dbt DatabaseType, + clientType ClientType, ) { log.Info(ctx, testCase.Description, logging.NewKV("Database", dbt)) flattenActions(&testCase) startActionIndex, endActionIndex := getActionRange(testCase) - s := newState(ctx, t, testCase, dbt, collectionNames) + s := newState(ctx, t, testCase, dbt, clientType, collectionNames) setStartingNodes(s) // It is very important that the databases are always closed, otherwise resources will leak @@ -663,7 +689,7 @@ func setStartingNodes( // If nodes have not been explicitly configured via actions, setup a default one. if !hasExplicitNode { - db, path, err := GetDatabase(s.ctx, s.t, s.dbt) + db, path, err := GetDatabase(s) require.Nil(s.t, err) s.nodes = append(s.nodes, &net.Node{ @@ -686,7 +712,7 @@ func restartNodes( for i := len(s.nodes) - 1; i >= 0; i-- { originalPath := databaseDir databaseDir = s.dbPaths[i] - db, _, err := GetDatabase(s.ctx, s.t, s.dbt) + db, _, err := GetDatabase(s) require.Nil(s.t, err) databaseDir = originalPath @@ -804,7 +830,7 @@ func configureNode( // an in memory store. cfg.Datastore.Badger.Path = s.t.TempDir() - db, path, err := GetDatabase(s.ctx, s.t, s.dbt) //disable change dector, or allow it? + db, path, err := GetDatabase(s) //disable change dector, or allow it? require.NoError(s.t, err) var n *net.Node @@ -1487,9 +1513,7 @@ func executeRequest( anyOfByFieldKey := map[docFieldKey][]any{} expectedErrorRaised = assertRequestResults( - s.ctx, - s.t, - s.testCase.Description, + s, &result.GQL, action.Results, action.ExpectedError, @@ -1554,9 +1578,7 @@ func executeSubscriptionRequest( // This assert should be executed from the main test routine // so that failures will be properly handled. expectedErrorRaised := assertRequestResults( - s.ctx, - s.t, - s.testCase.Description, + s, finalResult, action.Results, action.ExpectedError, @@ -1628,16 +1650,14 @@ type docFieldKey struct { } func assertRequestResults( - ctx context.Context, - t *testing.T, - description string, + s *state, result *client.GQLResult, expectedResults []map[string]any, expectedError string, nodeID int, anyOfByField map[docFieldKey][]any, ) bool { - if AssertErrors(t, description, result.Errors, expectedError) { + if AssertErrors(s.t, s.testCase.Description, result.Errors, expectedError) { return true } @@ -1648,15 +1668,9 @@ func assertRequestResults( // Note: if result.Data == nil this panics (the panic seems useful while testing). resultantData := result.Data.([]map[string]any) - log.Info(ctx, "", logging.NewKV("RequestResults", result.Data)) + log.Info(s.ctx, "", logging.NewKV("RequestResults", result.Data)) - // compare results - assert.Equal(t, len(expectedResults), len(resultantData), description) - if len(expectedResults) == 0 { - // Need `require` here otherwise will panic in the for loop that ranges over - // resultantData and tries to access expectedResults[0]. 
- require.Equal(t, expectedResults, resultantData) - } + require.Equal(s.t, len(expectedResults), len(resultantData), s.testCase.Description) for docIndex, result := range resultantData { expectedResult := expectedResults[docIndex] @@ -1665,14 +1679,20 @@ func assertRequestResults( switch r := expectedValue.(type) { case AnyOf: - assert.Contains(t, r, actualValue) + assertResultsAnyOf(s.t, s.clientType, r, actualValue) dfk := docFieldKey{docIndex, field} valueSet := anyOfByField[dfk] valueSet = append(valueSet, actualValue) anyOfByField[dfk] = valueSet default: - assert.Equal(t, expectedValue, actualValue, fmt.Sprintf("node: %v, doc: %v", nodeID, docIndex)) + assertResultsEqual( + s.t, + s.clientType, + expectedValue, + actualValue, + fmt.Sprintf("node: %v, doc: %v", nodeID, docIndex), + ) } } }
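The relaxed assertions above exist because the HTTP client decodes GraphQL responses with `json.Decoder.UseNumber` (see `GraphQLResponse.UnmarshalJSON` earlier in this patch), so numeric fields reach the test as `json.Number` rather than the Go literals declared in `Results`. A minimal illustration, not part of this patch:

```go
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
)

func main() {
	dec := json.NewDecoder(bytes.NewBufferString(`{"age": 21}`))
	dec.UseNumber()

	var out map[string]any
	if err := dec.Decode(&out); err != nil {
		panic(err)
	}

	fmt.Printf("%T\n", out["age"])            // json.Number
	fmt.Println(out["age"] == any(int64(21))) // false: dynamic types differ

	n, _ := out["age"].(json.Number).Int64()
	fmt.Println(n == 21) // true once reduced to the underlying type
}
```

This reduction is what `areResultsEqual` performs before falling back to `assert.ObjectsAreEqualValues`, while the Go client path keeps the strict comparison.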