diff --git a/pkg/controller/catalog/datacatalog/datacatalog.go b/pkg/controller/catalog/datacatalog/datacatalog.go index f8aa16883..5728536d9 100644 --- a/pkg/controller/catalog/datacatalog/datacatalog.go +++ b/pkg/controller/catalog/datacatalog/datacatalog.go @@ -22,7 +22,7 @@ import ( const ( taskVersionKey = "task-version" - taskExecKey = "execution-name" + wfExecNameKey = "execution-name" ) // This is the client that caches task executions to DataCatalog service. @@ -108,7 +108,7 @@ func (m *CatalogClient) Get(ctx context.Context, task *core.TaskTemplate, inputP dataset, err := m.getDataset(ctx, task) if err != nil { - logger.Errorf(ctx, "DataCatalog failed to get dataset for task %+v, err: %+v", task, err) + logger.Debugf(ctx, "DataCatalog failed to get dataset for task %+v, err: %+v", task.Id, err) return nil, err } @@ -120,7 +120,7 @@ func (m *CatalogClient) Get(ctx context.Context, task *core.TaskTemplate, inputP artifact, err := m.getArtifactByTag(ctx, tag, dataset) if err != nil { - logger.Errorf(ctx, "DataCatalog failed to get artifact by tag %+v, err: %+v", tag, err) + logger.Debugf(ctx, "DataCatalog failed to get artifact by tag %+v, err: %+v", tag, err) return nil, err } logger.Debugf(ctx, "Artifact found %v from tag %v", artifact, tag) @@ -131,7 +131,7 @@ func (m *CatalogClient) Get(ctx context.Context, task *core.TaskTemplate, inputP return nil, err } - logger.Debugf(ctx, "Cached %v artifact outputs from artifact %v", len(outputs.Literals), artifact.Id) + logger.Debugf(ctx, "Retrieved %v artifact outputs from artifact %v", len(outputs.Literals), artifact.Id) return outputs, nil } @@ -179,7 +179,7 @@ func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID metadata := &datacatalog.Metadata{ KeyMap: map[string]string{ taskVersionKey: task.Id.Version, - taskExecKey: execID.NodeExecutionId.NodeId, + wfExecNameKey: execID.NodeExecutionId.ExecutionId.Name, }, } newDataset := &datacatalog.Dataset{ @@ -227,7 +227,7 @@ func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID // Tag the artifact since it is the cached artifact tagName, err := transformer.GenerateArtifactTagName(ctx, inputs) if err != nil { - logger.Errorf(ctx, "Failed to create tag for artifact %+v, err: %+v", cachedArtifact.Id, err) + logger.Errorf(ctx, "Failed to generate tag for artifact %+v, err: %+v", cachedArtifact.Id, err) return err } logger.Debugf(ctx, "Created tag: %v, for task: %v", tagName, task.Id) @@ -241,7 +241,7 @@ func (m *CatalogClient) Put(ctx context.Context, task *core.TaskTemplate, execID _, err = m.client.AddTag(ctx, &datacatalog.AddTagRequest{Tag: tag}) if err != nil { if status.Code(err) == codes.AlreadyExists { - logger.Errorf(ctx, "Tag %v already exists for Artifact %v (idempotent)", tagName, cachedArtifact.Id) + logger.Warnf(ctx, "Tag %v already exists for Artifact %v (idempotent)", tagName, cachedArtifact.Id) } logger.Errorf(ctx, "Failed to add tag %+v for artifact %+v, err: %+v", tagName, cachedArtifact.Id, err)