Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bootstrap): create abstract class UpgradeStep to abstract away upgrade logic #5349

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package com.linkedin.metadata.boot;

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.entity.EntityResponse;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeRequest;
import com.linkedin.upgrade.DataHubUpgradeResult;
import java.net.URISyntaxException;
import java.util.Collections;
import lombok.extern.slf4j.Slf4j;


@Slf4j
public abstract class UpgradeStep implements BootstrapStep {
private static final Integer SLEEP_SECONDS = 120;

protected final EntityService _entityService;
private final String _version;
private final String _upgradeId;
private final Urn _upgradeUrn;

public UpgradeStep(EntityService entityService, String version, String upgradeId) {
this._entityService = entityService;
this._version = version;
this._upgradeId = upgradeId;
this._upgradeUrn = EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(upgradeId),
Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
}

@Override
public void execute() throws Exception {
String upgradeStepName = name();

log.info(String.format("Attempting to run %s Upgrade Step..", upgradeStepName));
log.info(String.format("Waiting %s seconds..", SLEEP_SECONDS));

if (hasUpgradeRan()) {
log.info(String.format("%s has run before for version %s. Skipping..", _upgradeId, _version));
return;
}

// Sleep to ensure deployment process finishes.
Thread.sleep(SLEEP_SECONDS * 1000);

try {
ingestUpgradeRequestAspect();
upgrade();
ingestUpgradeResultAspect();
} catch (Exception e) {
String errorMessage = String.format("Error when running %s for version %s", _upgradeId, _version);
cleanUpgradeAfterError(e, errorMessage);
throw new RuntimeException(errorMessage, e);
}
}

@Override
public String name() {
return this.getClass().getSimpleName();
}

public abstract void upgrade() throws Exception;

private boolean hasUpgradeRan() {
try {
EntityResponse response = _entityService.getEntityV2(Constants.DATA_HUB_UPGRADE_ENTITY_NAME, _upgradeUrn,
Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME));

if (response != null && response.getAspects().containsKey(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) {
DataMap dataMap = response.getAspects().get(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME).getValue().data();
DataHubUpgradeRequest request = new DataHubUpgradeRequest(dataMap);
if (request.hasVersion() && request.getVersion().equals(_version)) {
return true;
}
}
} catch (Exception e) {
log.error("Error when checking to see if datahubUpgrade entity exists. Commencing with upgrade...", e);
return false;
}
return false;
}

private void ingestUpgradeRequestAspect() throws URISyntaxException {
final AuditStamp auditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final DataHubUpgradeRequest upgradeRequest =
new DataHubUpgradeRequest().setTimestampMs(System.currentTimeMillis()).setVersion(_version);

final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
upgradeProposal.setEntityUrn(_upgradeUrn);
upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME);
upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeRequest));
upgradeProposal.setChangeType(ChangeType.UPSERT);

_entityService.ingestProposal(upgradeProposal, auditStamp);
}

private void ingestUpgradeResultAspect() throws URISyntaxException {
final AuditStamp auditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());
final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis());

final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
upgradeProposal.setEntityUrn(_upgradeUrn);
upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
upgradeProposal.setAspectName(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME);
upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(upgradeResult));
upgradeProposal.setChangeType(ChangeType.UPSERT);

_entityService.ingestProposal(upgradeProposal, auditStamp);
}

private void cleanUpgradeAfterError(Exception e, String errorMessage) {
log.error(errorMessage, e);
_entityService.deleteUrn(_upgradeUrn);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,14 @@ protected BootstrapManager createInstance() {
final IngestDataPlatformsStep ingestDataPlatformsStep = new IngestDataPlatformsStep(_entityService);
final IngestDataPlatformInstancesStep ingestDataPlatformInstancesStep =
new IngestDataPlatformInstancesStep(_entityService, _migrationsDao);
final RestoreGlossaryIndices restoreGlossaryIndicesStep = new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry);
final RestoreDbtSiblingsIndices
restoreDbtSiblingsIndices = new RestoreDbtSiblingsIndices(_entityService, _entityRegistry);
final RestoreGlossaryIndices restoreGlossaryIndicesStep =
new RestoreGlossaryIndices(_entityService, _entitySearchService, _entityRegistry);
final RestoreDbtSiblingsIndices restoreDbtSiblingsIndices =
new RestoreDbtSiblingsIndices(_entityService, _entityRegistry);
final RemoveClientIdAspectStep removeClientIdAspectStep = new RemoveClientIdAspectStep(_entityService);
return new BootstrapManager(ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep,
ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep, removeClientIdAspectStep, restoreDbtSiblingsIndices));
return new BootstrapManager(
ImmutableList.of(ingestRootUserStep, ingestPoliciesStep, ingestDataPlatformsStep,
ingestDataPlatformInstancesStep, _ingestRetentionPoliciesStep, restoreGlossaryIndicesStep,
removeClientIdAspectStep, restoreDbtSiblingsIndices));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,140 +2,84 @@

