-
Notifications
You must be signed in to change notification settings - Fork 4.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
introduce method to query jobs database by end timestamp of attempts #6505
Conversation
database.transaction(ctx -> { | ||
// override any other terminal statuses if we are now succeeded. | ||
updateJobStatus(ctx, jobId, JobStatus.SUCCEEDED, now); | ||
|
||
ctx.execute( | ||
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? WHERE job_id = ? AND attempt_number = ?", | ||
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? , ended_at = ? WHERE job_id = ? AND attempt_number = ?", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you create an issue to make sure this is standardised?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😬 damn. so we previously haven't been setting ended_at
? good catch.
return database.query(ctx -> getJobsFromResult(ctx | ||
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + | ||
"CAST(config_type AS VARCHAR) IN " + Sqls.toSqlInFragment(Sets.newHashSet(configType)) + " AND " + | ||
" attempts.ended_at > ? ORDER BY jobs.created_at ASC, attempts.created_at ASC", timeConvertedIntoLocalDateTime))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you can make your life a little easier here. In the other listJobs
method in this class the order by query is already there as a constant. See:
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + | ||
"CAST(config_type AS VARCHAR) IN " + Sqls.toSqlInFragment(Sets.newHashSet(configType)) + " AND " + | ||
" attempts.ended_at > ? ORDER BY jobs.created_at ASC, attempts.created_at ASC", timeConvertedIntoLocalDateTime))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + | |
"CAST(config_type AS VARCHAR) IN " + Sqls.toSqlInFragment(Sets.newHashSet(configType)) + " AND " + | |
" attempts.ended_at > ? ORDER BY jobs.created_at ASC, attempts.created_at ASC", timeConvertedIntoLocalDateTime))); | |
.fetch(BASE_JOB_SELECT_AND_JOIN + "WHERE " + | |
"CAST(config_type AS VARCHAR) = ? AND attempts.ended_at > ? " + | |
ORDER_BY_JOB_TIME_ATTEMPT_TIME, | |
configType, | |
timeConvertedIntoLocalDateTime))); |
This may not be 100% syntactically right, but I think it expresses what I mean.
database.transaction(ctx -> { | ||
// override any other terminal statuses if we are now succeeded. | ||
updateJobStatus(ctx, jobId, JobStatus.SUCCEEDED, now); | ||
|
||
ctx.execute( | ||
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? WHERE job_id = ? AND attempt_number = ?", | ||
"UPDATE attempts SET status = CAST(? as ATTEMPT_STATUS), updated_at = ? , ended_at = ? WHERE job_id = ? AND attempt_number = ?", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
😬 damn. so we previously haven't been setting ended_at
? good catch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i think this comment (https://github.com/airbytehq/airbyte/pull/6505/files#r717770848) is still not addressed? otherwise looks good.
For billing I need to be able to query the jobs database by end timestamp of attempts.
Pre-merge Checklist
Expand the relevant checklist and delete the others.
New Connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/SUMMARY.md
docs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampledocs/integrations/README.md
airbyte-integrations/builds.md
Airbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing./publish
command described hereUpdating a connector
Community member or Airbyter
airbyte_secret
./gradlew :airbyte-integrations:connectors:<name>:integrationTest
.README.md
bootstrap.md
. See description and examplesdocs/integrations/<source or destination>/<name>.md
including changelog. See changelog exampleAirbyter
If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items.
/test connector=connectors/<name>
command is passing./publish
command described hereConnector Generator
-scaffold
in their name) have been updated with the latest scaffold by running./gradlew :airbyte-integrations:connector-templates:generator:testScaffoldTemplates
then checking in your changes