diff --git a/deployments/engine/docker-compose/3m3e_with_s3.yaml b/deployments/engine/docker-compose/3m3e_with_s3.yaml index b8e53fa052c..a509a9a6034 100644 --- a/deployments/engine/docker-compose/3m3e_with_s3.yaml +++ b/deployments/engine/docker-compose/3m3e_with_s3.yaml @@ -75,7 +75,7 @@ services: ports: - "11241:10241" volumes: - - ./config/executor_with_s3.toml:/config.toml + - ./config/executor.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" @@ -99,7 +99,7 @@ services: ports: - "11242:10241" volumes: - - ./config/executor_with_s3.toml:/config.toml + - ./config/executor.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" @@ -123,7 +123,7 @@ services: ports: - "11243:10241" volumes: - - ./config/executor_with_s3.toml:/config.toml + - ./config/executor.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" diff --git a/deployments/engine/docker-compose/3m3e_with_tls.yaml b/deployments/engine/docker-compose/3m3e_with_tls.yaml index 02edc5f82e2..dac5bfe1a7d 100644 --- a/deployments/engine/docker-compose/3m3e_with_tls.yaml +++ b/deployments/engine/docker-compose/3m3e_with_tls.yaml @@ -75,7 +75,7 @@ services: ports: - "11241:10241" volumes: - - ./config/executor_with_s3.toml:/config.toml + - ./config/executor.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" @@ -101,7 +101,7 @@ services: ports: - "11242:10241" volumes: - - ./config/executor_with_s3.toml:/config.toml + - ./config/executor.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" @@ -127,7 +127,7 @@ services: ports: - "11243:10241" volumes: - - ./config/executor_with_s3.toml:/config.toml + - ./config/executor.toml:/config.toml - /tmp/tiflow_engine_test:/log command: - "/tiflow" diff --git a/deployments/engine/docker-compose/config/executor_with_s3.toml b/deployments/engine/docker-compose/config/executor_with_s3.toml deleted file mode 100644 index 82197a9d342..00000000000 --- a/deployments/engine/docker-compose/config/executor_with_s3.toml +++ /dev/null @@ -1,12 +0,0 @@ -keepalive-ttl = "20s" -keepalive-interval = "500ms" -session-ttl = 20 - -[log] -level = "debug" - -[storage.s3] -bucket = "engine-it" -endpoint = "http://minio-standalone:9000/" -access-key = "engine" -secret-access-key = "engineSecret" diff --git a/deployments/engine/helm/tiflow/values.yaml b/deployments/engine/helm/tiflow/values.yaml index d749d61c07d..668338ba4d2 100644 --- a/deployments/engine/helm/tiflow/values.yaml +++ b/deployments/engine/helm/tiflow/values.yaml @@ -45,12 +45,6 @@ executor: keepalive-interval = "500ms" session-ttl = 20 - [storage.s3] - bucket = "engine-it" - endpoint = "http://chaos-minio:9000/" - access-key = "engine" - secret-access-key = "engineSecret" - metastore: frameworkStorage: 5Gi businessStorage: 5Gi diff --git a/engine/enginepb/master.pb.go b/engine/enginepb/master.pb.go index ff31a523382..4e0acb6d83b 100644 --- a/engine/enginepb/master.pb.go +++ b/engine/enginepb/master.pb.go @@ -1655,6 +1655,99 @@ func (x *DeleteJobRequest) GetProjectId() string { return "" } +type QueryStorageConfigRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *QueryStorageConfigRequest) Reset() { + *x = QueryStorageConfigRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_engine_proto_master_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *QueryStorageConfigRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QueryStorageConfigRequest) ProtoMessage() {} + +func (x *QueryStorageConfigRequest) ProtoReflect() protoreflect.Message { + mi := &file_engine_proto_master_proto_msgTypes[25] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QueryStorageConfigRequest.ProtoReflect.Descriptor instead. +func (*QueryStorageConfigRequest) Descriptor() ([]byte, []int) { + return file_engine_proto_master_proto_rawDescGZIP(), []int{25} +} + +type QueryStorageConfigResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Err *Error `protobuf:"bytes,1,opt,name=err,proto3" json:"err,omitempty"` + Config string `protobuf:"bytes,2,opt,name=config,proto3" json:"config,omitempty"` +} + +func (x *QueryStorageConfigResponse) Reset() { + *x = QueryStorageConfigResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_engine_proto_master_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *QueryStorageConfigResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*QueryStorageConfigResponse) ProtoMessage() {} + +func (x *QueryStorageConfigResponse) ProtoReflect() protoreflect.Message { + mi := &file_engine_proto_master_proto_msgTypes[26] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use QueryStorageConfigResponse.ProtoReflect.Descriptor instead. +func (*QueryStorageConfigResponse) Descriptor() ([]byte, []int) { + return file_engine_proto_master_proto_rawDescGZIP(), []int{26} +} + +func (x *QueryStorageConfigResponse) GetErr() *Error { + if x != nil { + return x.Err + } + return nil +} + +func (x *QueryStorageConfigResponse) GetConfig() string { + if x != nil { + return x.Config + } + return "" +} + var File_engine_proto_master_proto protoreflect.FileDescriptor var file_engine_proto_master_proto_rawDesc = []byte{ @@ -1841,100 +1934,114 @@ var file_engine_proto_master_proto_rawDesc = []byte{ 0x1b, 0x0a, 0x09, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x65, 0x6e, 0x61, 0x6e, 0x74, 0x49, 0x64, 0x12, 0x1d, 0x0a, 0x0a, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x49, 0x64, 0x32, 0x99, 0x06, 0x0a, 0x09, - 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x12, 0x77, 0x0a, 0x10, 0x52, 0x65, 0x67, - 0x69, 0x73, 0x74, 0x65, 0x72, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x12, 0x21, 0x2e, - 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, - 0x72, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x12, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x78, 0x65, 0x63, - 0x75, 0x74, 0x6f, 0x72, 0x22, 0x2c, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x26, 0x22, 0x1a, 0x2f, 0x61, - 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x73, 0x2f, - 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x3a, 0x08, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, - 0x6f, 0x72, 0x12, 0x6b, 0x0a, 0x0d, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, - 0x6f, 0x72, 0x73, 0x12, 0x1e, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, - 0x69, 0x73, 0x74, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, - 0x69, 0x73, 0x74, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x12, 0x11, 0x2f, 0x61, - 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x73, 0x12, - 0x63, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x73, 0x12, 0x1c, - 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, - 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x65, - 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, - 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x17, 0x82, 0xd3, 0xe4, - 0x93, 0x02, 0x11, 0x12, 0x0f, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6d, 0x61, 0x73, - 0x74, 0x65, 0x72, 0x73, 0x12, 0x46, 0x0a, 0x09, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, - 0x74, 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, - 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, - 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, - 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5e, 0x0a, 0x11, - 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, - 0x65, 0x12, 0x22, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, - 0x69, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, - 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, - 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x55, 0x0a, 0x0e, - 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x1f, - 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, - 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x20, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, - 0x4d, 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x22, 0x00, 0x12, 0x5c, 0x0a, 0x09, 0x47, 0x65, 0x74, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4c, - 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x65, - 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x16, 0x82, 0xd3, 0xe4, 0x93, 0x02, - 0x10, 0x12, 0x0e, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x12, 0x64, 0x0a, 0x0c, 0x52, 0x65, 0x73, 0x69, 0x67, 0x6e, 0x4c, 0x65, 0x61, 0x64, 0x65, - 0x72, 0x12, 0x1d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x73, - 0x69, 0x67, 0x6e, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, - 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, - 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, - 0x22, 0x15, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, - 0x2f, 0x72, 0x65, 0x73, 0x69, 0x67, 0x6e, 0x32, 0xbb, 0x01, 0x0a, 0x0d, 0x54, 0x61, 0x73, 0x6b, - 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x72, 0x12, 0x4f, 0x0a, 0x0c, 0x53, 0x63, 0x68, - 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x12, 0x1d, 0x2e, 0x65, 0x6e, 0x67, 0x69, - 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x61, 0x73, - 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, - 0x65, 0x70, 0x62, 0x2e, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x16, 0x52, 0x65, - 0x70, 0x6f, 0x72, 0x74, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x57, 0x6f, 0x72, 0x6b, - 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, - 0x45, 0x78, 0x65, 0x63, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, - 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x45, - 0x78, 0x65, 0x63, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, - 0x6e, 0x73, 0x65, 0x22, 0x00, 0x32, 0xc1, 0x03, 0x0a, 0x0a, 0x4a, 0x6f, 0x62, 0x4d, 0x61, 0x6e, - 0x61, 0x67, 0x65, 0x72, 0x12, 0x51, 0x0a, 0x09, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4a, 0x6f, - 0x62, 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x43, 0x72, 0x65, - 0x61, 0x74, 0x65, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, - 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4a, 0x6f, 0x62, 0x22, 0x19, 0x82, 0xd3, - 0xe4, 0x93, 0x02, 0x13, 0x22, 0x0c, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, - 0x62, 0x73, 0x3a, 0x03, 0x6a, 0x6f, 0x62, 0x12, 0x4d, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x4a, 0x6f, - 0x62, 0x12, 0x17, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, - 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x65, 0x6e, 0x67, - 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4a, 0x6f, 0x62, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, - 0x15, 0x12, 0x13, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, 0x62, 0x73, 0x2f, - 0x7b, 0x69, 0x64, 0x3d, 0x2a, 0x7d, 0x12, 0x57, 0x0a, 0x08, 0x4c, 0x69, 0x73, 0x74, 0x4a, 0x6f, - 0x62, 0x73, 0x12, 0x19, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, 0x69, - 0x73, 0x74, 0x4a, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, - 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4a, 0x6f, 0x62, - 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x14, 0x82, 0xd3, 0xe4, 0x93, 0x02, - 0x0e, 0x12, 0x0c, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, 0x62, 0x73, 0x12, - 0x5a, 0x0a, 0x09, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x4a, 0x6f, 0x62, 0x12, 0x1a, 0x2e, 0x65, - 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x4a, 0x6f, - 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, - 0x65, 0x70, 0x62, 0x2e, 0x4a, 0x6f, 0x62, 0x22, 0x22, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1c, 0x22, - 0x1a, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, 0x62, 0x73, 0x2f, 0x7b, 0x69, - 0x64, 0x3d, 0x2a, 0x7d, 0x2f, 0x63, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x12, 0x5c, 0x0a, 0x09, 0x44, - 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4a, 0x6f, 0x62, 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, - 0x65, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x71, - 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, - 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x1b, 0x82, 0xd3, - 0xe4, 0x93, 0x02, 0x15, 0x2a, 0x13, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, - 0x62, 0x73, 0x2f, 0x7b, 0x69, 0x64, 0x3d, 0x2a, 0x7d, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, - 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x70, 0x69, 0x6e, 0x67, 0x63, 0x61, 0x70, 0x2f, - 0x74, 0x69, 0x66, 0x6c, 0x6f, 0x77, 0x2f, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x2f, 0x65, 0x6e, - 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x52, 0x09, 0x70, 0x72, 0x6f, 0x6a, 0x65, 0x63, 0x74, 0x49, 0x64, 0x22, 0x1b, 0x0a, 0x19, 0x51, + 0x75, 0x65, 0x72, 0x79, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x57, 0x0a, 0x1a, 0x51, 0x75, 0x65, 0x72, + 0x79, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x03, 0x65, 0x72, 0x72, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x0b, 0x32, 0x0f, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x45, + 0x72, 0x72, 0x6f, 0x72, 0x52, 0x03, 0x65, 0x72, 0x72, 0x12, 0x16, 0x0a, 0x06, 0x63, 0x6f, 0x6e, + 0x66, 0x69, 0x67, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x63, 0x6f, 0x6e, 0x66, 0x69, + 0x67, 0x32, 0xfc, 0x06, 0x0a, 0x09, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x76, 0x65, 0x72, 0x79, 0x12, + 0x77, 0x0a, 0x10, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x45, 0x78, 0x65, 0x63, 0x75, + 0x74, 0x6f, 0x72, 0x12, 0x21, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x52, + 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, + 0x62, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x22, 0x2c, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x26, 0x22, 0x1a, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x78, 0x65, 0x63, + 0x75, 0x74, 0x6f, 0x72, 0x73, 0x2f, 0x72, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x3a, 0x08, + 0x65, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x12, 0x6b, 0x0a, 0x0d, 0x4c, 0x69, 0x73, 0x74, + 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, 0x72, 0x73, 0x12, 0x1e, 0x2e, 0x65, 0x6e, 0x67, 0x69, + 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, + 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1f, 0x2e, 0x65, 0x6e, 0x67, 0x69, + 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x45, 0x78, 0x65, 0x63, 0x75, 0x74, 0x6f, + 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, + 0x02, 0x13, 0x12, 0x11, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x65, 0x78, 0x65, 0x63, + 0x75, 0x74, 0x6f, 0x72, 0x73, 0x12, 0x63, 0x0a, 0x0b, 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, + 0x74, 0x65, 0x72, 0x73, 0x12, 0x1c, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, + 0x4c, 0x69, 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, + 0x73, 0x74, 0x1a, 0x1d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, 0x69, + 0x73, 0x74, 0x4d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x17, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x11, 0x12, 0x0f, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x76, 0x31, 0x2f, 0x6d, 0x61, 0x73, 0x74, 0x65, 0x72, 0x73, 0x12, 0x46, 0x0a, 0x09, 0x48, 0x65, + 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, + 0x70, 0x62, 0x2e, 0x48, 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x48, + 0x65, 0x61, 0x72, 0x74, 0x62, 0x65, 0x61, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x5e, 0x0a, 0x11, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x65, + 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x12, 0x22, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, + 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x4d, 0x65, 0x74, 0x61, 0x53, + 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x23, 0x2e, 0x65, 0x6e, + 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x67, 0x69, 0x73, 0x74, 0x65, 0x72, 0x4d, + 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x22, 0x00, 0x12, 0x55, 0x0a, 0x0e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, 0x65, 0x74, 0x61, 0x53, + 0x74, 0x6f, 0x72, 0x65, 0x12, 0x1f, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, + 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x20, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, + 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, 0x4d, 0x65, 0x74, 0x61, 0x53, 0x74, 0x6f, 0x72, 0x65, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x61, 0x0a, 0x12, 0x51, 0x75, 0x65, + 0x72, 0x79, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, + 0x23, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x51, 0x75, 0x65, 0x72, 0x79, + 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x1a, 0x24, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, + 0x51, 0x75, 0x65, 0x72, 0x79, 0x53, 0x74, 0x6f, 0x72, 0x61, 0x67, 0x65, 0x43, 0x6f, 0x6e, 0x66, + 0x69, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5c, 0x0a, 0x09, + 0x47, 0x65, 0x74, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, + 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, + 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1b, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, + 0x2e, 0x47, 0x65, 0x74, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x16, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x10, 0x12, 0x0e, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x76, 0x31, 0x2f, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x64, 0x0a, 0x0c, 0x52, 0x65, + 0x73, 0x69, 0x67, 0x6e, 0x4c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x12, 0x1d, 0x2e, 0x65, 0x6e, 0x67, + 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x52, 0x65, 0x73, 0x69, 0x67, 0x6e, 0x4c, 0x65, 0x61, 0x64, + 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, 0x67, 0x6f, 0x6f, 0x67, + 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x22, 0x1d, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x17, 0x22, 0x15, 0x2f, 0x61, 0x70, 0x69, 0x2f, + 0x76, 0x31, 0x2f, 0x6c, 0x65, 0x61, 0x64, 0x65, 0x72, 0x2f, 0x72, 0x65, 0x73, 0x69, 0x67, 0x6e, + 0x32, 0xbb, 0x01, 0x0a, 0x0d, 0x54, 0x61, 0x73, 0x6b, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, + 0x65, 0x72, 0x12, 0x4f, 0x0a, 0x0c, 0x53, 0x63, 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x61, + 0x73, 0x6b, 0x12, 0x1d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x53, 0x63, + 0x68, 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x1e, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x53, 0x63, 0x68, + 0x65, 0x64, 0x75, 0x6c, 0x65, 0x54, 0x61, 0x73, 0x6b, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x22, 0x00, 0x12, 0x59, 0x0a, 0x16, 0x52, 0x65, 0x70, 0x6f, 0x72, 0x74, 0x45, 0x78, 0x65, + 0x63, 0x75, 0x74, 0x6f, 0x72, 0x57, 0x6f, 0x72, 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x12, 0x1d, 0x2e, + 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x57, 0x6f, 0x72, + 0x6b, 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1e, 0x2e, 0x65, + 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x45, 0x78, 0x65, 0x63, 0x57, 0x6f, 0x72, 0x6b, + 0x6c, 0x6f, 0x61, 0x64, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x32, 0xc1, + 0x03, 0x0a, 0x0a, 0x4a, 0x6f, 0x62, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x51, 0x0a, + 0x09, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4a, 0x6f, 0x62, 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, + 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x43, 0x72, 0x65, 0x61, 0x74, 0x65, 0x4a, 0x6f, 0x62, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, + 0x62, 0x2e, 0x4a, 0x6f, 0x62, 0x22, 0x19, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x13, 0x22, 0x0c, 0x2f, + 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, 0x62, 0x73, 0x3a, 0x03, 0x6a, 0x6f, 0x62, + 0x12, 0x4d, 0x0a, 0x06, 0x47, 0x65, 0x74, 0x4a, 0x6f, 0x62, 0x12, 0x17, 0x2e, 0x65, 0x6e, 0x67, + 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x47, 0x65, 0x74, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x1a, 0x0d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4a, + 0x6f, 0x62, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x12, 0x13, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, 0x62, 0x73, 0x2f, 0x7b, 0x69, 0x64, 0x3d, 0x2a, 0x7d, 0x12, + 0x57, 0x0a, 0x08, 0x4c, 0x69, 0x73, 0x74, 0x4a, 0x6f, 0x62, 0x73, 0x12, 0x19, 0x2e, 0x65, 0x6e, + 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4a, 0x6f, 0x62, 0x73, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, + 0x62, 0x2e, 0x4c, 0x69, 0x73, 0x74, 0x4a, 0x6f, 0x62, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x22, 0x14, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x0e, 0x12, 0x0c, 0x2f, 0x61, 0x70, 0x69, + 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, 0x62, 0x73, 0x12, 0x5a, 0x0a, 0x09, 0x43, 0x61, 0x6e, 0x63, + 0x65, 0x6c, 0x4a, 0x6f, 0x62, 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, + 0x2e, 0x43, 0x61, 0x6e, 0x63, 0x65, 0x6c, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x0d, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x4a, 0x6f, 0x62, + 0x22, 0x22, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x1c, 0x22, 0x1a, 0x2f, 0x61, 0x70, 0x69, 0x2f, 0x76, + 0x31, 0x2f, 0x6a, 0x6f, 0x62, 0x73, 0x2f, 0x7b, 0x69, 0x64, 0x3d, 0x2a, 0x7d, 0x2f, 0x63, 0x61, + 0x6e, 0x63, 0x65, 0x6c, 0x12, 0x5c, 0x0a, 0x09, 0x44, 0x65, 0x6c, 0x65, 0x74, 0x65, 0x4a, 0x6f, + 0x62, 0x12, 0x1a, 0x2e, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x2e, 0x44, 0x65, 0x6c, + 0x65, 0x74, 0x65, 0x4a, 0x6f, 0x62, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x16, 0x2e, + 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x22, 0x1b, 0x82, 0xd3, 0xe4, 0x93, 0x02, 0x15, 0x2a, 0x13, 0x2f, + 0x61, 0x70, 0x69, 0x2f, 0x76, 0x31, 0x2f, 0x6a, 0x6f, 0x62, 0x73, 0x2f, 0x7b, 0x69, 0x64, 0x3d, + 0x2a, 0x7d, 0x42, 0x2b, 0x5a, 0x29, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, + 0x2f, 0x70, 0x69, 0x6e, 0x67, 0x63, 0x61, 0x70, 0x2f, 0x74, 0x69, 0x66, 0x6c, 0x6f, 0x77, 0x2f, + 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x2f, 0x65, 0x6e, 0x67, 0x69, 0x6e, 0x65, 0x70, 0x62, 0x62, + 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1950,97 +2057,102 @@ func file_engine_proto_master_proto_rawDescGZIP() []byte { } var file_engine_proto_master_proto_enumTypes = make([]protoimpl.EnumInfo, 3) -var file_engine_proto_master_proto_msgTypes = make([]protoimpl.MessageInfo, 26) +var file_engine_proto_master_proto_msgTypes = make([]protoimpl.MessageInfo, 28) var file_engine_proto_master_proto_goTypes = []interface{}{ - (Selector_Op)(0), // 0: enginepb.Selector.Op - (Job_Type)(0), // 1: enginepb.Job.Type - (Job_State)(0), // 2: enginepb.Job.State - (*Selector)(nil), // 3: enginepb.Selector - (*HeartbeatRequest)(nil), // 4: enginepb.HeartbeatRequest - (*HeartbeatResponse)(nil), // 5: enginepb.HeartbeatResponse - (*Executor)(nil), // 6: enginepb.Executor - (*RegisterExecutorRequest)(nil), // 7: enginepb.RegisterExecutorRequest - (*ListExecutorsRequest)(nil), // 8: enginepb.ListExecutorsRequest - (*ListExecutorsResponse)(nil), // 9: enginepb.ListExecutorsResponse - (*Master)(nil), // 10: enginepb.Master - (*ListMastersRequest)(nil), // 11: enginepb.ListMastersRequest - (*ListMastersResponse)(nil), // 12: enginepb.ListMastersResponse - (*ScheduleTaskRequest)(nil), // 13: enginepb.ScheduleTaskRequest - (*ScheduleTaskResponse)(nil), // 14: enginepb.ScheduleTaskResponse - (*ExecWorkload)(nil), // 15: enginepb.ExecWorkload - (*ExecWorkloadRequest)(nil), // 16: enginepb.ExecWorkloadRequest - (*ExecWorkloadResponse)(nil), // 17: enginepb.ExecWorkloadResponse - (*GetLeaderRequest)(nil), // 18: enginepb.GetLeaderRequest - (*GetLeaderResponse)(nil), // 19: enginepb.GetLeaderResponse - (*ResignLeaderRequest)(nil), // 20: enginepb.ResignLeaderRequest - (*Job)(nil), // 21: enginepb.Job - (*CreateJobRequest)(nil), // 22: enginepb.CreateJobRequest - (*GetJobRequest)(nil), // 23: enginepb.GetJobRequest - (*ListJobsRequest)(nil), // 24: enginepb.ListJobsRequest - (*ListJobsResponse)(nil), // 25: enginepb.ListJobsResponse - (*CancelJobRequest)(nil), // 26: enginepb.CancelJobRequest - (*DeleteJobRequest)(nil), // 27: enginepb.DeleteJobRequest - nil, // 28: enginepb.Executor.LabelsEntry - (*Error)(nil), // 29: enginepb.Error - (*ResourceKey)(nil), // 30: enginepb.ResourceKey - (*RegisterMetaStoreRequest)(nil), // 31: enginepb.RegisterMetaStoreRequest - (*QueryMetaStoreRequest)(nil), // 32: enginepb.QueryMetaStoreRequest - (*RegisterMetaStoreResponse)(nil), // 33: enginepb.RegisterMetaStoreResponse - (*QueryMetaStoreResponse)(nil), // 34: enginepb.QueryMetaStoreResponse - (*emptypb.Empty)(nil), // 35: google.protobuf.Empty + (Selector_Op)(0), // 0: enginepb.Selector.Op + (Job_Type)(0), // 1: enginepb.Job.Type + (Job_State)(0), // 2: enginepb.Job.State + (*Selector)(nil), // 3: enginepb.Selector + (*HeartbeatRequest)(nil), // 4: enginepb.HeartbeatRequest + (*HeartbeatResponse)(nil), // 5: enginepb.HeartbeatResponse + (*Executor)(nil), // 6: enginepb.Executor + (*RegisterExecutorRequest)(nil), // 7: enginepb.RegisterExecutorRequest + (*ListExecutorsRequest)(nil), // 8: enginepb.ListExecutorsRequest + (*ListExecutorsResponse)(nil), // 9: enginepb.ListExecutorsResponse + (*Master)(nil), // 10: enginepb.Master + (*ListMastersRequest)(nil), // 11: enginepb.ListMastersRequest + (*ListMastersResponse)(nil), // 12: enginepb.ListMastersResponse + (*ScheduleTaskRequest)(nil), // 13: enginepb.ScheduleTaskRequest + (*ScheduleTaskResponse)(nil), // 14: enginepb.ScheduleTaskResponse + (*ExecWorkload)(nil), // 15: enginepb.ExecWorkload + (*ExecWorkloadRequest)(nil), // 16: enginepb.ExecWorkloadRequest + (*ExecWorkloadResponse)(nil), // 17: enginepb.ExecWorkloadResponse + (*GetLeaderRequest)(nil), // 18: enginepb.GetLeaderRequest + (*GetLeaderResponse)(nil), // 19: enginepb.GetLeaderResponse + (*ResignLeaderRequest)(nil), // 20: enginepb.ResignLeaderRequest + (*Job)(nil), // 21: enginepb.Job + (*CreateJobRequest)(nil), // 22: enginepb.CreateJobRequest + (*GetJobRequest)(nil), // 23: enginepb.GetJobRequest + (*ListJobsRequest)(nil), // 24: enginepb.ListJobsRequest + (*ListJobsResponse)(nil), // 25: enginepb.ListJobsResponse + (*CancelJobRequest)(nil), // 26: enginepb.CancelJobRequest + (*DeleteJobRequest)(nil), // 27: enginepb.DeleteJobRequest + (*QueryStorageConfigRequest)(nil), // 28: enginepb.QueryStorageConfigRequest + (*QueryStorageConfigResponse)(nil), // 29: enginepb.QueryStorageConfigResponse + nil, // 30: enginepb.Executor.LabelsEntry + (*Error)(nil), // 31: enginepb.Error + (*ResourceKey)(nil), // 32: enginepb.ResourceKey + (*RegisterMetaStoreRequest)(nil), // 33: enginepb.RegisterMetaStoreRequest + (*QueryMetaStoreRequest)(nil), // 34: enginepb.QueryMetaStoreRequest + (*RegisterMetaStoreResponse)(nil), // 35: enginepb.RegisterMetaStoreResponse + (*QueryMetaStoreResponse)(nil), // 36: enginepb.QueryMetaStoreResponse + (*emptypb.Empty)(nil), // 37: google.protobuf.Empty } var file_engine_proto_master_proto_depIdxs = []int32{ 0, // 0: enginepb.Selector.op:type_name -> enginepb.Selector.Op - 29, // 1: enginepb.HeartbeatResponse.err:type_name -> enginepb.Error - 28, // 2: enginepb.Executor.labels:type_name -> enginepb.Executor.LabelsEntry + 31, // 1: enginepb.HeartbeatResponse.err:type_name -> enginepb.Error + 30, // 2: enginepb.Executor.labels:type_name -> enginepb.Executor.LabelsEntry 6, // 3: enginepb.RegisterExecutorRequest.executor:type_name -> enginepb.Executor 6, // 4: enginepb.ListExecutorsResponse.executors:type_name -> enginepb.Executor 10, // 5: enginepb.ListMastersResponse.masters:type_name -> enginepb.Master - 30, // 6: enginepb.ScheduleTaskRequest.resource_requirements:type_name -> enginepb.ResourceKey + 32, // 6: enginepb.ScheduleTaskRequest.resource_requirements:type_name -> enginepb.ResourceKey 3, // 7: enginepb.ScheduleTaskRequest.selectors:type_name -> enginepb.Selector 15, // 8: enginepb.ExecWorkloadRequest.workloads:type_name -> enginepb.ExecWorkload - 29, // 9: enginepb.ExecWorkloadResponse.err:type_name -> enginepb.Error + 31, // 9: enginepb.ExecWorkloadResponse.err:type_name -> enginepb.Error 1, // 10: enginepb.Job.type:type_name -> enginepb.Job.Type 2, // 11: enginepb.Job.state:type_name -> enginepb.Job.State - 29, // 12: enginepb.Job.error:type_name -> enginepb.Error + 31, // 12: enginepb.Job.error:type_name -> enginepb.Error 3, // 13: enginepb.Job.selectors:type_name -> enginepb.Selector 21, // 14: enginepb.CreateJobRequest.job:type_name -> enginepb.Job 21, // 15: enginepb.ListJobsResponse.jobs:type_name -> enginepb.Job - 7, // 16: enginepb.Discovery.RegisterExecutor:input_type -> enginepb.RegisterExecutorRequest - 8, // 17: enginepb.Discovery.ListExecutors:input_type -> enginepb.ListExecutorsRequest - 11, // 18: enginepb.Discovery.ListMasters:input_type -> enginepb.ListMastersRequest - 4, // 19: enginepb.Discovery.Heartbeat:input_type -> enginepb.HeartbeatRequest - 31, // 20: enginepb.Discovery.RegisterMetaStore:input_type -> enginepb.RegisterMetaStoreRequest - 32, // 21: enginepb.Discovery.QueryMetaStore:input_type -> enginepb.QueryMetaStoreRequest - 18, // 22: enginepb.Discovery.GetLeader:input_type -> enginepb.GetLeaderRequest - 20, // 23: enginepb.Discovery.ResignLeader:input_type -> enginepb.ResignLeaderRequest - 13, // 24: enginepb.TaskScheduler.ScheduleTask:input_type -> enginepb.ScheduleTaskRequest - 16, // 25: enginepb.TaskScheduler.ReportExecutorWorkload:input_type -> enginepb.ExecWorkloadRequest - 22, // 26: enginepb.JobManager.CreateJob:input_type -> enginepb.CreateJobRequest - 23, // 27: enginepb.JobManager.GetJob:input_type -> enginepb.GetJobRequest - 24, // 28: enginepb.JobManager.ListJobs:input_type -> enginepb.ListJobsRequest - 26, // 29: enginepb.JobManager.CancelJob:input_type -> enginepb.CancelJobRequest - 27, // 30: enginepb.JobManager.DeleteJob:input_type -> enginepb.DeleteJobRequest - 6, // 31: enginepb.Discovery.RegisterExecutor:output_type -> enginepb.Executor - 9, // 32: enginepb.Discovery.ListExecutors:output_type -> enginepb.ListExecutorsResponse - 12, // 33: enginepb.Discovery.ListMasters:output_type -> enginepb.ListMastersResponse - 5, // 34: enginepb.Discovery.Heartbeat:output_type -> enginepb.HeartbeatResponse - 33, // 35: enginepb.Discovery.RegisterMetaStore:output_type -> enginepb.RegisterMetaStoreResponse - 34, // 36: enginepb.Discovery.QueryMetaStore:output_type -> enginepb.QueryMetaStoreResponse - 19, // 37: enginepb.Discovery.GetLeader:output_type -> enginepb.GetLeaderResponse - 35, // 38: enginepb.Discovery.ResignLeader:output_type -> google.protobuf.Empty - 14, // 39: enginepb.TaskScheduler.ScheduleTask:output_type -> enginepb.ScheduleTaskResponse - 17, // 40: enginepb.TaskScheduler.ReportExecutorWorkload:output_type -> enginepb.ExecWorkloadResponse - 21, // 41: enginepb.JobManager.CreateJob:output_type -> enginepb.Job - 21, // 42: enginepb.JobManager.GetJob:output_type -> enginepb.Job - 25, // 43: enginepb.JobManager.ListJobs:output_type -> enginepb.ListJobsResponse - 21, // 44: enginepb.JobManager.CancelJob:output_type -> enginepb.Job - 35, // 45: enginepb.JobManager.DeleteJob:output_type -> google.protobuf.Empty - 31, // [31:46] is the sub-list for method output_type - 16, // [16:31] is the sub-list for method input_type - 16, // [16:16] is the sub-list for extension type_name - 16, // [16:16] is the sub-list for extension extendee - 0, // [0:16] is the sub-list for field type_name + 31, // 16: enginepb.QueryStorageConfigResponse.err:type_name -> enginepb.Error + 7, // 17: enginepb.Discovery.RegisterExecutor:input_type -> enginepb.RegisterExecutorRequest + 8, // 18: enginepb.Discovery.ListExecutors:input_type -> enginepb.ListExecutorsRequest + 11, // 19: enginepb.Discovery.ListMasters:input_type -> enginepb.ListMastersRequest + 4, // 20: enginepb.Discovery.Heartbeat:input_type -> enginepb.HeartbeatRequest + 33, // 21: enginepb.Discovery.RegisterMetaStore:input_type -> enginepb.RegisterMetaStoreRequest + 34, // 22: enginepb.Discovery.QueryMetaStore:input_type -> enginepb.QueryMetaStoreRequest + 28, // 23: enginepb.Discovery.QueryStorageConfig:input_type -> enginepb.QueryStorageConfigRequest + 18, // 24: enginepb.Discovery.GetLeader:input_type -> enginepb.GetLeaderRequest + 20, // 25: enginepb.Discovery.ResignLeader:input_type -> enginepb.ResignLeaderRequest + 13, // 26: enginepb.TaskScheduler.ScheduleTask:input_type -> enginepb.ScheduleTaskRequest + 16, // 27: enginepb.TaskScheduler.ReportExecutorWorkload:input_type -> enginepb.ExecWorkloadRequest + 22, // 28: enginepb.JobManager.CreateJob:input_type -> enginepb.CreateJobRequest + 23, // 29: enginepb.JobManager.GetJob:input_type -> enginepb.GetJobRequest + 24, // 30: enginepb.JobManager.ListJobs:input_type -> enginepb.ListJobsRequest + 26, // 31: enginepb.JobManager.CancelJob:input_type -> enginepb.CancelJobRequest + 27, // 32: enginepb.JobManager.DeleteJob:input_type -> enginepb.DeleteJobRequest + 6, // 33: enginepb.Discovery.RegisterExecutor:output_type -> enginepb.Executor + 9, // 34: enginepb.Discovery.ListExecutors:output_type -> enginepb.ListExecutorsResponse + 12, // 35: enginepb.Discovery.ListMasters:output_type -> enginepb.ListMastersResponse + 5, // 36: enginepb.Discovery.Heartbeat:output_type -> enginepb.HeartbeatResponse + 35, // 37: enginepb.Discovery.RegisterMetaStore:output_type -> enginepb.RegisterMetaStoreResponse + 36, // 38: enginepb.Discovery.QueryMetaStore:output_type -> enginepb.QueryMetaStoreResponse + 29, // 39: enginepb.Discovery.QueryStorageConfig:output_type -> enginepb.QueryStorageConfigResponse + 19, // 40: enginepb.Discovery.GetLeader:output_type -> enginepb.GetLeaderResponse + 37, // 41: enginepb.Discovery.ResignLeader:output_type -> google.protobuf.Empty + 14, // 42: enginepb.TaskScheduler.ScheduleTask:output_type -> enginepb.ScheduleTaskResponse + 17, // 43: enginepb.TaskScheduler.ReportExecutorWorkload:output_type -> enginepb.ExecWorkloadResponse + 21, // 44: enginepb.JobManager.CreateJob:output_type -> enginepb.Job + 21, // 45: enginepb.JobManager.GetJob:output_type -> enginepb.Job + 25, // 46: enginepb.JobManager.ListJobs:output_type -> enginepb.ListJobsResponse + 21, // 47: enginepb.JobManager.CancelJob:output_type -> enginepb.Job + 37, // 48: enginepb.JobManager.DeleteJob:output_type -> google.protobuf.Empty + 33, // [33:49] is the sub-list for method output_type + 17, // [17:33] is the sub-list for method input_type + 17, // [17:17] is the sub-list for extension type_name + 17, // [17:17] is the sub-list for extension extendee + 0, // [0:17] is the sub-list for field type_name } func init() { file_engine_proto_master_proto_init() } @@ -2352,6 +2464,30 @@ func file_engine_proto_master_proto_init() { return nil } } + file_engine_proto_master_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*QueryStorageConfigRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_engine_proto_master_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*QueryStorageConfigResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -2359,7 +2495,7 @@ func file_engine_proto_master_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_engine_proto_master_proto_rawDesc, NumEnums: 3, - NumMessages: 26, + NumMessages: 28, NumExtensions: 0, NumServices: 3, }, diff --git a/engine/enginepb/master_grpc.pb.go b/engine/enginepb/master_grpc.pb.go index b23fa633ce4..58c625d10aa 100644 --- a/engine/enginepb/master_grpc.pb.go +++ b/engine/enginepb/master_grpc.pb.go @@ -34,6 +34,7 @@ type DiscoveryClient interface { // QueryMetaStore queries metastore manager and returns // the information of a matching metastore QueryMetaStore(ctx context.Context, in *QueryMetaStoreRequest, opts ...grpc.CallOption) (*QueryMetaStoreResponse, error) + QueryStorageConfig(ctx context.Context, in *QueryStorageConfigRequest, opts ...grpc.CallOption) (*QueryStorageConfigResponse, error) GetLeader(ctx context.Context, in *GetLeaderRequest, opts ...grpc.CallOption) (*GetLeaderResponse, error) ResignLeader(ctx context.Context, in *ResignLeaderRequest, opts ...grpc.CallOption) (*emptypb.Empty, error) } @@ -100,6 +101,15 @@ func (c *discoveryClient) QueryMetaStore(ctx context.Context, in *QueryMetaStore return out, nil } +func (c *discoveryClient) QueryStorageConfig(ctx context.Context, in *QueryStorageConfigRequest, opts ...grpc.CallOption) (*QueryStorageConfigResponse, error) { + out := new(QueryStorageConfigResponse) + err := c.cc.Invoke(ctx, "/enginepb.Discovery/QueryStorageConfig", in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + func (c *discoveryClient) GetLeader(ctx context.Context, in *GetLeaderRequest, opts ...grpc.CallOption) (*GetLeaderResponse, error) { out := new(GetLeaderResponse) err := c.cc.Invoke(ctx, "/enginepb.Discovery/GetLeader", in, out, opts...) @@ -137,6 +147,7 @@ type DiscoveryServer interface { // QueryMetaStore queries metastore manager and returns // the information of a matching metastore QueryMetaStore(context.Context, *QueryMetaStoreRequest) (*QueryMetaStoreResponse, error) + QueryStorageConfig(context.Context, *QueryStorageConfigRequest) (*QueryStorageConfigResponse, error) GetLeader(context.Context, *GetLeaderRequest) (*GetLeaderResponse, error) ResignLeader(context.Context, *ResignLeaderRequest) (*emptypb.Empty, error) } @@ -163,6 +174,9 @@ func (UnimplementedDiscoveryServer) RegisterMetaStore(context.Context, *Register func (UnimplementedDiscoveryServer) QueryMetaStore(context.Context, *QueryMetaStoreRequest) (*QueryMetaStoreResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method QueryMetaStore not implemented") } +func (UnimplementedDiscoveryServer) QueryStorageConfig(context.Context, *QueryStorageConfigRequest) (*QueryStorageConfigResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method QueryStorageConfig not implemented") +} func (UnimplementedDiscoveryServer) GetLeader(context.Context, *GetLeaderRequest) (*GetLeaderResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method GetLeader not implemented") } @@ -289,6 +303,24 @@ func _Discovery_QueryMetaStore_Handler(srv interface{}, ctx context.Context, dec return interceptor(ctx, in, info, handler) } +func _Discovery_QueryStorageConfig_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(QueryStorageConfigRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(DiscoveryServer).QueryStorageConfig(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: "/enginepb.Discovery/QueryStorageConfig", + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(DiscoveryServer).QueryStorageConfig(ctx, req.(*QueryStorageConfigRequest)) + } + return interceptor(ctx, in, info, handler) +} + func _Discovery_GetLeader_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(GetLeaderRequest) if err := dec(in); err != nil { @@ -356,6 +388,10 @@ var Discovery_ServiceDesc = grpc.ServiceDesc{ MethodName: "QueryMetaStore", Handler: _Discovery_QueryMetaStore_Handler, }, + { + MethodName: "QueryStorageConfig", + Handler: _Discovery_QueryStorageConfig_Handler, + }, { MethodName: "GetLeader", Handler: _Discovery_GetLeader_Handler, diff --git a/engine/executor/config.go b/engine/executor/config.go index 8cdc80cd344..f0b4da2876b 100644 --- a/engine/executor/config.go +++ b/engine/executor/config.go @@ -15,16 +15,13 @@ package executor import ( "bytes" - "encoding/hex" "encoding/json" "fmt" - "path/filepath" "strings" "time" "github.com/BurntSushi/toml" "github.com/pingcap/log" - resModel "github.com/pingcap/tiflow/engine/pkg/externalresource/model" "github.com/pingcap/tiflow/pkg/errors" "github.com/pingcap/tiflow/pkg/label" "github.com/pingcap/tiflow/pkg/logutil" @@ -39,10 +36,8 @@ var ( defaultRPCTimeout = "3s" defaultMetricInterval = 15 * time.Second - defaultCapability int64 = 100 // TODO: make this configurable - defaultLocalStorageDirPrefix = "/tmp/dfe-storage/" - - defaultExecutorAddr = "127.0.0.1:10340" + defaultCapability int64 = 100 // TODO: make this configurable + defaultExecutorAddr = "127.0.0.1:10340" ) // Config is the configuration. @@ -64,8 +59,6 @@ type Config struct { KeepAliveIntervalStr string `toml:"keepalive-interval" json:"keepalive-interval"` RPCTimeoutStr string `toml:"rpc-timeout" json:"rpc-timeout"` - Storage resModel.Config `toml:"storage" json:"storage"` - KeepAliveTTL time.Duration `toml:"-" json:"-"` KeepAliveInterval time.Duration `toml:"-" json:"-"` RPCTimeout time.Duration `toml:"-" json:"-"` @@ -111,13 +104,6 @@ func (c *Config) configFromFile(path string) error { return nil } -func getDefaultLocalStorageDir(executorName string) string { - // Use hex encoding in case there are special characters in the - // executor name. - encodedExecutorName := hex.EncodeToString([]byte(executorName)) - return filepath.Join(defaultLocalStorageDirPrefix, encodedExecutorName) -} - // Adjust adjusts the executor configuration func (c *Config) Adjust() (err error) { if c.AdvertiseAddr == "" { @@ -128,10 +114,6 @@ func (c *Config) Adjust() (err error) { c.Name = fmt.Sprintf("executor-%s", c.AdvertiseAddr) } - if c.Storage.Local.BaseDir == "" { - c.Storage.Local.BaseDir = getDefaultLocalStorageDir(c.Name) - } - c.KeepAliveInterval, err = time.ParseDuration(c.KeepAliveIntervalStr) if err != nil { return @@ -169,6 +151,5 @@ func GetDefaultExecutorConfig() *Config { KeepAliveTTLStr: defaultKeepAliveTTL, KeepAliveIntervalStr: defaultKeepAliveInterval, RPCTimeoutStr: defaultRPCTimeout, - Storage: resModel.DefaultConfig, } } diff --git a/engine/executor/config_test.go b/engine/executor/config_test.go index 87a838bb67f..4a8917b1050 100644 --- a/engine/executor/config_test.go +++ b/engine/executor/config_test.go @@ -14,7 +14,6 @@ package executor import ( - "encoding/hex" "os" "testing" @@ -36,10 +35,8 @@ join = "127.0.0.1:10240" err = cfg.Adjust() require.NoError(t, err) - expectedPath := "/tmp/dfe-storage/" + hex.EncodeToString([]byte("executor-1")) require.Equal(t, "executor-1", cfg.Name) require.Equal(t, "0.0.0.0:10241", cfg.AdvertiseAddr) - require.Equal(t, expectedPath, cfg.Storage.Local.BaseDir) } func TestConfigDefaultLocalStoragePathNoName(t *testing.T) { @@ -56,9 +53,7 @@ join = "127.0.0.1:10240" err = cfg.Adjust() require.NoError(t, err) - expectedPath := "/tmp/dfe-storage/" + hex.EncodeToString([]byte("executor-0.0.0.0:10241")) require.Equal(t, "0.0.0.0:10241", cfg.AdvertiseAddr) - require.Equal(t, expectedPath, cfg.Storage.Local.BaseDir) } func TestConfigStorage(t *testing.T) { @@ -68,9 +63,6 @@ func TestConfigStorage(t *testing.T) { name = "executor-1" addr = "0.0.0.0:10241" join = "127.0.0.1:10240" - -[storage] -local.base-dir = "/tmp/my-base-dir" ` fileName := mustWriteToTempFile(t, testToml) cfg := GetDefaultExecutorConfig() @@ -78,8 +70,6 @@ local.base-dir = "/tmp/my-base-dir" require.NoError(t, err) err = cfg.Adjust() require.NoError(t, err) - - require.Equal(t, "/tmp/my-base-dir", cfg.Storage.Local.BaseDir) } func mustWriteToTempFile(t *testing.T, content string) (filePath string) { diff --git a/engine/executor/dm/worker.go b/engine/executor/dm/worker.go index c821f4e8559..bf89916fa2a 100644 --- a/engine/executor/dm/worker.go +++ b/engine/executor/dm/worker.go @@ -193,11 +193,17 @@ func (w *dmWorker) CloseImpl(ctx context.Context) { // setupStorage opens and configs external storage func (w *dmWorker) setupStorage(ctx context.Context) error { rid := dm.NewDMResourceID(w.cfg.Name, w.cfg.SourceID) - h, err := w.OpenStorage(ctx, rid) + opts := []broker.OpenStorageOption{} + if w.workerType == frameModel.WorkerDMDump { + // always use an empty storage for dumpling task + opts = append(opts, broker.WithCleanBeforeOpen()) + } + + h, err := w.OpenStorage(ctx, rid, opts...) for status.Code(err) == codes.Unavailable { w.Logger().Info("simple retry", zap.Error(err)) time.Sleep(time.Second) - h, err = w.OpenStorage(ctx, rid) + h, err = w.OpenStorage(ctx, rid, opts...) } if err != nil { return errors.Trace(err) diff --git a/engine/executor/server.go b/engine/executor/server.go index 43382154a78..d50380ea6b9 100644 --- a/engine/executor/server.go +++ b/engine/executor/server.go @@ -444,11 +444,7 @@ func (s *Server) Run(ctx context.Context) error { return err } - if err := broker.PreCheckConfig(s.cfg.Storage); err != nil { - return err - } - - s.resourceBroker, err = broker.NewBroker(&s.cfg.Storage, s.selfID, s.masterClient) + s.resourceBroker, err = broker.NewBroker(ctx, s.selfID, s.masterClient) if err != nil { return err } diff --git a/engine/framework/worker.go b/engine/framework/worker.go index 74e2eb21404..a527d891f78 100644 --- a/engine/framework/worker.go +++ b/engine/framework/worker.go @@ -122,7 +122,9 @@ type BaseWorker interface { SendMessage(ctx context.Context, topic p2p.Topic, message interface{}, nonblocking bool) error // OpenStorage creates a resource and return the resource handle - OpenStorage(ctx context.Context, resourcePath resModel.ResourceID) (broker.Handle, error) + OpenStorage( + ctx context.Context, resourcePath resModel.ResourceID, opts ...broker.OpenStorageOption, + ) (broker.Handle, error) // Exit should be called when worker (in user logic) wants to exit. // exitReason: ExitReasonFinished/ExitReasonCanceled/ExitReasonFailed @@ -505,10 +507,12 @@ func (w *DefaultBaseWorker) SendMessage( } // OpenStorage implements BaseWorker.OpenStorage -func (w *DefaultBaseWorker) OpenStorage(ctx context.Context, resourcePath resModel.ResourceID) (broker.Handle, error) { +func (w *DefaultBaseWorker) OpenStorage( + ctx context.Context, resourcePath resModel.ResourceID, opts ...broker.OpenStorageOption, +) (broker.Handle, error) { ctx, cancel := w.errCenter.WithCancelOnFirstError(ctx) defer cancel() - return w.resourceBroker.OpenStorage(ctx, w.projectInfo, w.id, w.masterID, resourcePath) + return w.resourceBroker.OpenStorage(ctx, w.projectInfo, w.id, w.masterID, resourcePath, opts...) } // Exit implements BaseWorker.Exit diff --git a/engine/pkg/client/client_mock.go b/engine/pkg/client/client_mock.go index 9d26630a49e..485ee6ba0da 100644 --- a/engine/pkg/client/client_mock.go +++ b/engine/pkg/client/client_mock.go @@ -200,6 +200,21 @@ func (mr *MockServerMasterClientMockRecorder) QueryResource(arg0, arg1 interface return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryResource", reflect.TypeOf((*MockServerMasterClient)(nil).QueryResource), arg0, arg1) } +// QueryStorageConfig mocks base method. +func (m *MockServerMasterClient) QueryStorageConfig(arg0 context.Context, arg1 *enginepb.QueryStorageConfigRequest) (*enginepb.QueryStorageConfigResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueryStorageConfig", arg0, arg1) + ret0, _ := ret[0].(*enginepb.QueryStorageConfigResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// QueryStorageConfig indicates an expected call of QueryStorageConfig. +func (mr *MockServerMasterClientMockRecorder) QueryStorageConfig(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueryStorageConfig", reflect.TypeOf((*MockServerMasterClient)(nil).QueryStorageConfig), arg0, arg1) +} + // RegisterExecutor mocks base method. func (m *MockServerMasterClient) RegisterExecutor(arg0 context.Context, arg1 *enginepb.RegisterExecutorRequest) (model.DeployNodeID, error) { m.ctrl.T.Helper() diff --git a/engine/pkg/client/discovery_client.go b/engine/pkg/client/discovery_client.go index 71e9e23c3a4..d91418cdfda 100644 --- a/engine/pkg/client/discovery_client.go +++ b/engine/pkg/client/discovery_client.go @@ -17,7 +17,7 @@ import ( "context" "github.com/pingcap/errors" - "github.com/pingcap/tiflow/engine/enginepb" + pb "github.com/pingcap/tiflow/engine/enginepb" "github.com/pingcap/tiflow/engine/model" "github.com/pingcap/tiflow/engine/pkg/client/internal" "github.com/pingcap/tiflow/pkg/retry" @@ -29,49 +29,55 @@ type DiscoveryClient interface { // will allocate and records a UUID. RegisterExecutor( ctx context.Context, - request *enginepb.RegisterExecutorRequest, + request *pb.RegisterExecutorRequest, ) (model.ExecutorID, error) // ListExecutors lists all executors. - ListExecutors(ctx context.Context) ([]*enginepb.Executor, error) + ListExecutors(ctx context.Context) ([]*pb.Executor, error) // ListMasters lists all masters. - ListMasters(ctx context.Context) ([]*enginepb.Master, error) + ListMasters(ctx context.Context) ([]*pb.Master, error) // Heartbeat sends a heartbeat message to the server. Heartbeat( ctx context.Context, - request *enginepb.HeartbeatRequest, - ) (*enginepb.HeartbeatResponse, error) + request *pb.HeartbeatRequest, + ) (*pb.HeartbeatResponse, error) // RegisterMetaStore registers a new metastore. // Deprecated RegisterMetaStore( ctx context.Context, - request *enginepb.RegisterMetaStoreRequest, + request *pb.RegisterMetaStoreRequest, ) error // QueryMetaStore queries the details of a metastore. QueryMetaStore( ctx context.Context, - request *enginepb.QueryMetaStoreRequest, - ) (*enginepb.QueryMetaStoreResponse, error) + request *pb.QueryMetaStoreRequest, + ) (*pb.QueryMetaStoreResponse, error) + + // QueryStorageConfig queries the storage config. + QueryStorageConfig( + ctx context.Context, + in *pb.QueryStorageConfigRequest, + ) (*pb.QueryStorageConfigResponse, error) } var _ DiscoveryClient = &discoveryClient{} type discoveryClient struct { - cli enginepb.DiscoveryClient + cli pb.DiscoveryClient } // NewDiscoveryClient returns a DiscoveryClient. -func NewDiscoveryClient(cli enginepb.DiscoveryClient) DiscoveryClient { +func NewDiscoveryClient(cli pb.DiscoveryClient) DiscoveryClient { return &discoveryClient{cli: cli} } func (c *discoveryClient) RegisterExecutor( ctx context.Context, - request *enginepb.RegisterExecutorRequest, + request *pb.RegisterExecutorRequest, ) (model.ExecutorID, error) { var ret model.ExecutorID err := retry.Do(ctx, func() error { @@ -95,8 +101,8 @@ func (c *discoveryClient) RegisterExecutor( return ret, nil } -func (c *discoveryClient) ListExecutors(ctx context.Context) ([]*enginepb.Executor, error) { - call := internal.NewCall(c.cli.ListExecutors, &enginepb.ListExecutorsRequest{}) +func (c *discoveryClient) ListExecutors(ctx context.Context) ([]*pb.Executor, error) { + call := internal.NewCall(c.cli.ListExecutors, &pb.ListExecutorsRequest{}) resp, err := call.Do(ctx) if err != nil { return nil, err @@ -104,8 +110,8 @@ func (c *discoveryClient) ListExecutors(ctx context.Context) ([]*enginepb.Execut return resp.Executors, nil } -func (c *discoveryClient) ListMasters(ctx context.Context) ([]*enginepb.Master, error) { - call := internal.NewCall(c.cli.ListMasters, &enginepb.ListMastersRequest{}) +func (c *discoveryClient) ListMasters(ctx context.Context) ([]*pb.Master, error) { + call := internal.NewCall(c.cli.ListMasters, &pb.ListMastersRequest{}) resp, err := call.Do(ctx) if err != nil { return nil, err @@ -119,8 +125,8 @@ func (c *discoveryClient) ListMasters(ctx context.Context) ([]*enginepb.Master, // TODO refactor this. func (c *discoveryClient) Heartbeat( ctx context.Context, - request *enginepb.HeartbeatRequest, -) (*enginepb.HeartbeatResponse, error) { + request *pb.HeartbeatRequest, +) (*pb.HeartbeatResponse, error) { call := internal.NewCall( c.cli.Heartbeat, request, @@ -131,7 +137,7 @@ func (c *discoveryClient) Heartbeat( func (c *discoveryClient) RegisterMetaStore( ctx context.Context, - request *enginepb.RegisterMetaStoreRequest, + request *pb.RegisterMetaStoreRequest, ) error { call := internal.NewCall( c.cli.RegisterMetaStore, @@ -140,7 +146,7 @@ func (c *discoveryClient) RegisterMetaStore( if err != nil { return err } - if resp.Err != nil && resp.Err.Code != enginepb.ErrorCode_None { + if resp.Err != nil && resp.Err.Code != pb.ErrorCode_None { return errors.Errorf("RegisterMetaStore: %s", resp.Err.Message) } return nil @@ -148,10 +154,20 @@ func (c *discoveryClient) RegisterMetaStore( func (c *discoveryClient) QueryMetaStore( ctx context.Context, - request *enginepb.QueryMetaStoreRequest, -) (*enginepb.QueryMetaStoreResponse, error) { + request *pb.QueryMetaStoreRequest, +) (*pb.QueryMetaStoreResponse, error) { call := internal.NewCall( c.cli.QueryMetaStore, request) return call.Do(ctx) } + +func (c *discoveryClient) QueryStorageConfig( + ctx context.Context, + request *pb.QueryStorageConfigRequest, +) (*pb.QueryStorageConfigResponse, error) { + call := internal.NewCall( + c.cli.QueryStorageConfig, + request) + return call.Do(ctx) +} diff --git a/engine/pkg/externalresource/broker/broker.go b/engine/pkg/externalresource/broker/broker.go index b6b2aed6d8a..d928d800b69 100644 --- a/engine/pkg/externalresource/broker/broker.go +++ b/engine/pkg/externalresource/broker/broker.go @@ -15,6 +15,7 @@ package broker import ( "context" + "encoding/json" "fmt" "time" @@ -68,8 +69,32 @@ type DefaultBroker struct { cancel context.CancelFunc } -// NewBroker creates a new Impl instance +// NewBroker creates a new Impl instance. func NewBroker( + ctx context.Context, + executorID resModel.ExecutorID, + client client.ServerMasterClient, +) (*DefaultBroker, error) { + resp, err := client.QueryStorageConfig(ctx, &pb.QueryStorageConfigRequest{}) + if err != nil || resp.Err != nil { + return nil, errors.New(fmt.Sprintf("query storage config failed: %v, %v", err, resp.Err)) + } + var storageConfig resModel.Config + err = json.Unmarshal([]byte(resp.Config), &storageConfig) + if err != nil { + return nil, errors.Trace(err) + } + + // validate and check config + storageConfig.ValidateAndAdjust(executorID) + if err := PreCheckConfig(storageConfig); err != nil { + return nil, err + } + return NewBrokerWithConfig(&storageConfig, executorID, client) +} + +// NewBrokerWithConfig creates a new Impl instance based on the given config. +func NewBrokerWithConfig( config *resModel.Config, executorID resModel.ExecutorID, client client.ResourceManagerClient, @@ -116,6 +141,7 @@ func (b *DefaultBroker) OpenStorage( workerID resModel.WorkerID, jobID resModel.JobID, resID resModel.ResourceID, + opts ...OpenStorageOption, ) (Handle, error) { // Note the semantics of PasreResourceID: // If resourceID is `/local/my-resource`, then tp == resModel.ResourceTypeLocalFile @@ -130,6 +156,11 @@ func (b *DefaultBroker) OpenStorage( log.Panic("unexpected resource type", zap.String("type", string(tp))) } + options := &openStorageOptions{} + for _, o := range opts { + o(options) + } + record, exists, err := b.checkForExistingResource(ctx, resModel.ResourceKey{JobID: jobID, ID: resID}) if err != nil { @@ -140,7 +171,7 @@ func (b *DefaultBroker) OpenStorage( if !exists { desc, err = b.createResource(ctx, fm, projectInfo, workerID, resName) } else { - desc, err = b.getPersistResource(ctx, fm, record, resName) + desc, err = b.getPersistResource(ctx, fm, record, resName, options) } if err != nil { return nil, err @@ -299,6 +330,7 @@ func (b *DefaultBroker) getPersistResource( ctx context.Context, fm internal.FileManager, record *resModel.ResourceMeta, resName resModel.ResourceName, + options *openStorageOptions, ) (internal.ResourceDescriptor, error) { ident := internal.ResourceIdent{ Name: resName, @@ -308,7 +340,22 @@ func (b *DefaultBroker) getPersistResource( WorkerID: record.Worker, /* creator id*/ }, } - return fm.GetPersistedResource(ctx, ident) + desc, err := fm.GetPersistedResource(ctx, ident) + if err != nil { + return nil, err + } + + if options.cleanBeforeOpen { + err := fm.RemoveResource(ctx, ident) + if err != nil { + return nil, err + } + desc, err = fm.CreateResource(ctx, ident) + if err != nil { + return nil, err + } + } + return desc, nil } func (b *DefaultBroker) createDummyS3Resource() error { diff --git a/engine/pkg/externalresource/broker/broker_integration_test.go b/engine/pkg/externalresource/broker/broker_integration_test.go index 7bd7857a5f5..d35bbc44828 100644 --- a/engine/pkg/externalresource/broker/broker_integration_test.go +++ b/engine/pkg/externalresource/broker/broker_integration_test.go @@ -86,7 +86,7 @@ func newBrokerForS3WithPrefix( CreatorWorkerId: s3.DummyWorkerID, }, mock.Anything).Return(nil) - broker, err := NewBroker(&resModel.Config{ + broker, err := NewBrokerWithConfig(&resModel.Config{ Local: resModel.LocalFileConfig{BaseDir: tmpDir}, S3: resModel.S3Config{ S3BackendOptions: *s3Cfg, diff --git a/engine/pkg/externalresource/broker/broker_test.go b/engine/pkg/externalresource/broker/broker_test.go index 03fa5da9b04..48de349aa1c 100644 --- a/engine/pkg/externalresource/broker/broker_test.go +++ b/engine/pkg/externalresource/broker/broker_test.go @@ -33,7 +33,7 @@ import ( func newBroker(t *testing.T) (*DefaultBroker, *manager.MockClient, string) { tmpDir := t.TempDir() cli := manager.NewMockClient() - broker, err := NewBroker(&resModel.Config{Local: resModel.LocalFileConfig{BaseDir: tmpDir}}, + broker, err := NewBrokerWithConfig(&resModel.Config{Local: resModel.LocalFileConfig{BaseDir: tmpDir}}, "executor-1", cli) require.NoError(t, err) @@ -102,12 +102,13 @@ func TestBrokerOpenExistingStorage(t *testing.T) { CreatorWorkerId: "worker-2", }, mock.Anything).Return(nil) + opts := []OpenStorageOption{} hdl, err := brk.OpenStorage( context.Background(), fakeProjectInfo, "worker-2", "job-1", - resID) + resID, opts...) require.NoError(t, err) err = hdl.Persist(context.Background()) diff --git a/engine/pkg/externalresource/broker/interfaces.go b/engine/pkg/externalresource/broker/interfaces.go index dd3fc7e646e..0d0596eac1d 100644 --- a/engine/pkg/externalresource/broker/interfaces.go +++ b/engine/pkg/externalresource/broker/interfaces.go @@ -33,6 +33,7 @@ type Broker interface { workerID resModel.WorkerID, jobID resModel.JobID, resourcePath resModel.ResourceID, + opts ...OpenStorageOption, ) (Handle, error) // OnWorkerClosed is called when a worker is closing. @@ -46,3 +47,17 @@ type Broker interface { Close() } + +type openStorageOptions struct { + cleanBeforeOpen bool +} + +// OpenStorageOption is an option for OpenStorage. +type OpenStorageOption func(*openStorageOptions) + +// WithCleanBeforeOpen indicates that the storage should be cleaned before open. +func WithCleanBeforeOpen() OpenStorageOption { + return func(opts *openStorageOptions) { + opts.cleanBeforeOpen = true + } +} diff --git a/engine/pkg/externalresource/broker/mock_broker.go b/engine/pkg/externalresource/broker/mock_broker.go index 1a60bc43f82..eb9de077be5 100644 --- a/engine/pkg/externalresource/broker/mock_broker.go +++ b/engine/pkg/externalresource/broker/mock_broker.go @@ -55,7 +55,7 @@ func NewBrokerForTesting(executorID resModel.ExecutorID) *MockBroker { } cfg := &resModel.Config{Local: resModel.LocalFileConfig{BaseDir: dir}} client := manager.NewMockClient() - broker, err := NewBroker(cfg, executorID, client) + broker, err := NewBrokerWithConfig(cfg, executorID, client) if err != nil { log.Panic("failed to create broker") } @@ -81,6 +81,7 @@ func (b *MockBroker) OpenStorage( workerID resModel.WorkerID, jobID resModel.JobID, resourcePath resModel.ResourceID, + opts ...OpenStorageOption, ) (Handle, error) { b.clientMu.Lock() defer b.clientMu.Unlock() diff --git a/engine/pkg/externalresource/integration_test/mock_cluster.go b/engine/pkg/externalresource/integration_test/mock_cluster.go index 68bcbd02d1e..064f7c56657 100644 --- a/engine/pkg/externalresource/integration_test/mock_cluster.go +++ b/engine/pkg/externalresource/integration_test/mock_cluster.go @@ -120,7 +120,7 @@ func (c *mockCluster) Stop() { func (c *mockCluster) AddBroker(id model.ExecutorID, baseDir string) { config := &resModel.Config{Local: resModel.LocalFileConfig{BaseDir: baseDir}} cli := &resourceClientStub{service: c.service} - brk, err := broker.NewBroker(config, id, cli) + brk, err := broker.NewBrokerWithConfig(config, id, cli) if err != nil { log.Panic("create broker failed", zap.Error(err)) } diff --git a/engine/pkg/externalresource/manager/gc_runner_test.go b/engine/pkg/externalresource/manager/gc_runner_test.go index 2579414d8c9..70db4f743e7 100644 --- a/engine/pkg/externalresource/manager/gc_runner_test.go +++ b/engine/pkg/externalresource/manager/gc_runner_test.go @@ -345,9 +345,8 @@ func testGCExecutors(t *testing.T, helper *gcRunnerTestHelper) { require.NotNil(t, res) } } - // TODO: fix bug, should use passed in executors - checkOffline := func(ctx context.Context, _ ...model.ExecutorID) { - metas, err := helper.Meta.QueryResourcesByExecutorIDs(ctx, "executor-1", "executor-2") + checkOffline := func(ctx context.Context, executors ...model.ExecutorID) { + metas, err := helper.Meta.QueryResourcesByExecutorIDs(ctx, executors...) require.NoError(t, err) for _, meta := range metas { tp, resName, err := resModel.ParseResourceID(meta.ID) @@ -389,7 +388,7 @@ func testGCExecutors(t *testing.T, helper *gcRunnerTestHelper) { checkOffline(ctx, "executor-1", "executor-2") checkAlive(ctx, "executor-3", "executor-never-offline") - helper.Runner.GCExecutors(ctx, "executor-2") + helper.Runner.GCExecutors(ctx, "executor-3") checkOffline(ctx, "executor-3") checkAlive(ctx, "executor-never-offline") } diff --git a/engine/pkg/externalresource/model/config.go b/engine/pkg/externalresource/model/config.go index 617bd863738..76015b7a6d2 100644 --- a/engine/pkg/externalresource/model/config.go +++ b/engine/pkg/externalresource/model/config.go @@ -14,9 +14,13 @@ package model import ( + "path/filepath" + brStorage "github.com/pingcap/tidb/br/pkg/storage" ) +const defaultLocalStorageDirPrefix = "/tmp/dfe-storage" + // DefaultConfig defines the default configuration for external storage var DefaultConfig = Config{ Local: LocalFileConfig{BaseDir: ""}, @@ -46,11 +50,23 @@ func (c *Config) S3Enabled() bool { c.S3.AccessKey != "" && c.S3.SecretAccessKey != "" } +// ValidateAndAdjust validates and adjusts the configuration +func (c *Config) ValidateAndAdjust(executorID ExecutorID) { + c.Local.validateAndAdjust(executorID) +} + // LocalFileConfig defines configurations for a local file based resource type LocalFileConfig struct { BaseDir string `json:"base-dir" toml:"base-dir"` } +func (c *LocalFileConfig) validateAndAdjust(executorID ExecutorID) { + if c.BaseDir == "" { + c.BaseDir = defaultLocalStorageDirPrefix + } + c.BaseDir = filepath.Join(c.BaseDir, string(executorID)) +} + // S3Config defines configurations for s3 based resources type S3Config struct { brStorage.S3BackendOptions diff --git a/engine/pkg/externalresource/model/config_test.go b/engine/pkg/externalresource/model/config_test.go new file mode 100644 index 00000000000..63f151208ae --- /dev/null +++ b/engine/pkg/externalresource/model/config_test.go @@ -0,0 +1,42 @@ +// Copyright 2022 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package model + +import ( + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestValidateAndAdjust(t *testing.T) { + dirs := []string{"", "/tmp/dfe-storage", "/var/engine/", "/a/b", "/a/b/c"} + for _, dir := range dirs { + cfg := Config{ + Local: LocalFileConfig{ + BaseDir: dir, + }, + } + if dir == "" { + dir = defaultLocalStorageDirPrefix + } + oldCfg := cfg + cfg.ValidateAndAdjust("test-executor") + require.Equal(t, oldCfg.S3, cfg.S3, "inputBaseDir: %s", dir) + require.NotEqual(t, oldCfg.Local, cfg.Local, "inputBaseDir: %s", dir) + + expected := filepath.Join(dir, "test-executor") + require.Equal(t, expected, cfg.Local.BaseDir, "inputBaseDir: %s", dir) + } +} diff --git a/engine/pkg/openapi/apiv1.swagger.json b/engine/pkg/openapi/apiv1.swagger.json index 9ad228f4b65..5590a025808 100644 --- a/engine/pkg/openapi/apiv1.swagger.json +++ b/engine/pkg/openapi/apiv1.swagger.json @@ -705,6 +705,17 @@ } } }, + "enginepbQueryStorageConfigResponse": { + "type": "object", + "properties": { + "err": { + "$ref": "#/definitions/enginepbError" + }, + "config": { + "type": "string" + } + } + }, "enginepbReadLinesResponse": { "type": "object", "properties": { diff --git a/engine/proto/master.proto b/engine/proto/master.proto index 4a4d2a0705d..b42beec42f7 100644 --- a/engine/proto/master.proto +++ b/engine/proto/master.proto @@ -47,6 +47,7 @@ service Discovery { // QueryMetaStore queries metastore manager and returns // the information of a matching metastore rpc QueryMetaStore(QueryMetaStoreRequest) returns(QueryMetaStoreResponse) {} + rpc QueryStorageConfig(QueryStorageConfigRequest) returns(QueryStorageConfigResponse) {} rpc GetLeader(GetLeaderRequest) returns(GetLeaderResponse) { option (google.api.http) = { @@ -274,3 +275,11 @@ message DeleteJobRequest { string tenant_id = 2; string project_id = 3; } + +message QueryStorageConfigRequest { +} + +message QueryStorageConfigResponse { + Error err = 1; + string config = 2; +} diff --git a/engine/servermaster/server.go b/engine/servermaster/server.go index 4eb593dce27..57e49826cf0 100644 --- a/engine/servermaster/server.go +++ b/engine/servermaster/server.go @@ -411,6 +411,24 @@ func (s *Server) QueryMetaStore( } } +// QueryStorageConfig implements gRPC interface +func (s *Server) QueryStorageConfig( + ctx context.Context, req *pb.QueryStorageConfigRequest, +) (*pb.QueryStorageConfigResponse, error) { + b, err := json.Marshal(s.cfg.Storage) + if err != nil { + return &pb.QueryStorageConfigResponse{ + Err: &pb.Error{ + Code: pb.ErrorCode_MetaStoreSerializeFail, + Message: fmt.Sprintf("raw storage config params: %v", s.cfg.Storage), + }, + }, nil + } + return &pb.QueryStorageConfigResponse{ + Config: string(b), + }, nil +} + // GetLeader implements DiscoveryServer.GetLeader. func (s *Server) GetLeader(_ context.Context, _ *pb.GetLeaderRequest) (*pb.GetLeaderResponse, error) { leaderAddr, ok := s.LeaderAddr()