import com.linkedin.common.AuditStamp;
import com.linkedin.common.urn.Urn;
import com.linkedin.data.DataMap;
import com.linkedin.data.template.RecordTemplate;
import com.linkedin.entity.EntityResponse;
import com.linkedin.entity.EnvelopedAspectMap;
import com.linkedin.events.metadata.ChangeType;
import com.linkedin.glossary.GlossaryNodeInfo;
import com.linkedin.glossary.GlossaryTermInfo;
import com.linkedin.metadata.Constants;
import com.linkedin.metadata.boot.BootstrapStep;
import com.linkedin.metadata.boot.UpgradeStep;
import com.linkedin.metadata.entity.EntityService;
import com.linkedin.metadata.key.DataHubUpgradeKey;
import com.linkedin.metadata.models.AspectSpec;
import com.linkedin.metadata.models.registry.EntityRegistry;
import com.linkedin.metadata.search.EntitySearchService;
import com.linkedin.metadata.search.SearchEntity;
import com.linkedin.metadata.search.SearchResult;
import com.linkedin.metadata.utils.EntityKeyUtils;
import com.linkedin.metadata.utils.GenericRecordUtils;
import com.linkedin.mxe.MetadataChangeProposal;
import com.linkedin.upgrade.DataHubUpgradeRequest;
import com.linkedin.upgrade.DataHubUpgradeResult;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

import javax.annotation.Nonnull;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.extern.slf4j.Slf4j;


