diff --git a/.github/workflows/dataflow_engine_chaos.yaml b/.github/workflows/dataflow_engine_chaos.yaml index cc35cdd6bcd..6dbebe6da40 100644 --- a/.github/workflows/dataflow_engine_chaos.yaml +++ b/.github/workflows/dataflow_engine_chaos.yaml @@ -138,7 +138,38 @@ jobs: echo describe pvc kubectl describe pvc -l app=tidb kubectl wait --for=condition=Ready pod/tidb-0 --timeout=0s - + + # Set up minio and create a bucket for tests + - name: Set up minio + run: | + kubectl apply -f $GITHUB_WORKSPACE/deployments/engine/helm/minio/minio.yaml + kubectl get -f $GITHUB_WORKSPACE/deployments/engine/helm/minio/minio.yaml + kubectl describe -f $GITHUB_WORKSPACE/deployments/engine/helm/minio/minio.yaml + - name: Wait for minio ready + run: | + kubectl wait --for=condition=Ready pod/chaos-minio-0 --timeout=10m || true + echo show pvc + kubectl get pvc -l app=minio -o wide + echo show pv + kubectl get pv -o wide + echo show svc + kubectl get svc -l app=minio -o wide + echo show sts + kubectl get sts -l app=minio -o wide + echo show po + kubectl get po -l app=minio -o wide + echo describe po + kubectl describe po -l app=minio + echo describe pvc + kubectl describe pvc -l app=minio + kubectl wait --for=condition=Ready pod/chaos-minio-0 --timeout=0s + - name: Set up minio-create-bucket job + run: | + kubectl apply -f $GITHUB_WORKSPACE/deployments/engine/helm/minio/minio-create-bucket.yaml + kubectl get -f $GITHUB_WORKSPACE/deployments/engine/helm/minio/minio-create-bucket.yaml + kubectl describe -f $GITHUB_WORKSPACE/deployments/engine/helm/minio/minio-create-bucket.yaml + kubectl wait --for=condition=Complete job/chaos-minio-create-bucket-job --timeout=2m + # Set up metastore and basic services - name: Set up metastore and basic services run: | diff --git a/.github/workflows/dataflow_engine_e2e.yaml b/.github/workflows/dataflow_engine_e2e.yaml deleted file mode 100644 index 26f7a173c62..00000000000 --- a/.github/workflows/dataflow_engine_e2e.yaml +++ /dev/null @@ -1,105 +0,0 @@ -name: Dataflow Engine e2e tests - -# Controls when the action will run. Triggers the workflow on push or pull request -# events but only for the master branch -on: - push: - branches: [ master ] - pull_request: - branches: [ master ] - paths: - - 'engine/**' - - 'dm/**' - -# See: https://docs.github.com/en/actions/reference/workflow-syntax-for-github-actions#concurrency. 
-concurrency: - group: ${{ github.ref }}-${{ github.workflow }} - cancel-in-progress: true - -jobs: - Node-failure-workflow: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - - uses: actions/setup-go@v3 - with: - go-version: 1.19 - - - name: Build images - run: make engine_image - - - name: Run containers - run: docker compose -f $GITHUB_WORKSPACE/deployments/engine/docker-compose/3m3e.yaml up -d - - - name: Run tests - run: | - cd $GITHUB_WORKSPACE/engine/test/e2e - go test -count=1 -v -run=TestNodeFailure - - - name: Upload logs to GitHub - if: ${{ failure() }} - uses: actions/upload-artifact@master - with: - name: node-failure-workflow-logs - path: /tmp/tiflow_engine_test/*log - - Worker-error-workflow: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - - uses: actions/setup-go@v3 - with: - go-version: 1.19 - - - name: Build images - run: make engine_image - - - name: Run containers - run: docker compose -f $GITHUB_WORKSPACE/deployments/engine/docker-compose/3m3e.yaml up -d - - - name: Run tests - run: | - cd $GITHUB_WORKSPACE/engine/test/e2e - go test -count=1 -v -run=TestWorkerExit - - - name: Upload logs to GitHub - if: ${{ failure() }} - uses: actions/upload-artifact@master - with: - name: worker-errror-workflow-logs - path: /tmp/tiflow_engine_test/*log - - DM-workflow: - runs-on: ubuntu-latest - - steps: - - uses: actions/checkout@v2 - - - uses: actions/setup-go@v3 - with: - go-version: 1.19 - - - name: Build images - run: make engine_image - - - - name: Run containers - run: docker compose -f $GITHUB_WORKSPACE/deployments/engine/docker-compose/3m3e.yaml -f $GITHUB_WORKSPACE/deployments/engine/docker-compose/dm_databases.yaml up -d - - - name: Run tests - run: | - $GITHUB_WORKSPACE/engine/test/utils/wait_mysql_online.sh --host 127.0.0.1 --port 3306 - $GITHUB_WORKSPACE/engine/test/utils/wait_mysql_online.sh --host 127.0.0.1 --port 4000 - cd $GITHUB_WORKSPACE/engine/test/e2e - go test -count=1 -v -run=TestDMJob - - - name: Upload logs to GitHub - if: ${{ failure() }} - uses: actions/upload-artifact@master - with: - name: dm-workflow-logs - path: /tmp/tiflow_engine_test/*log diff --git a/deployments/engine/docker-compose/1minio.yaml b/deployments/engine/docker-compose/1minio.yaml index d9203e0c3ce..5c10d5d1c6c 100644 --- a/deployments/engine/docker-compose/1minio.yaml +++ b/deployments/engine/docker-compose/1minio.yaml @@ -1,7 +1,7 @@ version: '2.3' services: - minio: + minio-standalone: image: minio/minio container_name: minio-standalone command: server --console-address ":9001" /data @@ -21,10 +21,10 @@ services: createbuckets: image: minio/mc depends_on: - - minio + - minio-standalone entrypoint: > /bin/sh -c " - /usr/bin/mc alias set myminio http://minio:9000 engine engineSecret; + /usr/bin/mc alias set myminio http://minio-standalone:9000 engine engineSecret; /usr/bin/mc mb myminio/engine-ut; /usr/bin/mc version enable myminio/engine-ut; exit 0; diff --git a/deployments/engine/docker-compose/3m3e_with_s3.yaml b/deployments/engine/docker-compose/3m3e_with_s3.yaml new file mode 100644 index 00000000000..b8e53fa052c --- /dev/null +++ b/deployments/engine/docker-compose/3m3e_with_s3.yaml @@ -0,0 +1,194 @@ +version: "2.3" +services: + server-master-0: + image: dataflow:test + container_name: server-master-0 + ports: + - "10245:10240" + volumes: + - ./config/master_with_s3.toml:/config.toml + - /tmp/tiflow_engine_test:/log + command: + - "/tiflow" + - "master" + - "--name=server-master-0" + - "--addr=0.0.0.0:10240" + - 
"--advertise-addr=server-master-0:10240" + - "--config=/config.toml" + - "--log-file=/log/server-master-0.log" + depends_on: + "etcd-standalone": + condition: service_started + "mysql-standalone": + condition: service_healthy + "minio-standalone": + condition: service_healthy + server-master-1: + image: dataflow:test + container_name: server-master-1 + ports: + - "10246:10240" + volumes: + - ./config/master_with_s3.toml:/config.toml + - /tmp/tiflow_engine_test:/log + command: + - "/tiflow" + - "master" + - "--name=server-master-1" + - "--addr=0.0.0.0:10240" + - "--advertise-addr=server-master-1:10240" + - "--config=/config.toml" + - "--log-file=/log/server-master-1.log" + depends_on: + "etcd-standalone": + condition: service_started + "mysql-standalone": + condition: service_healthy + "minio-standalone": + condition: service_healthy + server-master-2: + image: dataflow:test + container_name: server-master-2 + ports: + - "10247:10240" + volumes: + - ./config/master_with_s3.toml:/config.toml + - /tmp/tiflow_engine_test:/log + command: + - "/tiflow" + - "master" + - "--name=server-master-2" + - "--addr=0.0.0.0:10240" + - "--advertise-addr=server-master-2:10240" + - "--config=/config.toml" + - "--log-file=/log/server-master-2.log" + depends_on: + "etcd-standalone": + condition: service_started + "mysql-standalone": + condition: service_healthy + "minio-standalone": + condition: service_healthy + server-executor-0: + image: dataflow:test + container_name: server-executor-0 + ports: + - "11241:10241" + volumes: + - ./config/executor_with_s3.toml:/config.toml + - /tmp/tiflow_engine_test:/log + command: + - "/tiflow" + - "executor" + - "--name=server-executor-0" + - "--addr=0.0.0.0:10241" + - "--advertise-addr=server-executor-0:10241" + - "--join=server-master-0:10240,server-master-1:10240,server-master-2:10240" + - "--config=/config.toml" + - "--log-file=/log/server-executor-0.log" + - "--labels=\"name=exec-0\"" + depends_on: + - "server-master-0" + - "server-master-2" + - "server-master-1" + extra_hosts: + - "host.docker.internal:host-gateway" + server-executor-1: + image: dataflow:test + container_name: server-executor-1 + ports: + - "11242:10241" + volumes: + - ./config/executor_with_s3.toml:/config.toml + - /tmp/tiflow_engine_test:/log + command: + - "/tiflow" + - "executor" + - "--name=server-executor-1" + - "--addr=0.0.0.0:10241" + - "--advertise-addr=server-executor-1:10241" + - "--join=server-master-0:10240,server-master-1:10240,server-master-2:10240" + - "--config=/config.toml" + - "--log-file=/log/server-executor-1.log" + - "--labels=\"name=exec-1\"" + depends_on: + - "server-master-0" + - "server-master-2" + - "server-master-1" + extra_hosts: + - "host.docker.internal:host-gateway" + server-executor-2: + image: dataflow:test + container_name: server-executor-2 + ports: + - "11243:10241" + volumes: + - ./config/executor_with_s3.toml:/config.toml + - /tmp/tiflow_engine_test:/log + command: + - "/tiflow" + - "executor" + - "--name=server-executor-2" + - "--addr=0.0.0.0:10241" + - "--advertise-addr=server-executor-2:10241" + - "--join=server-master-0:10240,server-master-1:10240,server-master-2:10240" + - "--config=/config.toml" + - "--log-file=/log/server-executor-2.log" + - "--labels=\"name=exec-2\"" + depends_on: + - "server-master-0" + - "server-master-2" + - "server-master-1" + extra_hosts: + - "host.docker.internal:host-gateway" + etcd-standalone: + image: quay.io/coreos/etcd + container_name: etcd-standalone + command: + - "etcd" + - "--listen-client-urls=http://0.0.0.0:2379" + - 
"--advertise-client-urls=http://etcd-standalone:2379" + ports: + - "12479:2379" + mysql-standalone: + image: mysql:8.0 + container_name: mysql-standalone + command: --default-authentication-plugin=mysql_native_password + environment: + MYSQL_ALLOW_EMPTY_PASSWORD: "yes" + volumes: + - './config/mysql_meta.cnf:/etc/my.cnf' + ports: + - "3336:3306" + healthcheck: + test: mysql -h127.0.0.1 -P3306 -e "show databases" + interval: 10s + timeout: 600s + retries: 60 + minio-standalone: + image: minio/minio + container_name: minio-standalone + command: server --console-address ":9001" /data + ports: + - "9000:9000" + - "9001:9001" + environment: + MINIO_ROOT_USER: engine + MINIO_ROOT_PASSWORD: engineSecret + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 10s + timeout: 60s + retries: 6 + minio-createbuckets: + image: minio/mc + container_name: minio-createbuckets + depends_on: + - minio-standalone + entrypoint: > + /bin/sh -c " + /usr/bin/mc alias set myminio http://minio-standalone:9000 engine engineSecret || exit 1; + /usr/bin/mc mb myminio/engine-it || exit 1; + /usr/bin/mc version enable myminio/engine-ut; + exit 0; + " diff --git a/deployments/engine/docker-compose/3m3e_with_tls.yaml b/deployments/engine/docker-compose/3m3e_with_tls.yaml index a6c7819dd8d..02edc5f82e2 100644 --- a/deployments/engine/docker-compose/3m3e_with_tls.yaml +++ b/deployments/engine/docker-compose/3m3e_with_tls.yaml @@ -75,7 +75,7 @@ services: ports: - "11241:10241" volumes: - - ./config/executor.toml:/config.toml + - ./config/executor_with_s3.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" @@ -101,7 +101,7 @@ services: ports: - "11242:10241" volumes: - - ./config/executor.toml:/config.toml + - ./config/executor_with_s3.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" @@ -127,7 +127,7 @@ services: ports: - "11243:10241" volumes: - - ./config/executor.toml:/config.toml + - ./config/executor_with_s3.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" @@ -172,3 +172,30 @@ services: interval: 10s timeout: 600s retries: 60 + minio-standalone: + image: minio/minio + container_name: minio-standalone + command: server --console-address ":9001" /data + ports: + - "9000:9000" + - "9001:9001" + environment: + MINIO_ROOT_USER: engine + MINIO_ROOT_PASSWORD: engineSecret + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"] + interval: 10s + timeout: 60s + retries: 6 + minio-createbuckets: + image: minio/mc + container_name: minio-createbuckets + depends_on: + - minio-standalone + entrypoint: > + /bin/sh -c " + /usr/bin/mc alias set myminio http://minio-standalone:9000 engine engineSecret || exit 1; + /usr/bin/mc mb myminio/engine-it || exit 1; + /usr/bin/mc version enable myminio/engine-ut; + exit 0; + " \ No newline at end of file diff --git a/deployments/engine/docker-compose/config/executor_with_s3.toml b/deployments/engine/docker-compose/config/executor_with_s3.toml new file mode 100644 index 00000000000..82197a9d342 --- /dev/null +++ b/deployments/engine/docker-compose/config/executor_with_s3.toml @@ -0,0 +1,12 @@ +keepalive-ttl = "20s" +keepalive-interval = "500ms" +session-ttl = 20 + +[log] +level = "debug" + +[storage.s3] +bucket = "engine-it" +endpoint = "http://minio-standalone:9000/" +access-key = "engine" +secret-access-key = "engineSecret" diff --git a/deployments/engine/docker-compose/config/master_with_s3.toml b/deployments/engine/docker-compose/config/master_with_s3.toml 
new file mode 100644 index 00000000000..137d6d91a3b --- /dev/null +++ b/deployments/engine/docker-compose/config/master_with_s3.toml @@ -0,0 +1,26 @@ +[framework-meta] +endpoints = ["mysql-standalone:3306"] +schema = "test_framework" +user = "root" +password = "" + +[business-meta] +endpoints = ["mysql-standalone:3306"] +schema = "test_business" +user = "root" +password = "" + +[log] +level = "debug" + +[job-backoff] +reset-interval = "2s" +initial-interval = "1s" +max-interval = "15s" +max-try-time = 100 + +[storage.s3] +bucket = "engine-it" +endpoint = "http://minio-standalone:9000/" +access-key = "engine" +secret-access-key = "engineSecret" diff --git a/deployments/engine/docker-compose/config/master_with_tls.toml b/deployments/engine/docker-compose/config/master_with_tls.toml index 69bce254e79..9d037d0265d 100644 --- a/deployments/engine/docker-compose/config/master_with_tls.toml +++ b/deployments/engine/docker-compose/config/master_with_tls.toml @@ -23,3 +23,9 @@ level = "debug" reset-interval = "2s" initial-interval = "1s" max-interval = "15s" + +[storage.s3] +bucket = "engine-it" +endpoint = "http://minio-standalone:9000/" +access-key = "engine" +secret-access-key = "engineSecret" diff --git a/deployments/engine/helm/minio/minio-create-bucket.yaml b/deployments/engine/helm/minio/minio-create-bucket.yaml new file mode 100644 index 00000000000..2a7e3afea1c --- /dev/null +++ b/deployments/engine/helm/minio/minio-create-bucket.yaml @@ -0,0 +1,14 @@ +apiVersion: batch/v1 +kind: Job +metadata: + name: chaos-minio-create-bucket-job +spec: + template: + spec: + containers: + - name: chaos-test-case + image: minio/mc + imagePullPolicy: IfNotPresent + command: ['/bin/sh', '-c', '/usr/bin/mc alias set myminio http://chaos-minio:9000 engine engineSecret || exit 1; /usr/bin/mc mb myminio/engine-it || exit 1;'] + restartPolicy: Never + backoffLimit: 0 # fail immediately diff --git a/deployments/engine/helm/minio/minio.yaml b/deployments/engine/helm/minio/minio.yaml new file mode 100644 index 00000000000..c8dad84eefb --- /dev/null +++ b/deployments/engine/helm/minio/minio.yaml @@ -0,0 +1,73 @@ +apiVersion: v1 +kind: Service +metadata: + name: chaos-minio + labels: + app: chaos-minio +spec: + ports: + - name: port-minio # note the name is no more than 15 characters + port: 9000 + targetPort: 9000 + selector: + app: chaos-minio + +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: chaos-minio + labels: + app: chaos-minio +spec: + selector: + matchLabels: + app: chaos-minio + serviceName: chaos-minio + replicas: 1 + template: + metadata: + labels: + app: chaos-minio + spec: + containers: + - name: minio + image: minio/minio:latest + imagePullPolicy: IfNotPresent + env: + - name: MINIO_ACCESS_KEY + value: "engine" + - name: MINIO_SECRET_KEY + value: "engineSecret" + args: + - server + - /data + ports: + - containerPort: 9000 + hostPort: 9000 + # These volume mounts are persistent. Each pod in the StatefulSet + # gets a volume mounted based on this field. + volumeMounts: + - name: data + mountPath: /data + livenessProbe: + exec: + command: + - "curl" + - "-f" + - "http://localhost:9000/minio/health/live" + initialDelaySeconds: 3 + periodSeconds: 10 + timeoutSeconds: 60 + # These are converted to volume claims by the controller + # and mounted at the paths mentioned above. 
+  volumeClaimTemplates:
+  - metadata:
+      name: data
+    spec:
+      accessModes:
+        - ReadWriteOnce
+      storageClassName: standard
+      resources:
+        requests:
+          storage: 10Gi
diff --git a/deployments/engine/helm/tiflow/values.yaml b/deployments/engine/helm/tiflow/values.yaml
index 714e62c4389..d749d61c07d 100644
--- a/deployments/engine/helm/tiflow/values.yaml
+++ b/deployments/engine/helm/tiflow/values.yaml
@@ -31,6 +31,12 @@ master:
     max-interval = "15s"
     max-try-time = 100
 
+    [storage.s3]
+    bucket = "engine-it"
+    endpoint = "http://chaos-minio:9000/"
+    access-key = "engine"
+    secret-access-key = "engineSecret"
+
 executor:
   replicas: 4
   logStorage: 1Gi
@@ -39,6 +45,12 @@ executor:
     keepalive-interval = "500ms"
     session-ttl = 20
 
+    [storage.s3]
+    bucket = "engine-it"
+    endpoint = "http://chaos-minio:9000/"
+    access-key = "engine"
+    secret-access-key = "engineSecret"
+
 metastore:
   frameworkStorage: 5Gi
   businessStorage: 5Gi
diff --git a/engine/executor/config.go b/engine/executor/config.go
index a86c4715c0f..8cdc80cd344 100644
--- a/engine/executor/config.go
+++ b/engine/executor/config.go
@@ -169,9 +169,6 @@ func GetDefaultExecutorConfig() *Config {
 		KeepAliveTTLStr:      defaultKeepAliveTTL,
 		KeepAliveIntervalStr: defaultKeepAliveInterval,
 		RPCTimeoutStr:        defaultRPCTimeout,
-		Storage: resModel.Config{
-			Local: resModel.LocalFileConfig{BaseDir: ""},
-			S3:    resModel.S3Config{Bucket: ""},
-		},
+		Storage: resModel.DefaultConfig,
 	}
 }
diff --git a/engine/framework/mock_worker_util.go b/engine/framework/mock_worker_util.go
index 479ee86cde8..b7e44b99056 100644
--- a/engine/framework/mock_worker_util.go
+++ b/engine/framework/mock_worker_util.go
@@ -34,7 +34,7 @@ import (
 // BaseWorkerForTesting mocks base worker
 type BaseWorkerForTesting struct {
 	*DefaultBaseWorker
-	Broker *broker.LocalBroker
+	Broker *broker.MockBroker
 }
 
 // MockBaseWorker creates a mock base worker for test
diff --git a/engine/jobmaster/dm/utils.go b/engine/jobmaster/dm/utils.go
index b4de66a4664..7c8982eaced 100644
--- a/engine/jobmaster/dm/utils.go
+++ b/engine/jobmaster/dm/utils.go
@@ -17,7 +17,7 @@ import (
 	resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model"
 )
 
-// NewDMResourceID returns a ResourceID in DM's style. Currently only support local resource.
+// NewDMResourceID returns a ResourceID in DM's style. Currently only s3 resources are supported.
func NewDMResourceID(taskName, sourceName string) resModel.ResourceID { - return "/" + string(resModel.ResourceTypeLocalFile) + "/" + taskName + "/" + sourceName + return "/" + string(resModel.ResourceTypeS3) + "/" + taskName + "-" + sourceName } diff --git a/engine/pkg/externalresource/broker/broker_integration_test.go b/engine/pkg/externalresource/broker/broker_integration_test.go index 738ada29882..7bd7857a5f5 100644 --- a/engine/pkg/externalresource/broker/broker_integration_test.go +++ b/engine/pkg/externalresource/broker/broker_integration_test.go @@ -80,7 +80,7 @@ func newBrokerForS3WithPrefix( cli.On("CreateResource", mock.Anything, &pb.CreateResourceRequest{ ProjectInfo: &pb.ProjectInfo{}, - ResourceId: fmt.Sprintf("/s3/%s", s3.DummyResourceName), + ResourceId: s3.DummyResourceID, CreatorExecutor: string(executorID), JobId: s3.GetDummyJobID(executorID), CreatorWorkerId: s3.DummyWorkerID, @@ -104,7 +104,7 @@ func newBrokerForS3WithPrefix( require.NoError(t, err) // check dummy resource exists - checkFile(t, rootStrorage, "keep-alive-worker/dummy/.keep", fileExists) + checkFile(t, rootStrorage, s3.GetDummyResPath(".keep"), fileExists) return broker, cli, tmpDir, rootStrorage, s3Prefix } @@ -239,12 +239,16 @@ func TestIntegrationBrokerOpenNewS3Storage(t *testing.T) { brk, cli, dir, rootStrorage, _ := newBrokerForS3(t, s3.MockExecutorID) // test local file works well under this condition + resID := "/local/test-1" + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) + cli.On("QueryResource", mock.Anything, - &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: fakeJobID, ResourceId: "/local/test-1"}}, mock.Anything). + &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: fakeJobID, ResourceId: resID}}, mock.Anything). Return((*pb.QueryResourceResponse)(nil), status.Error(codes.NotFound, "resource manager error")) - hdl, err := brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", fakeJobID, "/local/test-1") + hdl, err := brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", fakeJobID, resID) require.NoError(t, err) - require.Equal(t, "/local/test-1", hdl.ID()) + require.Equal(t, resID, hdl.ID()) cli.AssertExpectations(t) cli.ExpectedCalls = nil @@ -260,7 +264,7 @@ func TestIntegrationBrokerOpenNewS3Storage(t *testing.T) { TenantId: fakeProjectInfo.TenantID(), ProjectId: fakeProjectInfo.ProjectID(), }, - ResourceId: "/local/test-1", + ResourceId: resID, CreatorExecutor: s3.MockExecutorID, JobId: fakeJobID, CreatorWorkerId: "worker-1", @@ -271,7 +275,7 @@ func TestIntegrationBrokerOpenNewS3Storage(t *testing.T) { cli.AssertExpectations(t) - local.AssertLocalFileExists(t, dir, "worker-1", "test-1", "1.txt") + local.AssertLocalFileExists(t, dir, "worker-1", resName, "1.txt") // test s3 testFiles := []string{"1.txt", "inner1/2.txt", "inner1/inner2/3.txt"} diff --git a/engine/pkg/externalresource/broker/broker_test.go b/engine/pkg/externalresource/broker/broker_test.go index aa470dc1f3c..03fa5da9b04 100644 --- a/engine/pkg/externalresource/broker/broker_test.go +++ b/engine/pkg/externalresource/broker/broker_test.go @@ -46,12 +46,16 @@ func TestBrokerOpenNewStorage(t *testing.T) { brk, cli, dir := newBroker(t) defer brk.Close() + resID := "/local/test-1" + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) + cli.On("QueryResource", mock.Anything, - &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-1"}}, mock.Anything). 
+ &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything). Return((*pb.QueryResourceResponse)(nil), status.Error(codes.NotFound, "resource manager error")) - hdl, err := brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", "/local/test-1") + hdl, err := brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", resID) require.NoError(t, err) - require.Equal(t, "/local/test-1", hdl.ID()) + require.Equal(t, resID, hdl.ID()) cli.AssertExpectations(t) cli.ExpectedCalls = nil @@ -64,7 +68,7 @@ func TestBrokerOpenNewStorage(t *testing.T) { cli.On("CreateResource", mock.Anything, &pb.CreateResourceRequest{ ProjectInfo: &pb.ProjectInfo{TenantId: fakeProjectInfo.TenantID(), ProjectId: fakeProjectInfo.ProjectID()}, - ResourceId: "/local/test-1", + ResourceId: resID, CreatorExecutor: "executor-1", JobId: "job-1", CreatorWorkerId: "worker-1", @@ -75,7 +79,7 @@ func TestBrokerOpenNewStorage(t *testing.T) { cli.AssertExpectations(t) - local.AssertLocalFileExists(t, dir, "worker-1", "test-1", "1.txt") + local.AssertLocalFileExists(t, dir, "worker-1", resName, "1.txt") } func TestBrokerOpenExistingStorage(t *testing.T) { @@ -84,12 +88,15 @@ func TestBrokerOpenExistingStorage(t *testing.T) { brk, cli, dir := newBroker(t) defer brk.Close() + resID := "/local/test-2" + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) cli.On("QueryResource", mock.Anything, - &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-2"}}, mock.Anything). + &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything). Return((*pb.QueryResourceResponse)(nil), status.Error(codes.NotFound, "resource manager error")).Once() cli.On("CreateResource", mock.Anything, &pb.CreateResourceRequest{ ProjectInfo: &pb.ProjectInfo{TenantId: fakeProjectInfo.TenantID(), ProjectId: fakeProjectInfo.ProjectID()}, - ResourceId: "/local/test-2", + ResourceId: resID, CreatorExecutor: "executor-1", JobId: "job-1", CreatorWorkerId: "worker-2", @@ -100,23 +107,23 @@ func TestBrokerOpenExistingStorage(t *testing.T) { fakeProjectInfo, "worker-2", "job-1", - "/local/test-2") + resID) require.NoError(t, err) err = hdl.Persist(context.Background()) require.NoError(t, err) cli.On("QueryResource", mock.Anything, - &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: "/local/test-2"}}, mock.Anything). + &pb.QueryResourceRequest{ResourceKey: &pb.ResourceKey{JobId: "job-1", ResourceId: resID}}, mock.Anything). 
Return(&pb.QueryResourceResponse{ CreatorExecutor: "executor-1", JobId: "job-1", CreatorWorkerId: "worker-2", }, nil) - hdl, err = brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", "/local/test-2") + hdl, err = brk.OpenStorage(context.Background(), fakeProjectInfo, "worker-1", "job-1", resID) require.NoError(t, err) - require.Equal(t, "/local/test-2", hdl.ID()) + require.Equal(t, resID, hdl.ID()) cli.AssertExpectations(t) @@ -126,7 +133,7 @@ func TestBrokerOpenExistingStorage(t *testing.T) { err = f.Close(context.Background()) require.NoError(t, err) - local.AssertLocalFileExists(t, dir, "worker-2", "test-2", "1.txt") + local.AssertLocalFileExists(t, dir, "worker-2", resName, "1.txt") } func TestBrokerRemoveResource(t *testing.T) { @@ -134,7 +141,8 @@ func TestBrokerRemoveResource(t *testing.T) { brk, _, dir := newBroker(t) defer brk.Close() - resPath := filepath.Join(dir, "worker-1", local.ResourceNameToFilePathName("resource-1")) + resName := resModel.EncodeResourceName("resource-1") + resPath := filepath.Join(dir, "worker-1", local.ResourceNameToFilePathName(resName)) err := os.MkdirAll(resPath, 0o700) require.NoError(t, err) diff --git a/engine/pkg/externalresource/broker/local_broker.go b/engine/pkg/externalresource/broker/mock_broker.go similarity index 73% rename from engine/pkg/externalresource/broker/local_broker.go rename to engine/pkg/externalresource/broker/mock_broker.go index 25345769f7b..1a60bc43f82 100644 --- a/engine/pkg/externalresource/broker/local_broker.go +++ b/engine/pkg/externalresource/broker/mock_broker.go @@ -16,27 +16,27 @@ package broker import ( "context" "os" - "strings" "sync" "testing" "github.com/pingcap/log" pb "github.com/pingcap/tiflow/engine/enginepb" "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/s3" "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/tenant" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" ) -var _ Broker = (*LocalBroker)(nil) +var _ Broker = (*MockBroker)(nil) -// LocalBroker is a broker unit-testing other components -// that depend on a Broker. 
-type LocalBroker struct { +// MockBroker is a broker used to testing other components that depend on a Broker +type MockBroker struct { *DefaultBroker config *resModel.Config @@ -47,11 +47,11 @@ type LocalBroker struct { persistedList []resModel.ResourceID } -// NewBrokerForTesting creates a LocalBroker instance for testing only -func NewBrokerForTesting(executorID resModel.ExecutorID) *LocalBroker { +// NewBrokerForTesting creates a MockBroker instance for testing only +func NewBrokerForTesting(executorID resModel.ExecutorID) *MockBroker { dir, err := os.MkdirTemp("/tmp", "*-localfiles") if err != nil { - log.Panic("failed to make tempdir") + log.Panic("failed to make tempdir", zap.Error(err)) } cfg := &resModel.Config{Local: resModel.LocalFileConfig{BaseDir: dir}} client := manager.NewMockClient() @@ -59,7 +59,15 @@ func NewBrokerForTesting(executorID resModel.ExecutorID) *LocalBroker { if err != nil { log.Panic("failed to create broker") } - return &LocalBroker{ + + s3dir, err := os.MkdirTemp("/tmp", "*-s3files") + if err != nil { + log.Panic("failed to make tempdir", zap.Error(err)) + } + s3FileManager, _ := s3.NewFileManagerForUT(s3dir, executorID) + broker.fileManagers[resModel.ResourceTypeS3] = s3FileManager + + return &MockBroker{ DefaultBroker: broker, config: cfg, client: client, @@ -67,7 +75,7 @@ func NewBrokerForTesting(executorID resModel.ExecutorID) *LocalBroker { } // OpenStorage wraps broker.OpenStorage -func (b *LocalBroker) OpenStorage( +func (b *MockBroker) OpenStorage( ctx context.Context, projectInfo tenant.ProjectInfo, workerID resModel.WorkerID, @@ -94,14 +102,14 @@ func (b *LocalBroker) OpenStorage( } // AssertPersisted checks resource is in persisted list -func (b *LocalBroker) AssertPersisted(t *testing.T, id resModel.ResourceID) { +func (b *MockBroker) AssertPersisted(t *testing.T, id resModel.ResourceID) { b.mu.Lock() defer b.mu.Unlock() require.Contains(t, b.persistedList, id) } -func (b *LocalBroker) appendPersistRecord(id resModel.ResourceID) { +func (b *MockBroker) appendPersistRecord(id resModel.ResourceID) { b.mu.Lock() defer b.mu.Unlock() @@ -109,19 +117,21 @@ func (b *LocalBroker) appendPersistRecord(id resModel.ResourceID) { } // AssertFileExists checks lock file exists -func (b *LocalBroker) AssertFileExists( +func (b *MockBroker) AssertFileExists( t *testing.T, workerID resModel.WorkerID, resourceID resModel.ResourceID, fileName string, ) { - suffix := strings.TrimPrefix(resourceID, "/local/") - local.AssertLocalFileExists(t, b.config.Local.BaseDir, workerID, suffix, fileName) + tp, resName, err := resModel.ParseResourceID(resourceID) + require.NoError(t, err) + require.Equal(t, resModel.ResourceTypeLocalFile, tp) + local.AssertLocalFileExists(t, b.config.Local.BaseDir, workerID, resName, fileName) } type brExternalStorageHandleForTesting struct { Handle - parent *LocalBroker + parent *MockBroker } func (h *brExternalStorageHandleForTesting) Persist(ctx context.Context) error { diff --git a/engine/pkg/externalresource/broker/storage_handle_test.go b/engine/pkg/externalresource/broker/storage_handle_test.go index 0eb4163cb9d..14eed1e9c7f 100644 --- a/engine/pkg/externalresource/broker/storage_handle_test.go +++ b/engine/pkg/externalresource/broker/storage_handle_test.go @@ -21,7 +21,7 @@ import ( "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" "github.com/pingcap/tiflow/engine/pkg/externalresource/internal/local" "github.com/pingcap/tiflow/engine/pkg/externalresource/manager" - 
"github.com/pingcap/tiflow/engine/pkg/externalresource/model" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/engine/pkg/tenant" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -29,10 +29,10 @@ import ( func newResourceIdentForTesting(executor, workerID, resourceName string) internal.ResourceIdent { return internal.ResourceIdent{ - Name: resourceName, + Name: resModel.EncodeResourceName(resourceName), ResourceScope: internal.ResourceScope{ ProjectInfo: tenant.NewProjectInfo("fakeTenant", "fakeProject"), - Executor: model.ExecutorID(executor), + Executor: resModel.ExecutorID(executor), WorkerID: workerID, }, } @@ -42,9 +42,9 @@ func TestStorageHandlePersistAndDiscard(t *testing.T) { t.Parallel() dir := t.TempDir() - executor := model.ExecutorID("executor-1") + executor := resModel.ExecutorID("executor-1") ident := newResourceIdentForTesting(string(executor), "worker-1", "test-resource") - fm := local.NewLocalFileManager(executor, model.LocalFileConfig{BaseDir: dir}) + fm := local.NewLocalFileManager(executor, resModel.LocalFileConfig{BaseDir: dir}) cli := manager.NewMockClient() ctx := context.Background() @@ -108,7 +108,7 @@ func TestStorageHandlePersistAndDiscard(t *testing.T) { func TestStorageHandleDiscardTemporaryResource(t *testing.T) { t.Parallel() dir := t.TempDir() - fm := local.NewLocalFileManager("", model.LocalFileConfig{BaseDir: dir}) + fm := local.NewLocalFileManager("", resModel.LocalFileConfig{BaseDir: dir}) cli := manager.NewMockClient() ctx := context.Background() diff --git a/engine/pkg/externalresource/integration_test/gc_test.go b/engine/pkg/externalresource/integration_test/gc_test.go index 01623202827..8af6b35489d 100644 --- a/engine/pkg/externalresource/integration_test/gc_test.go +++ b/engine/pkg/externalresource/integration_test/gc_test.go @@ -42,12 +42,15 @@ func TestLocalFileTriggeredByJobRemoval(t *testing.T) { cluster.jobInfo.SetJobStatus("job-1", frameModel.MasterStateInit) + resID := "/local/resource-1" + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) handle, err := brk.OpenStorage( context.Background(), fakeProjectInfo, "worker-1", "job-1", - "/local/resource-1") + resID) require.NoError(t, err) _, err = handle.BrExternalStorage().Create(context.Background(), "1.txt") @@ -56,19 +59,19 @@ func TestLocalFileTriggeredByJobRemoval(t *testing.T) { require.NoError(t, err) // Assert meta exists for `/local/resource-1` - resMeta, err := cluster.meta.GetResourceByID(ctx, pkgOrm.ResourceKey{JobID: "job-1", ID: "/local/resource-1"}) + resMeta, err := cluster.meta.GetResourceByID(ctx, pkgOrm.ResourceKey{JobID: "job-1", ID: resID}) require.NoError(t, err) require.Equal(t, model.ExecutorID("executor-1"), resMeta.Executor) - local.AssertLocalFileExists(t, baseDir, "worker-1", "resource-1", "1.txt") + local.AssertLocalFileExists(t, baseDir, "worker-1", resName, "1.txt") // Triggers GC by removing the job cluster.jobInfo.RemoveJob("job-1") require.Eventually(t, func() bool { - _, err := cluster.meta.GetResourceByID(ctx, pkgOrm.ResourceKey{JobID: "job-1", ID: "/local/resource-1"}) + _, err := cluster.meta.GetResourceByID(ctx, pkgOrm.ResourceKey{JobID: "job-1", ID: resID}) log.Warn("GetResourceByID", zap.Error(err)) return err != nil && pkgOrm.IsNotFoundError(err) }, 1*time.Second, 5*time.Millisecond) - local.AssertNoLocalFileExists(t, baseDir, "worker-1", "resource-1", "1.txt") + local.AssertNoLocalFileExists(t, baseDir, "worker-1", resName, "1.txt") cluster.Stop() } 
diff --git a/engine/pkg/externalresource/internal/s3/file_manager.go b/engine/pkg/externalresource/internal/s3/file_manager.go index 9537578e0e4..e4fb13b1c49 100644 --- a/engine/pkg/externalresource/internal/s3/file_manager.go +++ b/engine/pkg/externalresource/internal/s3/file_manager.go @@ -164,7 +164,7 @@ func (m *FileManager) removeTemporaryFilesForExecutor( ctx context.Context, scope internal.ResourceScope, ) error { // Get all persisted files which is created by current executor. - persistedResSet := make(map[resModel.ResourceName]struct{}) + persistedResSet := make(map[string]struct{}) m.mu.RLock() for workerID, resources := range m.persistedResMap { @@ -183,7 +183,7 @@ func (m *FileManager) removeTemporaryFilesForExecutor( func (m *FileManager) removeAllTemporaryFilesByMeta( ctx context.Context, scope internal.ResourceScope, - persistedResSet map[resModel.ResourceName]struct{}, + persistedResSet map[string]struct{}, ) error { log.Info("Removing temporary resources for executor", zap.Any("scope", scope)) @@ -310,6 +310,7 @@ func createPlaceholderFile(ctx context.Context, storage brStorage.ExternalStorag // PreCheckConfig does a preflight check on the executor's storage configurations. func PreCheckConfig(config resModel.S3Config) error { // TODO: use customized retry policy. + log.Debug("pre-checking broker config", zap.Any("config", config)) factory := NewExternalStorageFactory(config.Bucket, config.Prefix, &config.S3BackendOptions) _, err := factory.newS3ExternalStorageForScope(context.Background(), internal.ResourceScope{}) diff --git a/engine/pkg/externalresource/internal/s3/file_manager_test.go b/engine/pkg/externalresource/internal/s3/file_manager_test.go index 3d24921916e..a674f9c7b29 100644 --- a/engine/pkg/externalresource/internal/s3/file_manager_test.go +++ b/engine/pkg/externalresource/internal/s3/file_manager_test.go @@ -15,73 +15,18 @@ package s3 import ( "context" - "fmt" "path/filepath" "testing" - brStorage "github.com/pingcap/tidb/br/pkg/storage" - "github.com/pingcap/tiflow/engine/model" "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" "github.com/stretchr/testify/require" ) -type mockExternalStorageFactory struct { - baseDir string - bucket string -} - -func newMockExternalStorageFactory(tempDir string, bucket string) *mockExternalStorageFactory { - return &mockExternalStorageFactory{ - baseDir: tempDir, - bucket: bucket, - } -} - -func (f *mockExternalStorageFactory) newS3ExternalStorageForScope( - ctx context.Context, scope internal.ResourceScope, -) (brStorage.ExternalStorage, error) { - uri := fmt.Sprintf("%s/%s", f.baseURI(), scope.BuildResPath()) - return f.newS3ExternalStorageFromURI(ctx, uri) -} - -func (f *mockExternalStorageFactory) newS3ExternalStorageFromURI( - ctx context.Context, - uri string, -) (brStorage.ExternalStorage, error) { - return brStorage.NewLocalStorage(uri) -} - -func (f *mockExternalStorageFactory) baseURI() string { - return fmt.Sprintf("%s/%s", f.baseDir, f.bucket) -} - -func (f *mockExternalStorageFactory) assertFileExists(t *testing.T, uri string) { - require.FileExists(t, filepath.Join(f.baseDir, uri)) -} - -func (f *mockExternalStorageFactory) assertFileNotExist(t *testing.T, uri string) { - require.NoFileExists(t, filepath.Join(f.baseDir, uri)) -} - -func newFileManagerForUT(t *testing.T) (*FileManager, *mockExternalStorageFactory) { - factory := newMockExternalStorageFactory(t.TempDir(), UtBucketName) - return NewFileManager( - MockExecutorID, - factory, - ), factory -} - -func 
newFileManagerForUTFromSharedStorageFactory( - executorID model.ExecutorID, factory *mockExternalStorageFactory, -) *FileManager { - return NewFileManager(executorID, factory) -} - func TestFileManagerCreateAndRemoveResource(t *testing.T) { t.Parallel() ctx := context.Background() - fm, factory := newFileManagerForUT(t) + fm, factory := NewFileManagerForUT(t.TempDir(), MockExecutorID) ident := internal.ResourceIdent{ ResourceScope: internal.ResourceScope{ @@ -115,7 +60,7 @@ func TestFileManagerCreateDuplicate(t *testing.T) { t.Parallel() ctx := context.Background() - fm, _ := newFileManagerForUT(t) + fm, _ := NewFileManagerForUT(t.TempDir(), MockExecutorID) ident := internal.ResourceIdent{ ResourceScope: internal.ResourceScope{ @@ -135,7 +80,7 @@ func TestFileManagerSetAndGetPersisted(t *testing.T) { t.Parallel() ctx := context.Background() - fm, _ := newFileManagerForUT(t) + fm, _ := NewFileManagerForUT(t.TempDir(), MockExecutorID) ident := internal.ResourceIdent{ ResourceScope: internal.ResourceScope{ @@ -168,7 +113,7 @@ func TestFileManagerDoublePersisted(t *testing.T) { t.Parallel() ctx := context.Background() - fm, _ := newFileManagerForUT(t) + fm, _ := NewFileManagerForUT(t.TempDir(), MockExecutorID) ident := internal.ResourceIdent{ ResourceScope: internal.ResourceScope{ @@ -197,7 +142,7 @@ func TestFileManagerRemoveTemporaryResources(t *testing.T) { t.Parallel() ctx := context.Background() - fm, factory := newFileManagerForUT(t) + fm, factory := NewFileManagerForUT(t.TempDir(), MockExecutorID) ident1 := internal.ResourceIdent{ ResourceScope: internal.ResourceScope{ diff --git a/engine/pkg/externalresource/internal/s3/file_path_util.go b/engine/pkg/externalresource/internal/s3/file_path_util.go index 4143ea74852..9bfb229f763 100644 --- a/engine/pkg/externalresource/internal/s3/file_path_util.go +++ b/engine/pkg/externalresource/internal/s3/file_path_util.go @@ -37,24 +37,24 @@ func getPathPredByName(target string) pathPredFunc { } func getPathPredByPersistedResources( - resources persistedResources, prefixCnt int, + resourcePaths map[string]struct{}, prefixCnt int, ) pathPredFunc { return func(path string) bool { - resName := "" + resPath := "" for i := 0; i < prefixCnt; i++ { prefix, newPath, ok := strings.Cut(path, "/") if !ok { return false } - if resName == "" { - resName = prefix + if resPath == "" { + resPath = prefix } else { - resName = fmt.Sprintf("%s/%s", resName, prefix) + resPath = fmt.Sprintf("%s/%s", resPath, prefix) } path = newPath } - _, ok := resources[resName] + _, ok := resourcePaths[resPath] return !ok } } diff --git a/engine/pkg/externalresource/internal/s3/resource_controller.go b/engine/pkg/externalresource/internal/s3/resource_controller.go index 865517c9080..3cfa0c65a40 100644 --- a/engine/pkg/externalresource/internal/s3/resource_controller.go +++ b/engine/pkg/externalresource/internal/s3/resource_controller.go @@ -36,6 +36,7 @@ type resourceController struct { // NewResourceController creates a new s3 resourceController. 
func NewResourceController(config resModel.S3Config) *resourceController { + log.Debug("create s3 resource controller", zap.Any("config", config)) fm := NewFileManagerWithConfig(defaultControllerID, config) return &resourceController{fm: fm} } diff --git a/engine/pkg/externalresource/internal/s3/resource_controller_test.go b/engine/pkg/externalresource/internal/s3/resource_controller_test.go index 8648c9e91bc..0fc4d0e147a 100644 --- a/engine/pkg/externalresource/internal/s3/resource_controller_test.go +++ b/engine/pkg/externalresource/internal/s3/resource_controller_test.go @@ -34,9 +34,25 @@ func TestS3ResourceController(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), caseTimeout) defer cancel() - fm, factory := newFileManagerForUT(t) + temproraryResNames := make([]resModel.ResourceName, numTemporaryResources) + for i := 0; i < numTemporaryResources; i++ { + resID := fmt.Sprintf("/s3/temporary-resource-%d", i) + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) + temproraryResNames[i] = resName + } + + persistedResNames := make([]resModel.ResourceName, numPersistedResources) + persistedResMetas := []*resModel.ResourceMeta{} + for i := 0; i < numPersistedResources; i++ { + resID := fmt.Sprintf("/s3/persisted-resource-%d", i) + _, resName, err := resModel.ParseResourceID(resID) + require.NoError(t, err) + persistedResNames[i] = resName + } + + fm, factory := NewFileManagerForUT(t.TempDir(), MockExecutorID) workers := []string{"worker-1", "worker-2", "worker-3"} - persistedResources := []*resModel.ResourceMeta{} // generate mock data for _, worker := range workers { scope := internal.ResourceScope{ @@ -44,19 +60,10 @@ func TestS3ResourceController(t *testing.T) { WorkerID: worker, } - for i := 0; i < numTemporaryResources; i++ { - _, err := fm.CreateResource(ctx, internal.ResourceIdent{ - ResourceScope: scope, - Name: fmt.Sprintf("temp-resource-%d", i), - }) - require.NoError(t, err) - } - - for i := 0; i < numPersistedResources; i++ { - name := fmt.Sprintf("persisted-resource-%d", i) + for _, persistedResName := range persistedResNames { ident := internal.ResourceIdent{ ResourceScope: scope, - Name: name, + Name: persistedResName, } _, err := fm.CreateResource(ctx, ident) require.NoError(t, err) @@ -64,12 +71,20 @@ func TestS3ResourceController(t *testing.T) { err = fm.SetPersisted(ctx, ident) require.NoError(t, err) - persistedResources = append(persistedResources, &resModel.ResourceMeta{ - ID: "/s3/" + name, - Executor: ident.Executor, + persistedResMetas = append(persistedResMetas, &resModel.ResourceMeta{ + ID: resModel.BuildResourceID(resModel.ResourceTypeS3, persistedResName), + Executor: MockExecutorID, Worker: worker, }) } + + for _, tempResName := range temproraryResNames { + _, err := fm.CreateResource(ctx, internal.ResourceIdent{ + ResourceScope: scope, + Name: tempResName, + }) + require.NoError(t, err) + } } checkWorker := func(worker string, removed bool) { @@ -77,19 +92,19 @@ func TestS3ResourceController(t *testing.T) { Executor: MockExecutorID, WorkerID: worker, } - for i := 0; i < numPersistedResources; i++ { + for _, persistedResName := range persistedResNames { ident := internal.ResourceIdent{ ResourceScope: scope, - Name: fmt.Sprintf("persisted-resource-%d", i), + Name: persistedResName, } _, err := fm.GetPersistedResource(ctx, ident) require.NoError(t, err) } - for i := 0; i < numTemporaryResources; i++ { + for _, tempResName := range temproraryResNames { _, err := fm.GetPersistedResource(ctx, internal.ResourceIdent{ 
ResourceScope: scope, - Name: fmt.Sprintf("temp-resource-%d", i), + Name: tempResName, }) if removed { require.ErrorContains(t, err, "ResourceFilesNotFoundError") @@ -106,7 +121,7 @@ func TestS3ResourceController(t *testing.T) { fm1 := newFileManagerForUTFromSharedStorageFactory("leader-controller", factory) controller := &resourceController{fm: fm1} gcExecutor := func() { - err := controller.GCExecutor(ctx, persistedResources, MockExecutorID) + err := controller.GCExecutor(ctx, persistedResMetas, MockExecutorID) require.NoError(t, err) checkWorker(workers[0], true) checkWorker(workers[1], true) @@ -117,7 +132,7 @@ func TestS3ResourceController(t *testing.T) { gcExecutor() // test GCSingleResource - for _, res := range persistedResources { + for _, res := range persistedResMetas { _, resName, err := resModel.ParseResourceID(res.ID) require.NoError(t, err) ident := internal.ResourceIdent{ diff --git a/engine/pkg/externalresource/internal/s3/s3_dummy_resouce.go b/engine/pkg/externalresource/internal/s3/s3_dummy_resouce.go index 7bf6b8ad5df..14809174aa4 100644 --- a/engine/pkg/externalresource/internal/s3/s3_dummy_resouce.go +++ b/engine/pkg/externalresource/internal/s3/s3_dummy_resouce.go @@ -15,6 +15,7 @@ package s3 import ( "fmt" + "path" "github.com/pingcap/tiflow/engine/model" "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" @@ -22,20 +23,21 @@ import ( ) const ( - // dummyJobID is a dummy job ID used for the s3 storage. - dummyJobID = "dummy-job-%s" // DummyWorkerID is a dummy worker ID used for the s3 storage. DummyWorkerID = "keep-alive-worker" // DummyResourceID is a dummy resource ID used for the s3 storage. DummyResourceID = "/s3/dummy" - // DummyResourceName is a dummy resource name used for the s3 storage. - DummyResourceName = "dummy" +) + +var ( + dummyJobID = "dummy-job-%s" + dummyResourceName = resModel.EncodeResourceName("dummy") ) // GetDummyIdent returns a dummy resource ident for testing. func GetDummyIdent(executorID model.ExecutorID) internal.ResourceIdent { return internal.ResourceIdent{ - Name: DummyResourceName, + Name: GetDummyResourceName(), ResourceScope: internal.ResourceScope{ Executor: executorID, WorkerID: DummyWorkerID, @@ -43,6 +45,11 @@ func GetDummyIdent(executorID model.ExecutorID) internal.ResourceIdent { } } +// GetDummyResourceName returns a dummy resource name for s3 storage. +func GetDummyResourceName() string { + return dummyResourceName +} + // GetDummyResourceKey returns a dummy resource key for s3 storage. func GetDummyResourceKey(executorID model.ExecutorID) resModel.ResourceKey { return resModel.ResourceKey{ @@ -55,3 +62,8 @@ func GetDummyResourceKey(executorID model.ExecutorID) resModel.ResourceKey { func GetDummyJobID(executorID model.ExecutorID) model.JobID { return fmt.Sprintf(dummyJobID, executorID) } + +// GetDummyResPath returns a file path located in dummy resource for s3 storage. 
+func GetDummyResPath(filename string) string { + return path.Join(DummyWorkerID, dummyResourceName, filename) +} diff --git a/engine/pkg/externalresource/internal/s3/s3_test_utils.go b/engine/pkg/externalresource/internal/s3/s3_test_utils.go index 465bb3ce1bb..1d2a9572397 100644 --- a/engine/pkg/externalresource/internal/s3/s3_test_utils.go +++ b/engine/pkg/externalresource/internal/s3/s3_test_utils.go @@ -14,10 +14,18 @@ package s3 import ( + "context" + "fmt" "os" + "path/filepath" + "testing" "github.com/pingcap/errors" brStorage "github.com/pingcap/tidb/br/pkg/storage" + "github.com/pingcap/tiflow/engine/model" + "github.com/pingcap/tiflow/engine/pkg/externalresource/internal" + resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" + "github.com/stretchr/testify/require" ) const ( @@ -56,3 +64,56 @@ func GetS3OptionsForUT() (*brStorage.S3BackendOptions, error) { ForcePathStyle: true, }, nil } + +type mockExternalStorageFactory struct { + baseDir string + bucket string +} + +func newMockExternalStorageFactory(tempDir string, bucket string) *mockExternalStorageFactory { + return &mockExternalStorageFactory{ + baseDir: tempDir, + bucket: bucket, + } +} + +func (f *mockExternalStorageFactory) newS3ExternalStorageForScope( + ctx context.Context, scope internal.ResourceScope, +) (brStorage.ExternalStorage, error) { + uri := fmt.Sprintf("%s/%s", f.baseURI(), scope.BuildResPath()) + return f.newS3ExternalStorageFromURI(ctx, uri) +} + +func (f *mockExternalStorageFactory) newS3ExternalStorageFromURI( + ctx context.Context, + uri string, +) (brStorage.ExternalStorage, error) { + return brStorage.NewLocalStorage(uri) +} + +func (f *mockExternalStorageFactory) baseURI() string { + return fmt.Sprintf("%s/%s", f.baseDir, f.bucket) +} + +func (f *mockExternalStorageFactory) assertFileExists(t *testing.T, uri string) { + require.FileExists(t, filepath.Join(f.baseDir, uri)) +} + +func (f *mockExternalStorageFactory) assertFileNotExist(t *testing.T, uri string) { + require.NoFileExists(t, filepath.Join(f.baseDir, uri)) +} + +// NewFileManagerForUT returns a file manager for UT. 
+func NewFileManagerForUT(tempDir string, executorID resModel.ExecutorID) (*FileManager, *mockExternalStorageFactory) {
+	factory := newMockExternalStorageFactory(tempDir, UtBucketName)
+	return NewFileManager(
+		executorID,
+		factory,
+	), factory
+}
+
+func newFileManagerForUTFromSharedStorageFactory(
+	executorID model.ExecutorID, factory *mockExternalStorageFactory,
+) *FileManager {
+	return NewFileManager(executorID, factory)
+}
diff --git a/engine/pkg/externalresource/manager/gc_coordinator.go b/engine/pkg/externalresource/manager/gc_coordinator.go
index f000a9bbf3c..ee85d860059 100644
--- a/engine/pkg/externalresource/manager/gc_coordinator.go
+++ b/engine/pkg/externalresource/manager/gc_coordinator.go
@@ -70,6 +70,7 @@ func (c *DefaultGCCoordinator) Run(ctx context.Context) error {
 		default:
 		}
 
+		// TODO: add a unit test for initializeGC
 		jobReceiver, executorReceiver, err := c.initializeGC(ctx)
 		if err != nil {
 			log.Warn("GC error", zap.Error(err))
@@ -189,7 +190,7 @@ func (c *DefaultGCCoordinator) gcByStatusSnapshots(
 			return err
 		}
 		if tp == resModel.ResourceTypeLocalFile ||
-			tp == resModel.ResourceTypeS3 && resName == s3.DummyResourceName {
+			tp == resModel.ResourceTypeS3 && resName == s3.GetDummyResourceName() {
 			toGCExecutorSet[resMeta.Executor] = struct{}{}
 		}
 		continue
diff --git a/engine/pkg/externalresource/manager/gc_runner_test.go b/engine/pkg/externalresource/manager/gc_runner_test.go
index 0fa285d8600..2579414d8c9 100644
--- a/engine/pkg/externalresource/manager/gc_runner_test.go
+++ b/engine/pkg/externalresource/manager/gc_runner_test.go
@@ -353,7 +353,7 @@ func testGCExecutors(t *testing.T, helper *gcRunnerTestHelper) {
 		tp, resName, err := resModel.ParseResourceID(meta.ID)
 		require.NoError(t, err)
 		require.Equal(t, resModel.ResourceTypeS3, tp)
-		require.NotEqual(t, s3.DummyResourceName, resName)
+		require.NotEqual(t, s3.GetDummyResourceName(), resName)
 	}
 }
diff --git a/engine/pkg/externalresource/model/config.go b/engine/pkg/externalresource/model/config.go
index 9a3403fb902..617bd863738 100644
--- a/engine/pkg/externalresource/model/config.go
+++ b/engine/pkg/externalresource/model/config.go
@@ -17,6 +17,18 @@ import (
 	brStorage "github.com/pingcap/tidb/br/pkg/storage"
 )
 
+// DefaultConfig defines the default configuration for external storage
+var DefaultConfig = Config{
+	Local: LocalFileConfig{BaseDir: ""},
+	S3: S3Config{
+		S3BackendOptions: brStorage.S3BackendOptions{
+			ForcePathStyle: true,
+		},
+		Bucket: "",
+		Prefix: "",
+	},
+}
+
 // Config defines configurations for an external storage resource
 type Config struct {
 	Local LocalFileConfig `json:"local" toml:"local"`
diff --git a/engine/pkg/externalresource/model/model.go b/engine/pkg/externalresource/model/model.go
index fc57debbd47..1148b04288c 100644
--- a/engine/pkg/externalresource/model/model.go
+++ b/engine/pkg/externalresource/model/model.go
@@ -14,27 +14,37 @@ package model
 import (
+	"encoding/hex"
 	"fmt"
 	"path"
 	"strings"
 
+	"github.com/pingcap/log"
 	pb "github.com/pingcap/tiflow/engine/enginepb"
 	"github.com/pingcap/tiflow/engine/model"
 	ormModel "github.com/pingcap/tiflow/engine/pkg/orm/model"
 	"github.com/pingcap/tiflow/engine/pkg/tenant"
 	"github.com/pingcap/tiflow/pkg/errors"
+	"go.uber.org/zap"
 )
 
 type (
 	// WorkerID alias worker id string
 	WorkerID = string
-	// ResourceID should be in the form of `/<type>/<name>`, currently
-	// only local type is available.
-	ResourceID = string
 	// JobID alias job id string
 	JobID = model.JobID
 	// ExecutorID alias model.ExecutorID
 	ExecutorID = model.ExecutorID
+
+	// ResourceType represents the type of the resource
+	ResourceType string
+	// ResourceID should be in the form of `/<type>/<name>`; currently the
+	// local and s3 types are available.
+	ResourceID = string
+	// ResourceName is a string that encodes the raw resource name in hexadecimal.
+	// The raw resource name is the ResourceID with its type prefix removed.
+	// For example, the raw resource name of `/local/resource-1` is `resource-1`.
+	ResourceName = string
 )
 
 // ResourceUpdateColumns is used in gorm update
@@ -124,14 +134,7 @@ func (m *ResourceMeta) Map() map[string]interface{} {
 	}
 }
 
-// ResourceType represents the type of the resource
-type ResourceType string
-
-// ResourceName is the ResourceID with its type prefix removed.
-// For example, the resource name of `/local/resource-1` is `resource-1`.
-type ResourceName = string
-
-// Define all supported resource types
+// Define all supported resource types.
 const (
 	ResourceTypeLocalFile = ResourceType("local")
 	ResourceTypeS3        = ResourceType("s3")
@@ -165,10 +168,29 @@ func ParseResourceID(rpath ResourceID) (ResourceType, ResourceName, error) {
 	}
 	suffix := path.Join(segments[1:]...)
 
-	return resourceType, suffix, nil
+	return resourceType, EncodeResourceName(suffix), nil
 }
 
 // BuildResourceID returns an ResourceID based on given ResourceType and ResourceName.
-func BuildResourceID(rtype ResourceType, name ResourceName) ResourceID {
+func BuildResourceID(rtype ResourceType, resName ResourceName) ResourceID {
+	name, err := DecodeResourceName(resName)
+	if err != nil {
+		log.Panic("invalid resource name", zap.Error(err))
+	}
 	return path.Join("/"+string(rtype), name)
 }
+
+// EncodeResourceName encodes a raw resource name into a valid resource name.
+func EncodeResourceName(rawResName string) ResourceName {
+	resName := hex.EncodeToString([]byte(rawResName))
+	return resName
+}
+
+// DecodeResourceName decodes a resource name back to the raw resource name.
+func DecodeResourceName(resName ResourceName) (string, error) { + rawResName, err := hex.DecodeString(resName) + if err != nil { + return "", err + } + return string(rawResName), nil +} diff --git a/engine/pkg/externalresource/model/model_test.go b/engine/pkg/externalresource/model/model_test.go index 6c9bc7bfeb2..a844ddd6180 100644 --- a/engine/pkg/externalresource/model/model_test.go +++ b/engine/pkg/externalresource/model/model_test.go @@ -23,14 +23,29 @@ func TestParseResource(t *testing.T) { tp, suffix, err := ParseResourceID("/local/my-local-resource/a/b/c") require.NoError(t, err) require.Equal(t, ResourceTypeLocalFile, tp) - require.Equal(t, "my-local-resource/a/b/c", suffix) - + rawResName, err := DecodeResourceName(suffix) + require.NoError(t, err) + require.Equal(t, "my-local-resource/a/b/c", rawResName) require.Equal(t, "/local/my-local-resource/a/b/c", BuildResourceID(tp, suffix)) tp, suffix, err = ParseResourceID("/s3/my-local-resource/a/b/c") require.NoError(t, err) require.Equal(t, ResourceTypeS3, tp) - require.Equal(t, "my-local-resource/a/b/c", suffix) - + rawResName, err = DecodeResourceName(suffix) + require.NoError(t, err) + require.Equal(t, "my-local-resource/a/b/c", rawResName) require.Equal(t, "/s3/my-local-resource/a/b/c", BuildResourceID(tp, suffix)) } + +func FuzzEncodeResourceName(f *testing.F) { + testcases := []string{"resource-1", "resource-1/inner", "!resource-1+-%/*inner"} + for _, tc := range testcases { + f.Add(tc) + } + f.Fuzz(func(t *testing.T, rawResName string) { + resName := EncodeResourceName(rawResName) + decodedResName, err := DecodeResourceName(resName) + require.NoError(t, err) + require.Equal(t, rawResName, decodedResName) + }) +} diff --git a/engine/servermaster/config.go b/engine/servermaster/config.go index a51fff3dfa2..0cc5afa9f93 100644 --- a/engine/servermaster/config.go +++ b/engine/servermaster/config.go @@ -174,10 +174,7 @@ func GetDefaultMasterConfig() *Config { KeepAliveIntervalStr: defaultKeepAliveInterval, RPCTimeoutStr: defaultRPCTimeout, JobBackoff: jobop.NewDefaultBackoffConfig(), - Storage: resModel.Config{ - Local: resModel.LocalFileConfig{BaseDir: ""}, - S3: resModel.S3Config{Bucket: ""}, - }, + Storage: resModel.DefaultConfig, } } diff --git a/engine/test/integration_tests/dm_basic/run.sh b/engine/test/integration_tests/dm_basic/run.sh index a2f920acbd4..48ea53ff450 100644 --- a/engine/test/integration_tests/dm_basic/run.sh +++ b/engine/test/integration_tests/dm_basic/run.sh @@ -5,7 +5,7 @@ set -eu WORK_DIR=$OUT_DIR/$TEST_NAME CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -CONFIG="$DOCKER_COMPOSE_DIR/3m3e.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" +CONFIG="$DOCKER_COMPOSE_DIR/3m3e_with_s3.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" CONFIG=$(adjust_config $OUT_DIR $TEST_NAME $CONFIG) echo "using adjusted configs to deploy cluster: $CONFIG" diff --git a/engine/test/integration_tests/dm_case_sensitive/run.sh b/engine/test/integration_tests/dm_case_sensitive/run.sh index 0dac4e35984..9dc7c7d322b 100644 --- a/engine/test/integration_tests/dm_case_sensitive/run.sh +++ b/engine/test/integration_tests/dm_case_sensitive/run.sh @@ -5,7 +5,7 @@ set -eu CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) WORK_DIR=$OUT_DIR/$TEST_NAME -CONFIG="$DOCKER_COMPOSE_DIR/3m3e.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" +CONFIG="$DOCKER_COMPOSE_DIR/3m3e_with_s3.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" CONFIG=$(adjust_config $OUT_DIR $TEST_NAME $CONFIG) echo "using adjusted configs to deploy cluster: $CONFIG" TABLE_NUM=500 diff --git 
a/engine/test/integration_tests/dm_collation/run.sh b/engine/test/integration_tests/dm_collation/run.sh index f63fd221cdc..cd940d7e688 100644 --- a/engine/test/integration_tests/dm_collation/run.sh +++ b/engine/test/integration_tests/dm_collation/run.sh @@ -5,7 +5,7 @@ set -eu WORK_DIR=$OUT_DIR/$TEST_NAME CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -CONFIG="$DOCKER_COMPOSE_DIR/3m3e.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" +CONFIG="$DOCKER_COMPOSE_DIR/3m3e_with_s3.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" CONFIG=$(adjust_config $OUT_DIR $TEST_NAME $CONFIG) echo "using adjusted configs to deploy cluster: $CONFIG" TABLE_NUM=500 diff --git a/engine/test/integration_tests/dm_full_mode/run.sh b/engine/test/integration_tests/dm_full_mode/run.sh index 8dfd9b87d56..c7fba7894b3 100644 --- a/engine/test/integration_tests/dm_full_mode/run.sh +++ b/engine/test/integration_tests/dm_full_mode/run.sh @@ -5,7 +5,7 @@ set -eu WORK_DIR=$OUT_DIR/$TEST_NAME CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -CONFIG="$DOCKER_COMPOSE_DIR/3m3e.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" +CONFIG="$DOCKER_COMPOSE_DIR/3m3e_with_s3.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" CONFIG=$(adjust_config $OUT_DIR $TEST_NAME $CONFIG) echo "using adjusted configs to deploy cluster: $CONFIG" diff --git a/engine/test/integration_tests/dm_many_tables/run.sh b/engine/test/integration_tests/dm_many_tables/run.sh index 54d2d2702c2..a0072dc19e3 100644 --- a/engine/test/integration_tests/dm_many_tables/run.sh +++ b/engine/test/integration_tests/dm_many_tables/run.sh @@ -5,7 +5,7 @@ set -eu WORK_DIR=$OUT_DIR/$TEST_NAME CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -CONFIG="$DOCKER_COMPOSE_DIR/3m3e.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" +CONFIG="$DOCKER_COMPOSE_DIR/3m3e_with_s3.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml" CONFIG=$(adjust_config $OUT_DIR $TEST_NAME $CONFIG) echo "using adjusted configs to deploy cluster: $CONFIG" TABLE_NUM=500 @@ -31,7 +31,7 @@ function run() { job_id=$(create_job "DM" "$CUR_DIR/conf/job.yaml" "dm_many_tables") # check progress is forwarded gradually, not jump to "finished" exec_with_retry --count 500 "curl \"http://127.0.0.1:10245/api/v1/jobs/$job_id/status\" | tee /dev/stderr | jq -e '.task_status.\"mysql-01\".status.status | .finishedBytes > 0 and .finishedBytes < .totalBytes'" - exec_with_retry --count 50 "curl \"http://127.0.0.1:10245/api/v1/jobs/$job_id\" | tee /dev/stderr | jq -e '.state == \"Finished\"'" + exec_with_retry --count 100 "curl \"http://127.0.0.1:10245/api/v1/jobs/$job_id\" | tee /dev/stderr | jq -e '.state == \"Finished\"'" # check data check_sync_diff $WORK_DIR $CUR_DIR/conf/diff_config.toml 1 diff --git a/engine/test/integration_tests/dm_new_collation_off/run.sh b/engine/test/integration_tests/dm_new_collation_off/run.sh index 37313623e57..683ba183345 100644 --- a/engine/test/integration_tests/dm_new_collation_off/run.sh +++ b/engine/test/integration_tests/dm_new_collation_off/run.sh @@ -5,7 +5,7 @@ set -eu WORK_DIR=$OUT_DIR/$TEST_NAME CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd) -CONFIG="$DOCKER_COMPOSE_DIR/3m3e.yaml $DOCKER_COMPOSE_DIR/dm_databases_tidb_new_collation_off.yaml" +CONFIG="$DOCKER_COMPOSE_DIR/3m3e_with_s3.yaml $DOCKER_COMPOSE_DIR/dm_databases_tidb_new_collation_off.yaml" CONFIG=$(adjust_config $OUT_DIR $TEST_NAME $CONFIG) echo "using adjusted configs to deploy cluster: $CONFIG" diff --git a/engine/test/integration_tests/dm_sql_mode/run.sh b/engine/test/integration_tests/dm_sql_mode/run.sh index f3f05031325..60cefa7daaf 100644 
--- a/engine/test/integration_tests/dm_sql_mode/run.sh
+++ b/engine/test/integration_tests/dm_sql_mode/run.sh
@@ -5,7 +5,7 @@ set -eu
 WORK_DIR=$OUT_DIR/$TEST_NAME
 CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
 
-CONFIG="$DOCKER_COMPOSE_DIR/3m3e.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml"
+CONFIG="$DOCKER_COMPOSE_DIR/3m3e_with_s3.yaml $DOCKER_COMPOSE_DIR/dm_databases.yaml"
 CONFIG=$(adjust_config $OUT_DIR $TEST_NAME $CONFIG)
 echo "using adjusted configs to deploy cluster: $CONFIG"
diff --git a/engine/test/utils/exec_with_retry b/engine/test/utils/exec_with_retry
index 3356eea5492..9ea281e0c46 100755
--- a/engine/test/utils/exec_with_retry
+++ b/engine/test/utils/exec_with_retry
@@ -32,7 +32,7 @@ i=0
 while ! eval $cmd; do
 	i=$((i + 1))
 	if [ "$i" -gt "$count" ]; then
-		echo "failed to execute cmd: $cmd"
+		echo -e "\nfailed to execute cmd after $count attempts: $cmd\n"
 		exit 2
 	fi
 	sleep $interval_sec
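Editor's note: to make the end-to-end flow these scripts exercise concrete, here is a condensed sketch of how a worker now obtains an S3-backed resource through the broker. It only uses calls that appear in the tests in this diff; the worker/job/resource IDs and the payload are hypothetical.

```go
package example

import (
	"context"

	"github.com/pingcap/tiflow/engine/pkg/externalresource/broker"
	"github.com/pingcap/tiflow/engine/pkg/tenant"
)

// useS3Resource sketches the resource flow enabled by this PR. brk can be
// any Broker, e.g. the renamed MockBroker from
// broker.NewBrokerForTesting("executor-1").
func useS3Resource(ctx context.Context, brk broker.Broker) error {
	projectInfo := tenant.NewProjectInfo("tenant-1", "project-1")

	// "/s3/my-res" is parsed into (ResourceTypeS3, hex-encoded name) internally.
	hdl, err := brk.OpenStorage(ctx, projectInfo, "worker-1", "job-1", "/s3/my-res")
	if err != nil {
		return err
	}

	// Write a file through the BR external storage bound to this resource.
	w, err := hdl.BrExternalStorage().Create(ctx, "1.txt")
	if err != nil {
		return err
	}
	if _, err := w.Write(ctx, []byte("payload")); err != nil {
		return err
	}
	if err := w.Close(ctx); err != nil {
		return err
	}

	// Persist registers the resource with the resource manager, so GC treats
	// it as a persisted resource instead of a temporary one.
	return hdl.Persist(ctx)
}
```

Resources that are never persisted are exactly what `removeTemporaryFilesForExecutor` in file_manager.go cleans up when an executor goes away.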