Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Updated Postgres indexer to handle out of order updates #212

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,19 @@ public PostgresIndexDAO(
@Override
public void indexWorkflow(WorkflowSummary workflow) {
String INSERT_WORKFLOW_INDEX_SQL =
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
"INSERT INTO workflow_index (workflow_id, correlation_id, workflow_type, start_time, update_time, status, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (workflow_id) \n"
+ "DO UPDATE SET correlation_id = EXCLUDED.correlation_id, workflow_type = EXCLUDED.workflow_type, "
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data";
+ "start_time = EXCLUDED.start_time, status = EXCLUDED.status, json_data = EXCLUDED.json_data "
+ "WHERE EXCLUDED.update_time > workflow_index.update_time";

if (onlyIndexOnStatusChange) {
INSERT_WORKFLOW_INDEX_SQL += " WHERE workflow_index.status != EXCLUDED.status";
INSERT_WORKFLOW_INDEX_SQL += " AND workflow_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(workflow.getUpdateTime());
Timestamp updateTime = Timestamp.from(Instant.from(updateTa));

TemporalAccessor ta = DateTimeFormatter.ISO_INSTANT.parse(workflow.getStartTime());
Timestamp startTime = Timestamp.from(Instant.from(ta));

Expand All @@ -102,6 +106,7 @@ public void indexWorkflow(WorkflowSummary workflow) {
.addParameter(workflow.getCorrelationId())
.addParameter(workflow.getWorkflowType())
.addParameter(startTime)
.addParameter(updateTime)
.addParameter(workflow.getStatus().toString())
.addJsonParameter(workflow)
.executeUpdate());
Expand Down Expand Up @@ -135,10 +140,11 @@ public void indexTask(TaskSummary task) {
"INSERT INTO task_index (task_id, task_type, task_def_name, status, start_time, update_time, workflow_type, json_data)"
+ "VALUES (?, ?, ?, ?, ?, ?, ?, ?::JSONB) ON CONFLICT (task_id) "
+ "DO UPDATE SET task_type = EXCLUDED.task_type, task_def_name = EXCLUDED.task_def_name, "
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data";
+ "status = EXCLUDED.status, update_time = EXCLUDED.update_time, json_data = EXCLUDED.json_data "
+ "WHERE EXCLUDED.update_time > task_index.update_time";

if (onlyIndexOnStatusChange) {
INSERT_TASK_INDEX_SQL += " WHERE task_index.status != EXCLUDED.status";
INSERT_TASK_INDEX_SQL += " AND task_index.status != EXCLUDED.status";
}

TemporalAccessor updateTa = DateTimeFormatter.ISO_INSTANT.parse(task.getUpdateTime());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
ALTER TABLE workflow_index
ADD update_time TIMESTAMP WITH TIME ZONE NULL;

UPDATE workflow_index
SET update_time = to_timestamp(json_data->>'updateTime', 'YYYY-MM-DDTHH24:MI:SSZ')::timestamp WITH time zone;

ALTER TABLE workflow_index
ALTER COLUMN update_time SET NOT NULL;
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) {
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setUpdateTime("2023-02-07T08:43:45Z");
wfs.setStatus(Workflow.WorkflowStatus.RUNNING);
return wfs;
}
Expand Down Expand Up @@ -142,13 +143,15 @@ public void testIndexWorkflowOnlyStatusChange() throws SQLException {

// Change the record, but not the status, and re-index
wfs.setCorrelationId("new-correlation-id");
wfs.setUpdateTime("2023-02-07T08:44:45Z");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it hasn't changed
checkWorkflow("workflow-id", "RUNNING", "correlation-id");

// Change the status and re-index
wfs.setStatus(Workflow.WorkflowStatus.FAILED);
wfs.setUpdateTime("2023-02-07T08:45:45Z");
indexDAO.indexWorkflow(wfs);

// retrieve the record, make sure it has changed
Expand All @@ -172,9 +175,10 @@ public void testIndexTaskOnlyStatusChange() throws SQLException {

// Change the status and re-index
ts.setStatus(Task.Status.FAILED);
ts.setUpdateTime("2023-02-07T10:43:45Z");
indexDAO.indexTask(ts);

// retrieve the record, make sure it has changed
checkTask("task-id", "FAILED", "2023-02-07 10:42:45.0");
checkTask("task-id", "FAILED", "2023-02-07 10:43:45.0");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ private WorkflowSummary getMockWorkflowSummary(String id) {
wfs.setCorrelationId("correlation-id");
wfs.setWorkflowType("workflow-type");
wfs.setStartTime("2023-02-07T08:42:45Z");
wfs.setUpdateTime("2023-02-07T08:43:45Z");
wfs.setStatus(Workflow.WorkflowStatus.COMPLETED);
return wfs;
}
Expand Down Expand Up @@ -173,7 +174,7 @@ private void compareTaskSummary(TaskSummary ts) throws SQLException {

@Test
public void testIndexNewWorkflow() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-new");

indexDAO.indexWorkflow(wfs);

Expand All @@ -182,22 +183,44 @@ public void testIndexNewWorkflow() throws SQLException {

@Test
public void testIndexExistingWorkflow() throws SQLException {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);

wfs.setStatus(Workflow.WorkflowStatus.FAILED);
wfs.setUpdateTime("2023-02-07T08:44:45Z");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);
}

@Test
public void testIndexExistingWorkflowWithOlderUpdateToEnsureItsNotIndexed()
throws SQLException {

WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-existing-no-index");

indexDAO.indexWorkflow(wfs);

compareWorkflowSummary(wfs);

// Set the update time to the past
wfs.setUpdateTime("2023-02-07T08:42:45Z");
wfs.setStatus(Workflow.WorkflowStatus.FAILED);

indexDAO.indexWorkflow(wfs);

// Reset the workflow to check it's not been updated
wfs = getMockWorkflowSummary("workflow-id-existing-no-index");
compareWorkflowSummary(wfs);
}

@Test
public void testIndexNewTask() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");
TaskSummary ts = getMockTaskSummary("task-id-new");

indexDAO.indexTask(ts);

Expand All @@ -206,16 +229,36 @@ public void testIndexNewTask() throws SQLException {

@Test
public void testIndexExistingTask() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id");
TaskSummary ts = getMockTaskSummary("task-id-existing");

indexDAO.indexTask(ts);

compareTaskSummary(ts);

ts.setUpdateTime("2023-02-07T09:43:45Z");
ts.setStatus(Task.Status.FAILED);

indexDAO.indexTask(ts);

compareTaskSummary(ts);
}

@Test
public void testIndexExistingTaskWithOlderUpdateToEnsureItsNotIndexed() throws SQLException {
TaskSummary ts = getMockTaskSummary("task-id-exiting-no-update");

indexDAO.indexTask(ts);

compareTaskSummary(ts);

// Set the update time to the past
ts.setUpdateTime("2023-02-07T09:41:45Z");
ts.setStatus(Task.Status.FAILED);

indexDAO.indexTask(ts);

// Reset the task to check it's not been updated
ts = getMockTaskSummary("task-id-exiting-no-update");
compareTaskSummary(ts);
}

Expand Down Expand Up @@ -275,7 +318,7 @@ public void testFullTextSearchWorkflowSummary() {

@Test
public void testJsonSearchWorkflowSummary() {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id");
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-summary");
wfs.setVersion(3);

indexDAO.indexWorkflow(wfs);
Expand All @@ -297,40 +340,40 @@ public void testJsonSearchWorkflowSummary() {
@Test
public void testSearchWorkflowSummaryPagination() {
for (int i = 0; i < 5; i++) {
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-" + i);
WorkflowSummary wfs = getMockWorkflowSummary("workflow-id-pagination-" + i);
indexDAO.indexWorkflow(wfs);
}

List<String> orderBy = Arrays.asList(new String[] {"workflowId:DESC"});
SearchResult<WorkflowSummary> results =
indexDAO.searchWorkflowSummary("", "*", 0, 2, orderBy);
indexDAO.searchWorkflowSummary("", "workflow-id-pagination*", 0, 2, orderBy);
assertEquals("Wrong totalHits returned", 3, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-4",
"workflow-id-pagination-4",
results.getResults().get(0).getWorkflowId());
assertEquals(
"Results returned in wrong order",
"workflow-id-3",
"workflow-id-pagination-3",
results.getResults().get(1).getWorkflowId());
results = indexDAO.searchWorkflowSummary("", "*", 2, 2, orderBy);
assertEquals("Wrong totalHits returned", 5, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-2",
"workflow-id-pagination-2",
results.getResults().get(0).getWorkflowId());
assertEquals(
"Results returned in wrong order",
"workflow-id-1",
"workflow-id-pagination-1",
results.getResults().get(1).getWorkflowId());
results = indexDAO.searchWorkflowSummary("", "*", 4, 2, orderBy);
assertEquals("Wrong totalHits returned", 7, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"workflow-id-0",
"workflow-id-pagination-0",
results.getResults().get(0).getWorkflowId());
}

Expand All @@ -351,7 +394,7 @@ public void testSearchTaskSummary() {
@Test
public void testSearchTaskSummaryPagination() {
for (int i = 0; i < 5; i++) {
TaskSummary ts = getMockTaskSummary("task-id-" + i);
TaskSummary ts = getMockTaskSummary("task-id-pagination-" + i);
indexDAO.indexTask(ts);
}

Expand All @@ -361,29 +404,29 @@ public void testSearchTaskSummaryPagination() {
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-4",
"task-id-pagination-4",
results.getResults().get(0).getTaskId());
assertEquals(
"Results returned in wrong order",
"task-id-3",
"task-id-pagination-3",
results.getResults().get(1).getTaskId());
results = indexDAO.searchTaskSummary("", "*", 2, 2, orderBy);
assertEquals("Wrong totalHits returned", 5, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-2",
"task-id-pagination-2",
results.getResults().get(0).getTaskId());
assertEquals(
"Results returned in wrong order",
"task-id-1",
"task-id-pagination-1",
results.getResults().get(1).getTaskId());
results = indexDAO.searchTaskSummary("", "*", 4, 2, orderBy);
assertEquals("Wrong totalHits returned", 7, results.getTotalHits());
assertEquals("Wrong number of results returned", 2, results.getResults().size());
assertEquals(
"Results returned in wrong order",
"task-id-0",
"task-id-pagination-0",
results.getResults().get(0).getTaskId());
}

Expand Down
Loading