@Slf4j
@RequiredArgsConstructor
public class RestoreGlossaryIndices implements BootstrapStep {
public class RestoreGlossaryIndices extends UpgradeStep {
private static final String VERSION = "1";
private static final String UPGRADE_ID = "restore-glossary-indices-ui";
private static final Urn GLOSSARY_UPGRADE_URN =
EntityKeyUtils.convertEntityKeyToUrn(new DataHubUpgradeKey().setId(UPGRADE_ID), Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
private static final Integer BATCH_SIZE = 1000;
private static final Integer SLEEP_SECONDS = 120;

private final EntityService _entityService;
private final EntitySearchService _entitySearchService;
private final EntityRegistry _entityRegistry;

@Override
public String name() {
return this.getClass().getSimpleName();
public RestoreGlossaryIndices(EntityService entityService, EntitySearchService entitySearchService,
EntityRegistry entityRegistry) {
super(entityService, VERSION, UPGRADE_ID);
_entitySearchService = entitySearchService;
_entityRegistry = entityRegistry;
}

@Nonnull
@Override
public ExecutionMode getExecutionMode() {
return ExecutionMode.ASYNC;
}

@Override
public void execute() throws Exception {
log.info("Attempting to run RestoreGlossaryIndices upgrade..");
log.info(String.format("Waiting %s seconds..", SLEEP_SECONDS));

// Sleep to ensure deployment process finishes.
Thread.sleep(SLEEP_SECONDS * 1000);

try {
EntityResponse response = _entityService.getEntityV2(
Constants.DATA_HUB_UPGRADE_ENTITY_NAME,
GLOSSARY_UPGRADE_URN,
Collections.singleton(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)
);
if (response != null && response.getAspects().containsKey(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME)) {
DataMap dataMap = response.getAspects().get(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME).getValue().data();
DataHubUpgradeRequest request = new DataHubUpgradeRequest(dataMap);
if (request.hasVersion() && request.getVersion().equals(VERSION)) {
log.info("Glossary Upgrade has run before with this version. Skipping");
return;
}
}

final AspectSpec termAspectSpec =
_entityRegistry.getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME).getAspectSpec(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME);
final AspectSpec nodeAspectSpec =
_entityRegistry.getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME).getAspectSpec(Constants.GLOSSARY_NODE_INFO_ASPECT_NAME);
final AuditStamp auditStamp = new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());

final DataHubUpgradeRequest upgradeRequest = new DataHubUpgradeRequest().setTimestampMs(System.currentTimeMillis()).setVersion(VERSION);
ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_REQUEST_ASPECT_NAME, upgradeRequest, auditStamp);

final int totalTermsCount = getAndRestoreTermAspectIndices(0, auditStamp, termAspectSpec);
int termsCount = BATCH_SIZE;
while (termsCount < totalTermsCount) {
getAndRestoreTermAspectIndices(termsCount, auditStamp, termAspectSpec);
termsCount += BATCH_SIZE;
}

final int totalNodesCount = getAndRestoreNodeAspectIndices(0, auditStamp, nodeAspectSpec);
int nodesCount = BATCH_SIZE;
while (nodesCount < totalNodesCount) {
getAndRestoreNodeAspectIndices(nodesCount, auditStamp, nodeAspectSpec);
nodesCount += BATCH_SIZE;
}

final DataHubUpgradeResult upgradeResult = new DataHubUpgradeResult().setTimestampMs(System.currentTimeMillis());
ingestUpgradeAspect(Constants.DATA_HUB_UPGRADE_RESULT_ASPECT_NAME, upgradeResult, auditStamp);
public void upgrade() throws Exception {
final AspectSpec termAspectSpec = _entityRegistry.getEntitySpec(Constants.GLOSSARY_TERM_ENTITY_NAME)
.getAspectSpec(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME);
final AspectSpec nodeAspectSpec = _entityRegistry.getEntitySpec(Constants.GLOSSARY_NODE_ENTITY_NAME)
.getAspectSpec(Constants.GLOSSARY_NODE_INFO_ASPECT_NAME);
final AuditStamp auditStamp =
new AuditStamp().setActor(Urn.createFromString(Constants.SYSTEM_ACTOR)).setTime(System.currentTimeMillis());

final int totalTermsCount = getAndRestoreTermAspectIndices(0, auditStamp, termAspectSpec);
int termsCount = BATCH_SIZE;
while (termsCount < totalTermsCount) {
getAndRestoreTermAspectIndices(termsCount, auditStamp, termAspectSpec);
termsCount += BATCH_SIZE;
}

log.info("Successfully restored glossary index");
} catch (Exception e) {
log.error("Error when running the RestoreGlossaryIndices Bootstrap Step", e);
_entityService.deleteUrn(GLOSSARY_UPGRADE_URN);
throw new RuntimeException("Error when running the RestoreGlossaryIndices Bootstrap Step", e);
final int totalNodesCount = getAndRestoreNodeAspectIndices(0, auditStamp, nodeAspectSpec);
int nodesCount = BATCH_SIZE;
while (nodesCount < totalNodesCount) {
getAndRestoreNodeAspectIndices(nodesCount, auditStamp, nodeAspectSpec);
nodesCount += BATCH_SIZE;
}
}

private void ingestUpgradeAspect(String aspectName, RecordTemplate aspect, AuditStamp auditStamp) {
final MetadataChangeProposal upgradeProposal = new MetadataChangeProposal();
upgradeProposal.setEntityUrn(GLOSSARY_UPGRADE_URN);
upgradeProposal.setEntityType(Constants.DATA_HUB_UPGRADE_ENTITY_NAME);
upgradeProposal.setAspectName(aspectName);
upgradeProposal.setAspect(GenericRecordUtils.serializeAspect(aspect));
upgradeProposal.setChangeType(ChangeType.UPSERT);

_entityService.ingestProposal(upgradeProposal, auditStamp);
@Nonnull
@Override
public ExecutionMode getExecutionMode() {
return ExecutionMode.ASYNC;
}

private int getAndRestoreTermAspectIndices(int start, AuditStamp auditStamp, AspectSpec termAspectSpec) throws Exception {
SearchResult termsResult = _entitySearchService.search(Constants.GLOSSARY_TERM_ENTITY_NAME, "", null, null, start, BATCH_SIZE);
private int getAndRestoreTermAspectIndices(int start, AuditStamp auditStamp, AspectSpec termAspectSpec)
throws Exception {
SearchResult termsResult =
_entitySearchService.search(Constants.GLOSSARY_TERM_ENTITY_NAME, "", null, null, start, BATCH_SIZE);
List<Urn> termUrns = termsResult.getEntities().stream().map(SearchEntity::getEntity).collect(Collectors.toList());
if (termUrns.size() == 0) {
return 0;
}
final Map<Urn, EntityResponse> termInfoResponses = _entityService.getEntitiesV2(
Constants.GLOSSARY_TERM_ENTITY_NAME,
new HashSet<>(termUrns),
final Map<Urn, EntityResponse> termInfoResponses =
_entityService.getEntitiesV2(Constants.GLOSSARY_TERM_ENTITY_NAME, new HashSet<>(termUrns),
Collections.singleton(Constants.GLOSSARY_TERM_INFO_ASPECT_NAME)
);

Expand Down