Skip to content

Commit

Permalink
15766: item sent event (#15920)
Browse files Browse the repository at this point in the history
* 15766: item sent event

* fixup! 15766: item sent event
  • Loading branch information
mkalish authored Sep 25, 2024
1 parent db9921c commit 51a8a01
Show file tree
Hide file tree
Showing 21 changed files with 328 additions and 55 deletions.
47 changes: 47 additions & 0 deletions prime-router/src/main/kotlin/azure/ActionHistory.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import com.microsoft.azure.functions.HttpRequestMessage
import com.microsoft.azure.functions.HttpResponseMessage
import com.microsoft.azure.functions.HttpStatusType
import com.networknt.org.apache.commons.validator.routines.InetAddressValidator
import fhirengine.engine.CustomFhirPathFunctions
import gov.cdc.prime.reportstream.shared.BlobUtils
import gov.cdc.prime.router.ActionLog
import gov.cdc.prime.router.ActionLogLevel
import gov.cdc.prime.router.ClientSource
Expand All @@ -25,11 +27,16 @@ import gov.cdc.prime.router.azure.db.tables.pojos.ItemLineage
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.azure.db.tables.pojos.ReportLineage
import gov.cdc.prime.router.azure.db.tables.pojos.Task
import gov.cdc.prime.router.azure.observability.bundleDigest.BundleDigestExtractor
import gov.cdc.prime.router.azure.observability.bundleDigest.FhirPathBundleDigestLabResultExtractorStrategy
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventName
import gov.cdc.prime.router.azure.observability.event.ReportStreamEventProperties
import gov.cdc.prime.router.common.AzureHttpUtils.getSenderIP
import gov.cdc.prime.router.common.JacksonMapperUtilities
import gov.cdc.prime.router.fhirengine.translation.hl7.utils.CustomContext
import gov.cdc.prime.router.fhirengine.utils.FhirTranscoder
import gov.cdc.prime.router.report.ReportService
import io.ktor.http.HttpStatusCode
import org.apache.logging.log4j.kotlin.Logging
import org.jooq.impl.SQLDataType
Expand Down Expand Up @@ -565,6 +572,7 @@ class ActionHistory(
result: String,
header: WorkflowEngine.Header,
reportEventService: IReportStreamEventService,
reportService: ReportService,
transportType: String,
) {
if (isReportAlreadyTracked(sentReportId)) {
Expand Down Expand Up @@ -616,6 +624,45 @@ class ActionHistory(
)
}

val lineages = Report.createItemLineagesFromDb(header, sentReportId)
lineages?.forEach { itemLineage ->
val receiverFilterReportFile = reportService.getReportForItemAtTask(
itemLineage.parentReportId,
itemLineage.parentIndex,
TaskAction.receiver_filter
)
if (receiverFilterReportFile != null) {
val blob = BlobAccess.downloadBlob(
receiverFilterReportFile.bodyUrl,
BlobUtils.digestToString(receiverFilterReportFile.blobDigest)
)
val bundle = FhirTranscoder.decode(blob)
val bundleDigestExtractor = BundleDigestExtractor(
FhirPathBundleDigestLabResultExtractorStrategy(
CustomContext(
bundle,
bundle,
mutableMapOf(),
CustomFhirPathFunctions()
)
)
)
reportEventService.sendItemEvent(ReportStreamEventName.ITEM_SENT, reportFile, TaskAction.send) {
trackingId(bundle)
parentReportId(header.reportFile.reportId)
childItemIndex(itemLineage.childIndex)
params(
mapOf(
ReportStreamEventProperties.BUNDLE_DIGEST
to bundleDigestExtractor.generateDigest(bundle),
ReportStreamEventProperties.RECEIVER_NAME to receiver.fullName,
)
)
}
} else {
logger.error("No translate report found for sent item.")
}
}
reportsOut[reportFile.reportId] = reportFile
}

Expand Down
3 changes: 2 additions & 1 deletion prime-router/src/main/kotlin/azure/SendFunction.kt
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,8 @@ class SendFunction(
retryItems,
context,
actionHistory,
reportEventService
reportEventService,
workflowEngine.reportService
)
if (nextRetry != null) {
nextRetryItems += nextRetry
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ enum class ReportStreamEventName {
ITEM_ROUTED,
REPORT_LAST_MILE_FAILURE,
REPORT_NOT_PROCESSABLE,
ITEM_SENT,
}

/**
Expand Down
59 changes: 50 additions & 9 deletions prime-router/src/main/kotlin/history/db/ReportGraph.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import org.jooq.CommonTableExpression
import org.jooq.DSLContext
import org.jooq.Record
import org.jooq.Record1
import org.jooq.Record2
import org.jooq.SelectConditionStep
import org.jooq.SelectOnConditionStep
import org.jooq.impl.CustomRecord
import org.jooq.impl.CustomTable
Expand Down Expand Up @@ -213,6 +213,25 @@ class ReportGraph(
return descendantReportRecords(txn, cte, searchedForTaskActions).fetchInto(ReportFile::class.java)
}

/**
* Retrieves ancestor report from a [TaskAction] for a particular item.
*
* @param txn the transaction to run the DB access under
* @param childReportId the reportId to search for ancestors of
* @param childIndex the index of the child
* @param searchedForTaskAction the task action associated with the desired ancestor report
* @return The ancestor report for that particular action
*/
fun getAncestorReport(
txn: DataAccessTransaction,
childReportId: UUID,
childIndex: Int,
searchedForTaskAction: TaskAction,
): ReportFile? {
val cte = itemAncestorGraphCommonTableExpression(childReportId, childIndex)
return ancestorReportRecords(txn, cte, searchedForTaskAction).fetchOneInto(ReportFile::class.java)
}

/**
* Returns all the metadata rows associated with the passed in [ItemGraphRecord]
*
Expand Down Expand Up @@ -421,19 +440,15 @@ class ReportGraph(
*/
fun reportAncestorGraphCommonTableExpression(childReportIds: List<UUID>) =
DSL.name(lineageCteName).fields(
PARENT_REPORT_ID_FIELD,
PATH_FIELD
PARENT_REPORT_ID_FIELD
).`as`(
DSL.select(
REPORT_LINEAGE.PARENT_REPORT_ID,
REPORT_LINEAGE.CHILD_REPORT_ID.cast(SQLDataType.VARCHAR),
REPORT_LINEAGE.PARENT_REPORT_ID
).from(REPORT_LINEAGE)
.where(REPORT_LINEAGE.CHILD_REPORT_ID.`in`(childReportIds))
.unionAll(
DSL.select(
REPORT_LINEAGE.PARENT_REPORT_ID,
DSL.field("$lineageCteName.$PATH_FIELD", SQLDataType.VARCHAR)
.concat(REPORT_LINEAGE.PARENT_REPORT_ID)
REPORT_LINEAGE.PARENT_REPORT_ID
)
.from(REPORT_LINEAGE)
.join(DSL.table(DSL.name(lineageCteName)))
Expand All @@ -454,7 +469,7 @@ class ReportGraph(
*/
private fun rootReportRecords(
txn: DataAccessTransaction,
cte: CommonTableExpression<Record2<UUID, String>>,
cte: CommonTableExpression<Record1<UUID>>,
) = DSL.using(txn)
.withRecursive(cte)
.select(REPORT_FILE.asterisk())
Expand Down Expand Up @@ -520,4 +535,30 @@ class ReportGraph(

return select
}

/**
* Fetches all ancestor report records in a recursive manner.
*
* @param txn the data access transaction
* @param cte the common table expression for report lineage
* @return the descendant report records
*/
private fun ancestorReportRecords(
txn: DataAccessTransaction,
cte: CommonTableExpression<ItemGraphRecord>,
searchedForTaskAction: TaskAction,
): SelectConditionStep<Record> {
val select = DSL.using(txn)
.withRecursive(cte)
.select(REPORT_FILE.asterisk())
.distinctOn(REPORT_FILE.REPORT_ID)
.from(cte)
.join(REPORT_FILE)
.on(REPORT_FILE.REPORT_ID.eq(ItemGraphTable.ITEM_GRAPH.PARENT_REPORT_ID))
.join(ACTION)
.on(ACTION.ACTION_ID.eq(REPORT_FILE.ACTION_ID))
.where(ACTION.ACTION_NAME.eq(searchedForTaskAction))

return select
}
}
19 changes: 18 additions & 1 deletion prime-router/src/main/kotlin/report/ReportService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package gov.cdc.prime.router.report

import gov.cdc.prime.router.ReportId
import gov.cdc.prime.router.azure.DatabaseAccess
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.db.tables.pojos.ReportFile
import gov.cdc.prime.router.common.BaseEngine
import gov.cdc.prime.router.history.db.ReportGraph
Expand Down Expand Up @@ -47,7 +48,23 @@ class ReportService(
* @return List of ReportFile objects of the root reports
*/
fun getRootReports(childReportId: ReportId): List<ReportFile> {
return reportGraph.getRootReports(childReportId)
return reportGraph.getRootReports(childReportId).distinctBy { it.reportId }
}

/**
* Accepts a descendant item (report id and index) and finds the ancestor report associated with the
* passed [TaskAction]
*
* @param childReportId the descendant child report
* @param childIndex the index of the item
* @param task the particular task to find the ancestor report for
*
* @return the [ReportFile] ancestor at the passed [TaskAction]
*/
fun getReportForItemAtTask(childReportId: ReportId, childIndex: Int, task: TaskAction): ReportFile? {
return db.transactReturning { txn ->
reportGraph.getAncestorReport(txn, childReportId, childIndex, task)
}
}

/**
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/AS2Transport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.UserJksCredential
import gov.cdc.prime.router.report.ReportService
import org.apache.hc.core5.util.Timeout
import org.apache.http.conn.ConnectTimeoutException
import org.apache.logging.log4j.kotlin.Logging
Expand Down Expand Up @@ -50,6 +51,7 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
// DevNote: This code is similar to the SFTP code in structure
//
Expand Down Expand Up @@ -78,6 +80,7 @@ class AS2Transport(val metadata: Metadata? = null) : ITransport, Logging {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/BlobStoreTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import gov.cdc.prime.router.azure.BlobAccess
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.db.enums.TaskAction
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

class BlobStoreTransport : ITransport {
override fun send(
Expand All @@ -21,6 +22,7 @@ class BlobStoreTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val blobTransportType = transportType as BlobStoreTransportType
val envVar: String = blobTransportType.containerName
Expand All @@ -41,6 +43,7 @@ class BlobStoreTransport : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
2 changes: 2 additions & 0 deletions prime-router/src/main/kotlin/transport/EmailTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService
import org.thymeleaf.TemplateEngine
import org.thymeleaf.context.Context
import org.thymeleaf.templateresolver.StringTemplateResolver
Expand All @@ -33,6 +34,7 @@ class EmailTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory, // not used by emailer
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val emailTransport = transportType as EmailTransportType
val content = buildContent(header)
Expand Down
11 changes: 9 additions & 2 deletions prime-router/src/main/kotlin/transport/GAENTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import gov.cdc.prime.router.common.HttpClientUtils
import gov.cdc.prime.router.credentials.CredentialHelper
import gov.cdc.prime.router.credentials.CredentialRequestReason
import gov.cdc.prime.router.credentials.UserApiKeyCredential
import gov.cdc.prime.router.report.ReportService
import io.ktor.client.HttpClient
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
Expand Down Expand Up @@ -78,6 +79,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
val gaenTransportInfo = transportType as GAENTransportType
val reportId = header.reportFile.reportId
Expand Down Expand Up @@ -106,7 +108,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {

// Record the work in history and logs
when (postResult) {
PostResult.SUCCESS -> recordFullSuccess(params, reportEventService)
PostResult.SUCCESS -> recordFullSuccess(params, reportEventService, reportService)
PostResult.RETRY -> recordFailureWithRetry(params)
PostResult.FAIL -> recordFailure(params)
}
Expand All @@ -123,7 +125,11 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
/**
* Record in [ActionHistory] the full success of this notification. Log an info message as well.
*/
private fun recordFullSuccess(params: SendParams, reportEventService: IReportStreamEventService) {
private fun recordFullSuccess(
params: SendParams,
reportEventService: IReportStreamEventService,
reportService: ReportService,
) {
val msg = "${params.receiver.fullName}: Successful exposure notifications of ${params.comboId}"
val history = params.actionHistory
params.context.logger.info(msg)
Expand All @@ -137,6 +143,7 @@ class GAENTransport(val httpClient: HttpClient? = null) : ITransport, Logging {
msg,
params.header,
reportEventService,
reportService,
this::class.java.simpleName
)
history.trackItemLineages(Report.createItemLineagesFromDb(params.header, params.sentReportId))
Expand Down
2 changes: 2 additions & 0 deletions prime-router/src/main/kotlin/transport/ITransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

interface ITransport {
/**
Expand All @@ -26,5 +27,6 @@ interface ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems?
}
3 changes: 3 additions & 0 deletions prime-router/src/main/kotlin/transport/NullTransport.kt
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import gov.cdc.prime.router.TransportType
import gov.cdc.prime.router.azure.ActionHistory
import gov.cdc.prime.router.azure.WorkflowEngine
import gov.cdc.prime.router.azure.observability.event.IReportStreamEventService
import gov.cdc.prime.router.report.ReportService

/**
* The Null transport is intended for testing and benchmarking purposes.
Expand All @@ -21,6 +22,7 @@ class NullTransport : ITransport {
context: ExecutionContext,
actionHistory: ActionHistory,
reportEventService: IReportStreamEventService,
reportService: ReportService,
): RetryItems? {
if (header.content == null) error("No content for report ${header.reportFile.reportId}")
val receiver = header.receiver ?: error("No receiver defined for report ${header.reportFile.reportId}")
Expand All @@ -34,6 +36,7 @@ class NullTransport : ITransport {
msg,
header,
reportEventService,
reportService,
this::class.java.simpleName
)
actionHistory.trackItemLineages(Report.createItemLineagesFromDb(header, sentReportId))
Expand Down
Loading

0 comments on commit 51a8a01

Please sign in to comment.