Skip to content

Commit

Permalink
Fixing Delete Entity utils Test
Browse files Browse the repository at this point in the history
  • Loading branch information
jjoyce0510 committed Jun 27, 2022
1 parent 9dca2fa commit e79a12f
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 30 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -220,29 +220,73 @@ private void deleteReference(final Urn urn, final RelatedEntity relatedEntity) {
updatedAspect.get(), aspectSpec.getPegasusSchema(), path));
});

// If there has been an update
if (!updatedAspect.get().equals(aspect)) {
final MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(relatedUrn);
proposal.setChangeType(ChangeType.UPSERT);
proposal.setEntityType(relatedUrn.getEntityType());
proposal.setAspectName(aspectName);
proposal.setAspect(GenericRecordUtils.serializeAspect(updatedAspect.get()));

final AuditStamp auditStamp = new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final EntityService.IngestProposalResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp);

if (!ingestProposalResult.isDidUpdate()) {
log.error("Failed to ingest aspect with references removed. Before {}, after: {}, please check MCP processor"
+ " logs for more information", aspect, updatedAspect);
handleError(new DeleteEntityServiceError("Failed to ingest new aspect",
DeleteEntityServiceErrorReason.MCP_PROCESSOR_FAILED,
ImmutableMap.of("proposal", proposal)));
// If there has been an update, then we produce an MCE.
if (!aspect.equals(updatedAspect.get())) {
if (updatedAspect.get() == null) {
// Then we should remove the aspect.
deleteAspect(relatedUrn, aspectName, aspect);
} else {
// Then we should update the aspect.
updateAspect(relatedUrn, aspectName, aspect, updatedAspect.get());
}
}
});
}

/**
* Delete an existing aspect for an urn.
*
* @param urn the urn of the entity to remove the aspect for
* @param aspectName the aspect to remove
* @param prevAspect the old value for the aspect
*/
private void deleteAspect(Urn urn, String aspectName, RecordTemplate prevAspect) {
final MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
proposal.setChangeType(ChangeType.DELETE);
proposal.setEntityType(urn.getEntityType());
proposal.setAspectName(aspectName);

final AuditStamp auditStamp = new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final EntityService.IngestProposalResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp);

if (!ingestProposalResult.isDidUpdate()) {
log.error("Failed to ingest aspect with references removed. Before {}, after: null, please check MCP processor"
+ " logs for more information", prevAspect);
handleError(new DeleteEntityServiceError("Failed to ingest new aspect",
DeleteEntityServiceErrorReason.MCP_PROCESSOR_FAILED,
ImmutableMap.of("proposal", proposal)));
}
}

/**
* Update an aspect for an urn.
*
* @param urn the urn of the entity to remove the aspect for
* @param aspectName the aspect to remove
* @param prevAspect the old value for the aspect
* @param newAspect the new value for the aspect
*/
private void updateAspect(Urn urn, String aspectName, RecordTemplate prevAspect, RecordTemplate newAspect) {
final MetadataChangeProposal proposal = new MetadataChangeProposal();
proposal.setEntityUrn(urn);
proposal.setChangeType(ChangeType.UPSERT);
proposal.setEntityType(urn.getEntityType());
proposal.setAspectName(aspectName);
proposal.setAspect(GenericRecordUtils.serializeAspect(newAspect));

final AuditStamp auditStamp = new AuditStamp().setActor(UrnUtils.getUrn(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final EntityService.IngestProposalResult ingestProposalResult = _entityService.ingestProposal(proposal, auditStamp);

if (!ingestProposalResult.isDidUpdate()) {
log.error("Failed to ingest aspect with references removed. Before {}, after: {}, please check MCP processor"
+ " logs for more information", prevAspect, newAspect);
handleError(new DeleteEntityServiceError("Failed to ingest new aspect",
DeleteEntityServiceErrorReason.MCP_PROCESSOR_FAILED,
ImmutableMap.of("proposal", proposal)));
}
}


/**
* Utility method that attempts to find Aspect information as well as the associated path spec for a given urn that
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public static Aspect getAspectWithReferenceRemoved(String value, RecordTemplate
try {
final DataMap copy = aspect.copy().data();
final DataComplex newValue = removeValueBasedOnPath(value, schema, copy, aspectPath.getPathComponents(), 0);
if (newValue == null) {
// If the new value is null, we should remove the aspect.
return null;
}
return new Aspect((DataMap) newValue);
} catch (CloneNotSupportedException e) {
return new Aspect();
Expand Down Expand Up @@ -106,6 +110,7 @@ private static DataComplex removeValueFromMap(String value, RecordDataSchema spe
if (canDelete) {
record.remove(pathComponents.get(index));
} else {
// If the field is required, then we need to remove the entire record (if possible)
return null;
}
} else {
Expand All @@ -126,6 +131,10 @@ private static DataComplex removeValueFromMap(String value, RecordDataSchema spe
record.remove(key);
} else if (record.size() == 1) {
return null;
} else {
// Not optional and not the only field, then this is a bad delete. Need to throw.
throw new UnsupportedOperationException(
String.format("Delete failed! Failed to field with name %s from DataMap. The field is required!", key));
}
} else {
record.put(key, result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,18 +73,12 @@ public void testNonOptionalFieldRemoval() {
+ "}");

final DataSchema schema = pdlSchemaParser.lookupName("simple_record");
final Aspect updatedAspect = DeleteEntityUtils.getAspectWithReferenceRemoved("hello", aspect, schema,
new PathSpec("key_a"));

assertTrue(updatedAspect.data().containsKey("key_a"));
assertEquals("hello", updatedAspect.data().get("key_a"));
assertTrue(updatedAspect.data().containsKey("key_b"));
assertEquals("world", updatedAspect.data().get("key_b"));
assertEquals(aspect, updatedAspect);
assertNull(DeleteEntityUtils.getAspectWithReferenceRemoved("hello", aspect, schema,
new PathSpec("key_a")));
}

/**
* Tests that Aspect Processor does not delete a non-optional value from a record referenced by another record.
* Tests that Aspect Processor deletes a required value from a record referenced by another record.
*/
@Test
public void testNestedFieldRemoval() {
Expand All @@ -98,15 +92,14 @@ public void testNestedFieldRemoval() {
+ "}");

pdlSchemaParser.parse("record complex_record {\n"
+ "key_c: simple_record\n"
+ "key_c: optional simple_record\n"
+ "}");

final DataSchema schema = pdlSchemaParser.lookupName("complex_record");
final Aspect updatedAspect = DeleteEntityUtils.getAspectWithReferenceRemoved("hello", aspect, schema,
new PathSpec("key_c", "key_a"));

assertTrue(updatedAspect.data().containsKey("key_c"));
assertEquals(aspect.data().get("key_c"), updatedAspect.data().get("key_c"));
assertFalse(updatedAspect.data().containsKey("key_c"));
}

/**
Expand Down

0 comments on commit e79a12f

Please sign in to comment.