From fdbef791f298f6e6e04cb0ed0ba30634556f2ef0 Mon Sep 17 00:00:00 2001 From: VZE3 Date: Wed, 25 Sep 2024 17:32:12 -0400 Subject: [PATCH 01/11] Add undeliveredUploads feature to UploadStats. Update the GetUploadStats query to search for and provide the response for "undelivered uploads" and the associated counts as part of the query response. --- .../loaders/UploadStatsLoader.kt | 44 ++++++++++++++----- .../models/query/UnDeliveredUpload.kt | 13 ++++++ .../models/query/UnDeliveredUploadCounts.kt | 13 ++++++ .../models/query/UploadStats.kt | 5 ++- 4 files changed, 64 insertions(+), 11 deletions(-) create mode 100644 pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UnDeliveredUpload.kt create mode 100644 pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UnDeliveredUploadCounts.kt diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index a466a18f..91a5fa6b 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -2,9 +2,7 @@ package gov.cdc.ocio.processingstatusapi.loaders import com.azure.cosmos.models.CosmosQueryRequestOptions import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException -import gov.cdc.ocio.processingstatusapi.models.query.DuplicateFilenameCounts -import gov.cdc.ocio.processingstatusapi.models.query.UploadCounts -import gov.cdc.ocio.processingstatusapi.models.query.UploadStats +import gov.cdc.ocio.processingstatusapi.models.query.* import gov.cdc.ocio.processingstatusapi.utils.SqlClauseBuilder import java.util.* @@ -15,7 +13,8 @@ class UploadStatsLoader: CosmosLoader() { dataStreamRoute: String, dateStart: String?, dateEnd: String?, - daysInterval: Int?): UploadStats { + daysInterval: Int? + ): UploadStats { val timeRangeWhereClause = SqlClauseBuilder().buildSqlClauseForDateRange( daysInterval, @@ -72,6 +71,18 @@ class UploadStatsLoader: CosmosLoader() { + ") r where r.totalCount > 1" ) + val unDeliveredUploadsQuery = ( + "select r.uploadId, r.content.metadata.received_filename " + + "from r " + + "where IS_DEFINED(r.content.content_schema_name) and " + + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + + "r.content.content_schema_name = 'blob-file-copy' and " + + "r.stageInfo.status = 'FAILURE' and " + + "$timeRangeWhereClause " + ) + + + val uniqueUploadIdsResult = reportsContainer?.queryItems( numUniqueUploadsQuery, CosmosQueryRequestOptions(), UploadCounts::class.java @@ -100,6 +111,17 @@ class UploadStatsLoader: CosmosLoader() { val inProgressUploadsCount = inProgressUploadCountResult?.firstOrNull() ?: 0 + val duplicateFilenameCountResult = reportsContainer?.queryItems( + duplicateFilenameCountQuery, CosmosQueryRequestOptions(), + DuplicateFilenameCounts::class.java + ) + + val duplicateFilenames = + if (duplicateFilenameCountResult != null && duplicateFilenameCountResult.count() > 0) + duplicateFilenameCountResult.toList() + else + listOf() + val completedUploadsCountResult = reportsContainer?.queryItems( completedUploadsCountQuery, CosmosQueryRequestOptions(), Float::class.java @@ -107,14 +129,14 @@ class UploadStatsLoader: CosmosLoader() { val completedUploadsCount = completedUploadsCountResult?.firstOrNull() ?: 0 - val duplicateFilenameCountResult = reportsContainer?.queryItems( - duplicateFilenameCountQuery, CosmosQueryRequestOptions(), - DuplicateFilenameCounts::class.java + val unDeliveredUploadsCountResult = reportsContainer?.queryItems( + unDeliveredUploadsQuery, CosmosQueryRequestOptions(), + UnDeliveredUpload::class.java ) - val duplicateFilenames = - if (duplicateFilenameCountResult != null && duplicateFilenameCountResult.count() > 0) - duplicateFilenameCountResult.toList() + val undeliveredUploads = + if (unDeliveredUploadsCountResult != null && unDeliveredUploadsCountResult.count() > 0) + unDeliveredUploadsCountResult.toList() else listOf() @@ -125,6 +147,8 @@ class UploadStatsLoader: CosmosLoader() { this.inProgressUploadsCount = inProgressUploadsCount.toLong() this.completedUploadsCount = completedUploadsCount.toLong() this.duplicateFilenames = duplicateFilenames + this.unDeliveredUploads.totalCount = undeliveredUploads.count().toLong() + this.unDeliveredUploads.unDeliveredUploads = undeliveredUploads } } } diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UnDeliveredUpload.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UnDeliveredUpload.kt new file mode 100644 index 00000000..b25bae8f --- /dev/null +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UnDeliveredUpload.kt @@ -0,0 +1,13 @@ +package gov.cdc.ocio.processingstatusapi.models.query + +import com.expediagroup.graphql.generator.annotations.GraphQLDescription + +@GraphQLDescription("Collection of undelivered found") +data class UnDeliveredUpload( + + @GraphQLDescription("UploadId of the file that is not delivered.") + var uploadId: String? = null, + + @GraphQLDescription("Filename of the file that is not delivered.") + var filename: String? = null +) \ No newline at end of file diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UnDeliveredUploadCounts.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UnDeliveredUploadCounts.kt new file mode 100644 index 00000000..59dc5986 --- /dev/null +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UnDeliveredUploadCounts.kt @@ -0,0 +1,13 @@ +package gov.cdc.ocio.processingstatusapi.models.query + +import com.expediagroup.graphql.generator.annotations.GraphQLDescription + +@GraphQLDescription("Collection of undelivered uploads found") +data class UnDeliveredUploadCounts( + + @GraphQLDescription("Total number of undelivered uploads.") + var totalCount: Long = 0, + + @GraphQLDescription("Provides a list of all the uploads that have not been delivered. This means, the upload started, but according to the upload status reports we did not receive 100% of the expected chunks.") + var unDeliveredUploads: List = listOf() +) \ No newline at end of file diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UploadStats.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UploadStats.kt index d5ed30ec..ce2b2739 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UploadStats.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UploadStats.kt @@ -21,5 +21,8 @@ data class UploadStats( var completedUploadsCount: Long = 0, @GraphQLDescription("Provides a list of all the duplicate filenames that were uploaded and how many.") - var duplicateFilenames: List = listOf() + var duplicateFilenames: List = listOf(), + + @GraphQLDescription("Provides a list of all the uploads that have not been delivered. This means, the upload started, but according to the upload status reports we did not receive 100% of the expected chunks.") + var unDeliveredUploads: UnDeliveredUploadCounts = UnDeliveredUploadCounts() ) From 6180071de89eb409043debb0626ed069922a4d55 Mon Sep 17 00:00:00 2001 From: VZE3 Date: Thu, 26 Sep 2024 16:05:38 -0400 Subject: [PATCH 02/11] Updates to the undeliveredUploads feature Update the GetUploadStats query to search for and provide the response for "undelivered uploads" and the associated counts as part of the query response (Send the counts, filename and uploadId). --- .../loaders/UploadStatsLoader.kt | 82 ++++++++++++++----- 1 file changed, 60 insertions(+), 22 deletions(-) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index 91a5fa6b..f4232cdc 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -2,6 +2,8 @@ package gov.cdc.ocio.processingstatusapi.loaders import com.azure.cosmos.models.CosmosQueryRequestOptions import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException +import gov.cdc.ocio.processingstatusapi.models.ReportDeadLetter +import gov.cdc.ocio.processingstatusapi.models.dao.ReportDeadLetterDao import gov.cdc.ocio.processingstatusapi.models.query.* import gov.cdc.ocio.processingstatusapi.utils.SqlClauseBuilder import java.util.* @@ -71,18 +73,6 @@ class UploadStatsLoader: CosmosLoader() { + ") r where r.totalCount > 1" ) - val unDeliveredUploadsQuery = ( - "select r.uploadId, r.content.metadata.received_filename " - + "from r " - + "where IS_DEFINED(r.content.content_schema_name) and " - + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " - + "r.content.content_schema_name = 'blob-file-copy' and " - + "r.stageInfo.status = 'FAILURE' and " - + "$timeRangeWhereClause " - ) - - - val uniqueUploadIdsResult = reportsContainer?.queryItems( numUniqueUploadsQuery, CosmosQueryRequestOptions(), UploadCounts::class.java @@ -129,16 +119,7 @@ class UploadStatsLoader: CosmosLoader() { val completedUploadsCount = completedUploadsCountResult?.firstOrNull() ?: 0 - val unDeliveredUploadsCountResult = reportsContainer?.queryItems( - unDeliveredUploadsQuery, CosmosQueryRequestOptions(), - UnDeliveredUpload::class.java - ) - - val undeliveredUploads = - if (unDeliveredUploadsCountResult != null && unDeliveredUploadsCountResult.count() > 0) - unDeliveredUploadsCountResult.toList() - else - listOf() + val undeliveredUploads = getUndeliveredUploads(dataStreamId, dataStreamRoute, timeRangeWhereClause) return UploadStats().apply { this.uniqueUploadIdsCount = uniqueUploadIdsCount.toLong() @@ -151,4 +132,61 @@ class UploadStatsLoader: CosmosLoader() { this.unDeliveredUploads.unDeliveredUploads = undeliveredUploads } } + + /** + * Search the reports by uploadId for unDelivered Uploads. + * + * @param dataStreamId + * @param dataStreamRoute + * @param timeRangeWhereClause + * @return List + */ + private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { + + //Query to get the upload Ids for the given criteria + val unDeliveredUploadIdsQuery = ( + "select r.uploadId " + + "from r " + + "where IS_DEFINED(r.content.content_schema_name) and " + + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + + "r.content.content_schema_name = 'blob-file-copy' and " + + "r.stageInfo.status = 'FAILURE' and " + + "$timeRangeWhereClause " + + "group by r.uploadId" + ) + + + val unDeliveredUploadIdsResult = reportsContainer?.queryItems( + unDeliveredUploadIdsQuery, CosmosQueryRequestOptions(), + UnDeliveredUpload::class.java + ) + + // Extract uploadIds from the result + val uploadIds = unDeliveredUploadIdsResult?.mapNotNull { it.uploadId } ?: emptyList() + val quotedIds = uploadIds.joinToString("\",\"", "\"", "\"") + + //Query to get the respective filenames for the above uploadIds with the select criteria + val unDeliveredUploadsQuery = ( + "select r.uploadId, r.content.filename " + + "from r " + + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + + "r.stageInfo.action = 'metadata-verify' and " + + "r.uploadId in ($quotedIds) " + ) + + + val unDeliveredUploadsResult = reportsContainer?.queryItems( + unDeliveredUploadsQuery, CosmosQueryRequestOptions(), + UnDeliveredUpload::class.java + ) + + val undeliveredUploads = + if (unDeliveredUploadsResult != null && unDeliveredUploadsResult.count() > 0) + unDeliveredUploadsResult.toList() + else + listOf() + + return undeliveredUploads + + } } From 5aebc498fd23f4de227a57fa96663977c57be981 Mon Sep 17 00:00:00 2001 From: VZE3 Date: Thu, 26 Sep 2024 21:22:07 -0400 Subject: [PATCH 03/11] Add logs --- .../ocio/processingstatusapi/loaders/UploadStatsLoader.kt | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index f4232cdc..1e7230ad 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -152,9 +152,8 @@ class UploadStatsLoader: CosmosLoader() { + "r.content.content_schema_name = 'blob-file-copy' and " + "r.stageInfo.status = 'FAILURE' and " + "$timeRangeWhereClause " - + "group by r.uploadId" ) - + logger.info("UploadsStats, fetch uploadIds query = $unDeliveredUploadIdsQuery") val unDeliveredUploadIdsResult = reportsContainer?.queryItems( unDeliveredUploadIdsQuery, CosmosQueryRequestOptions(), @@ -171,9 +170,10 @@ class UploadStatsLoader: CosmosLoader() { + "from r " + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + "r.stageInfo.action = 'metadata-verify' and " - + "r.uploadId in ($quotedIds) " + + "r.uploadId in ($quotedIds) and " + + "$timeRangeWhereClause " ) - + logger.info("UploadsStats, fetch undelivered uploads query = $unDeliveredUploadsQuery") val unDeliveredUploadsResult = reportsContainer?.queryItems( unDeliveredUploadsQuery, CosmosQueryRequestOptions(), From bfb0e10aaeab14e6584cc3aa40fff6a878d223a0 Mon Sep 17 00:00:00 2001 From: VZE3 Date: Mon, 30 Sep 2024 03:34:38 -0400 Subject: [PATCH 04/11] Capture the undelivered uploads that do not have any associated entries for blob-file-copy in teh reports container --- .../loaders/UploadStatsLoader.kt | 249 +++++++++++++++--- 1 file changed, 206 insertions(+), 43 deletions(-) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index 1e7230ad..c0c3a5e9 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -2,6 +2,7 @@ package gov.cdc.ocio.processingstatusapi.loaders import com.azure.cosmos.models.CosmosQueryRequestOptions import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException +import gov.cdc.ocio.processingstatusapi.exceptions.ContentException import gov.cdc.ocio.processingstatusapi.models.ReportDeadLetter import gov.cdc.ocio.processingstatusapi.models.dao.ReportDeadLetterDao import gov.cdc.ocio.processingstatusapi.models.query.* @@ -134,59 +135,221 @@ class UploadStatsLoader: CosmosLoader() { } /** - * Search the reports by uploadId for unDelivered Uploads. + * Searches the reports by uploadId to find undelivered uploads. * - * @param dataStreamId - * @param dataStreamRoute - * @param timeRangeWhereClause - * @return List + * @param dataStreamId The ID of the data stream. + * @param dataStreamRoute The route of the data stream. + * @param timeRangeWhereClause The SQL clause for the time range. + * @return A list of [UnDeliveredUpload] objects representing the undelivered uploads. + * @throws BadRequestException If an error occurs while fetching the undelivered uploads. */ + @Throws(ContentException::class, BadRequestException :: class, Exception:: class) private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { + try{ + //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy + val undeliveredUploadIdsForBlobFileCopy = getUndeliveredUploadIdsForStageBlobFileCopy(dataStreamId, dataStreamRoute, timeRangeWhereClause) + + //Build Query - Get all the uploads with an item for blob-file-copy and status of failure + val unDeliveredUploadIdsQuery = buildBlobFileCopyFailureQuery(dataStreamId, dataStreamRoute, timeRangeWhereClause) + logger.info("UploadsStats, fetch uploadIds query = $unDeliveredUploadIdsQuery") + val undeliveredUploadIdsForBlobFileCopyWithFailure = executeUndeliveredUploadsQuery(unDeliveredUploadIdsQuery) + + //All the uploadIds that can be categorized as undelivered + val uploadIds = undeliveredUploadIdsForBlobFileCopyWithFailure + undeliveredUploadIdsForBlobFileCopy + val quotedIds = uploadIds.joinToString("\",\"", "\"", "\"") + + //Query to get the respective filenames for the above uploadIds with the select criteria + val unDeliveredUploadsQuery = ( + "select r.uploadId, r.content.filename " + + "from r " + + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + + "r.stageInfo.action = 'metadata-verify' and " + + "r.uploadId in ($quotedIds) and " + + "$timeRangeWhereClause " + ) + logger.info("UploadsStats, fetch undelivered uploads query = $unDeliveredUploadsQuery") + + val unDeliveredUploadsResult = reportsContainer?.queryItems( + unDeliveredUploadsQuery, CosmosQueryRequestOptions(), + UnDeliveredUpload::class.java + ) + + val undeliveredUploads = + if (unDeliveredUploadsResult != null && unDeliveredUploadsResult.count() > 0) + unDeliveredUploadsResult.toList() + else + listOf() + + return undeliveredUploads + + }catch (e: ContentException) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw ContentException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + + } catch (e: BadRequestException) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + }catch (e: Exception) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + } + } - //Query to get the upload Ids for the given criteria - val unDeliveredUploadIdsQuery = ( - "select r.uploadId " - + "from r " - + "where IS_DEFINED(r.content.content_schema_name) and " - + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " - + "r.content.content_schema_name = 'blob-file-copy' and " - + "r.stageInfo.status = 'FAILURE' and " - + "$timeRangeWhereClause " - ) - logger.info("UploadsStats, fetch uploadIds query = $unDeliveredUploadIdsQuery") + /** + * Fetches the unmatched uploadIds for all the items without an associated item for blob-file-copy. + * + * @param dataStreamId The ID of the data stream. + * @param dataStreamRoute The route of the data stream. + * @param timeRangeWhereClause The SQL clause for the time range. + * @return A list of undelivered uploadIds. + * @throws ContentException, BadRequestException If an error occurs while fetching the undelivered upload IDs for blob-file-copy. + */ + @Throws(ContentException::class, BadRequestException :: class, Exception:: class) + private fun getUndeliveredUploadIdsForStageBlobFileCopy(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { + try{ + // Query to retrieve the count of uploads with 'metadata-verify' with the provided search criteria + val uploadsWithMetadataVerifyCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, "metadata-verify", timeRangeWhereClause) + + // Query to retrieve the uploads with 'metadata-verify' with the provided search criteria + val uploadsWithMetadataVerifyQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, "metadata-verify", timeRangeWhereClause) + + // Query to retrieve the count of uploads with 'blob-file-copy' with the provided search criteria + val uploadsWithBlobFileCopyCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, "blob-file-copy", timeRangeWhereClause) + + // Query to retrieve the uploads with 'blob-file-copy' with the provided search criteria + val uploadsWithBlobFileCopyQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, "blob-file-copy", timeRangeWhereClause) + + // Query: Count of all uploads with 'metadata-verify' + val uploadsWithMetadataCount = executeUndeliveredUploadsCountsQuery(uploadsWithMetadataVerifyCountQuery) + + // Query: Count of all uploads with 'blob-file-copy' + val uploadsWithBlobFileCopyCount = executeUndeliveredUploadsCountsQuery(uploadsWithBlobFileCopyCountQuery) + + // Fetch the list of undelivered uploadIds when the counts do not match + if (uploadsWithMetadataCount != uploadsWithBlobFileCopyCount) { + // Get the list of uploadIds with metadata-verify that do not have an entry with an uploadId exist in blob-file-copy + val metadataVerifyIds = executeUndeliveredUploadsQuery(uploadsWithMetadataVerifyQuery) + val blobFileCopyIds = executeUndeliveredUploadsQuery(uploadsWithBlobFileCopyQuery) + + val finalResults = (metadataVerifyIds - blobFileCopyIds).toList() + return finalResults + + }else { + return listOf() + } //END if + }catch (e: ContentException) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw ContentException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + + } catch (e: BadRequestException) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + + }catch (e: Exception) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + } - val unDeliveredUploadIdsResult = reportsContainer?.queryItems( - unDeliveredUploadIdsQuery, CosmosQueryRequestOptions(), - UnDeliveredUpload::class.java - ) + } - // Extract uploadIds from the result - val uploadIds = unDeliveredUploadIdsResult?.mapNotNull { it.uploadId } ?: emptyList() - val quotedIds = uploadIds.joinToString("\",\"", "\"", "\"") - //Query to get the respective filenames for the above uploadIds with the select criteria - val unDeliveredUploadsQuery = ( - "select r.uploadId, r.content.filename " - + "from r " - + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " - + "r.stageInfo.action = 'metadata-verify' and " - + "r.uploadId in ($quotedIds) and " - + "$timeRangeWhereClause " - ) - logger.info("UploadsStats, fetch undelivered uploads query = $unDeliveredUploadsQuery") + /** + * Executes an undelivered upload query and returns the set of uploadIds. + * + * @param query The SQL query to execute. + * @return A set of undelivered uploadIds. + * @throws ContentException If an error occurs while executing the undelivered upload query. + */ + @Throws(ContentException::class) + private fun executeUndeliveredUploadsQuery(query: String): Set { + try{ + return reportsContainer?.queryItems(query, CosmosQueryRequestOptions(), UnDeliveredUpload::class.java) + ?.mapNotNull { it.uploadId } + ?.toSet() ?: emptySet() + }catch (e: ContentException) { + logger.error("Error executing undelivered uploads counts query: ${e.message}", e) + throw ContentException("Error executing undelivered uploads counts query: ${e.message}") + } - val unDeliveredUploadsResult = reportsContainer?.queryItems( - unDeliveredUploadsQuery, CosmosQueryRequestOptions(), - UnDeliveredUpload::class.java - ) + } - val undeliveredUploads = - if (unDeliveredUploadsResult != null && unDeliveredUploadsResult.count() > 0) - unDeliveredUploadsResult.toList() - else - listOf() + /** + * Executes an undelivered uploads counts query and returns the count. + * + * @param query The SQL query to execute. + * @return The count of undelivered uploads. + * @throws ContentException If an error occurs while executing the undelivered uploads counts query. + */ + @Throws(ContentException::class) + private fun executeUndeliveredUploadsCountsQuery(query: String): Int { + try { + return reportsContainer?.queryItems(query, CosmosQueryRequestOptions(), Int::class.java) + ?.firstOrNull() ?: 0 + }catch (e: ContentException) { + logger.error("Error executing undelivered uploads counts query: ${e.message}", e) + throw ContentException("Error executing undelivered uploads counts query: ${e.message}") + } + } - return undeliveredUploads + /** + * Builds a SQL count query for the specified data stream, route, and action. + * + * @param dataStreamId The ID of the data stream. + * @param dataStreamRoute The route of the data stream. + * @param action The action to filter by. + * @param timeRangeWhereClause The SQL clause for the time range. + * @return The SQL query string. + */ + private fun buildCountQuery(dataStreamId: String, dataStreamRoute: String, action: String, timeRangeWhereClause: String): String { + return ( + "select value count(1) " + + "from" + + "(select distinct r.uploadId " + + "from r " + + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + + "r.stageInfo.action = '$action' and " + + timeRangeWhereClause + + ") as count") + .trimIndent() + } + /** + * Builds a SQL query to retrieve the uploads by the specified data stream, route, and action. + * + * @param dataStreamId The ID of the data stream. + * @param dataStreamRoute The route of the data stream. + * @param action The action to filter by. + * @param timeRangeWhereClause The SQL clause for the time range. + * @return The SQL query string. + */ + private fun buildUploadByActionQuery(dataStreamId: String, dataStreamRoute: String, action: String, timeRangeWhereClause: String): String { + return ( + "select distinct r.uploadId " + + "from r " + + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + + "r.stageInfo.action = '$action' and " + + timeRangeWhereClause + ).trimIndent() + } + + /** + * Builds a SQL query to retrieve the uploads with an item for blob-file-copy and a status of failure. + * + * @param dataStreamId The ID of the data stream. + * @param dataStreamRoute The route of the data stream. + * @param timeRangeWhereClause The SQL clause for the time range. + * @return The SQL query string. + */ + private fun buildBlobFileCopyFailureQuery(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): String { + return ( + "select distinct r.uploadId " + + "from r " + + "where IS_DEFINED(r.content.content_schema_name) and " + + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + + "r.content.content_schema_name = 'blob-file-copy' and " + + "r.stageInfo.status = 'FAILURE' and " + + "$timeRangeWhereClause " + ) } } + From 5c5b3f1c32f0d36ecbe409f93ac10297142d7f2d Mon Sep 17 00:00:00 2001 From: VZE3 Date: Mon, 30 Sep 2024 03:41:56 -0400 Subject: [PATCH 05/11] Logs --- .../processingstatusapi/loaders/UploadStatsLoader.kt | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index c0c3a5e9..79df2bf9 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -3,15 +3,13 @@ package gov.cdc.ocio.processingstatusapi.loaders import com.azure.cosmos.models.CosmosQueryRequestOptions import gov.cdc.ocio.processingstatusapi.exceptions.BadRequestException import gov.cdc.ocio.processingstatusapi.exceptions.ContentException -import gov.cdc.ocio.processingstatusapi.models.ReportDeadLetter -import gov.cdc.ocio.processingstatusapi.models.dao.ReportDeadLetterDao import gov.cdc.ocio.processingstatusapi.models.query.* import gov.cdc.ocio.processingstatusapi.utils.SqlClauseBuilder import java.util.* class UploadStatsLoader: CosmosLoader() { - @Throws(BadRequestException::class) + @Throws(BadRequestException::class, ContentException::class) fun getUploadStats(dataStreamId: String, dataStreamRoute: String, dateStart: String?, @@ -143,7 +141,7 @@ class UploadStatsLoader: CosmosLoader() { * @return A list of [UnDeliveredUpload] objects representing the undelivered uploads. * @throws BadRequestException If an error occurs while fetching the undelivered uploads. */ - @Throws(ContentException::class, BadRequestException :: class, Exception:: class) + @Throws(ContentException::class, BadRequestException :: class) private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { try{ //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy @@ -167,7 +165,7 @@ class UploadStatsLoader: CosmosLoader() { + "r.uploadId in ($quotedIds) and " + "$timeRangeWhereClause " ) - logger.info("UploadsStats, fetch undelivered uploads query = $unDeliveredUploadsQuery") + logger.info("UploadsStats, fetch all undelivered uploads query = $unDeliveredUploadsQuery") val unDeliveredUploadsResult = reportsContainer?.queryItems( unDeliveredUploadsQuery, CosmosQueryRequestOptions(), @@ -232,6 +230,7 @@ class UploadStatsLoader: CosmosLoader() { val blobFileCopyIds = executeUndeliveredUploadsQuery(uploadsWithBlobFileCopyQuery) val finalResults = (metadataVerifyIds - blobFileCopyIds).toList() + logger.info("Total number of undelivered uploads for stage, 'blob-file-copy' without any associated reports: " + finalResults.count()) return finalResults }else { From 6951a08daeaabb9594cb15c6c7dd348d3f61b484 Mon Sep 17 00:00:00 2001 From: VZE3 Date: Mon, 30 Sep 2024 04:12:42 -0400 Subject: [PATCH 06/11] Logs --- .../kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt index 41587ab7..3f43950b 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt @@ -102,7 +102,7 @@ fun Application.graphQLModule() { NotificationsMutationService(), DataStreamTopErrorsNotificationSubscriptionMutationService(), DeadlineCheckSubscriptionMutationService(), - UploadErrorsNotificationSubscriptionMutationService() + UploadErrorsNotificationSubscriptionMutationService(), ReportMutation() ) From 8d056ceadc66bc45cfce0e01b170861bc5b153c0 Mon Sep 17 00:00:00 2001 From: VZE3 Date: Mon, 30 Sep 2024 14:57:13 -0400 Subject: [PATCH 07/11] Change the default port number back to 8080 --- pstatus-graphql-ktor/src/main/resources/application.conf | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pstatus-graphql-ktor/src/main/resources/application.conf b/pstatus-graphql-ktor/src/main/resources/application.conf index 2a490716..83b0bd9c 100644 --- a/pstatus-graphql-ktor/src/main/resources/application.conf +++ b/pstatus-graphql-ktor/src/main/resources/application.conf @@ -1,6 +1,6 @@ ktor { deployment { - port = 8082 + port = 8080 host = 0.0.0.0 } From 665edd6c7ed7ed699aa623cf325329bf643c67c7 Mon Sep 17 00:00:00 2001 From: VZE3 Date: Mon, 30 Sep 2024 16:46:11 -0400 Subject: [PATCH 08/11] Add "Pending Uploads" and the respective counts to the response --- .../loaders/UploadStatsLoader.kt | 71 +++++++++++++++++-- .../models/query/PendingUploadCounts.kt | 13 ++++ .../models/query/UploadStats.kt | 5 +- 3 files changed, 82 insertions(+), 7 deletions(-) create mode 100644 pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/PendingUploadCounts.kt diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index 79df2bf9..0d0073b5 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -119,6 +119,7 @@ class UploadStatsLoader: CosmosLoader() { val completedUploadsCount = completedUploadsCountResult?.firstOrNull() ?: 0 val undeliveredUploads = getUndeliveredUploads(dataStreamId, dataStreamRoute, timeRangeWhereClause) + val pendingUploads = getPendingUploads(dataStreamId, dataStreamRoute, timeRangeWhereClause) return UploadStats().apply { this.uniqueUploadIdsCount = uniqueUploadIdsCount.toLong() @@ -129,6 +130,8 @@ class UploadStatsLoader: CosmosLoader() { this.duplicateFilenames = duplicateFilenames this.unDeliveredUploads.totalCount = undeliveredUploads.count().toLong() this.unDeliveredUploads.unDeliveredUploads = undeliveredUploads + this.pendingUploads.totalCount = pendingUploads.count().toLong() + this.pendingUploads.pendingUploads = pendingUploads } } @@ -145,7 +148,7 @@ class UploadStatsLoader: CosmosLoader() { private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { try{ //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy - val undeliveredUploadIdsForBlobFileCopy = getUndeliveredUploadIdsForStageBlobFileCopy(dataStreamId, dataStreamRoute, timeRangeWhereClause) + val undeliveredUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "upload-completed", "blob-file-copy", timeRangeWhereClause) //Build Query - Get all the uploads with an item for blob-file-copy and status of failure val unDeliveredUploadIdsQuery = buildBlobFileCopyFailureQuery(dataStreamId, dataStreamRoute, timeRangeWhereClause) @@ -193,6 +196,62 @@ class UploadStatsLoader: CosmosLoader() { } } + + /** + * Searches the reports by uploadId to find undelivered uploads. + * + * @param dataStreamId The ID of the data stream. + * @param dataStreamRoute The route of the data stream. + * @param timeRangeWhereClause The SQL clause for the time range. + * @return A list of [UnDeliveredUpload] objects representing the undelivered uploads. + * @throws BadRequestException If an error occurs while fetching the undelivered uploads. + */ + @Throws(ContentException::class, BadRequestException :: class) + private fun getPendingUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { + try{ + //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy + val pendingUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "metadata-verify", "upload-completed", timeRangeWhereClause) + + //All the uploadIds that can be categorized as undelivered + val quotedIds = pendingUploadIdsForBlobFileCopy.joinToString("\",\"", "\"", "\"") + + //Query to get the respective filenames for the above uploadIds with the select criteria + val unDeliveredUploadsQuery = ( + "select r.uploadId, r.content.filename " + + "from r " + + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + + "r.stageInfo.action = 'metadata-verify' and " + + "r.uploadId in ($quotedIds) and " + + "$timeRangeWhereClause " + ) + logger.info("UploadsStats, fetch all undelivered uploads query = $unDeliveredUploadsQuery") + + val unDeliveredUploadsResult = reportsContainer?.queryItems( + unDeliveredUploadsQuery, CosmosQueryRequestOptions(), + UnDeliveredUpload::class.java + ) + + val undeliveredUploads = + if (unDeliveredUploadsResult != null && unDeliveredUploadsResult.count() > 0) + unDeliveredUploadsResult.toList() + else + listOf() + + return undeliveredUploads + + }catch (e: ContentException) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw ContentException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + + } catch (e: BadRequestException) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + }catch (e: Exception) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + } + } + /** * Fetches the unmatched uploadIds for all the items without an associated item for blob-file-copy. * @@ -203,19 +262,19 @@ class UploadStatsLoader: CosmosLoader() { * @throws ContentException, BadRequestException If an error occurs while fetching the undelivered upload IDs for blob-file-copy. */ @Throws(ContentException::class, BadRequestException :: class, Exception:: class) - private fun getUndeliveredUploadIdsForStageBlobFileCopy(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { + private fun getUploadIdsWithIssues(dataStreamId: String, dataStreamRoute: String, action1: String, action2: String, timeRangeWhereClause: String): List { try{ // Query to retrieve the count of uploads with 'metadata-verify' with the provided search criteria - val uploadsWithMetadataVerifyCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, "metadata-verify", timeRangeWhereClause) + val uploadsWithMetadataVerifyCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, action1, timeRangeWhereClause) // Query to retrieve the uploads with 'metadata-verify' with the provided search criteria - val uploadsWithMetadataVerifyQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, "metadata-verify", timeRangeWhereClause) + val uploadsWithMetadataVerifyQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, action1, timeRangeWhereClause) // Query to retrieve the count of uploads with 'blob-file-copy' with the provided search criteria - val uploadsWithBlobFileCopyCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, "blob-file-copy", timeRangeWhereClause) + val uploadsWithBlobFileCopyCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, action2, timeRangeWhereClause) // Query to retrieve the uploads with 'blob-file-copy' with the provided search criteria - val uploadsWithBlobFileCopyQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, "blob-file-copy", timeRangeWhereClause) + val uploadsWithBlobFileCopyQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, action2, timeRangeWhereClause) // Query: Count of all uploads with 'metadata-verify' val uploadsWithMetadataCount = executeUndeliveredUploadsCountsQuery(uploadsWithMetadataVerifyCountQuery) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/PendingUploadCounts.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/PendingUploadCounts.kt new file mode 100644 index 00000000..16b5958a --- /dev/null +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/PendingUploadCounts.kt @@ -0,0 +1,13 @@ +package gov.cdc.ocio.processingstatusapi.models.query + +import com.expediagroup.graphql.generator.annotations.GraphQLDescription + +@GraphQLDescription("Collection of undelivered uploads found") +data class PendingUploadCounts( + + @GraphQLDescription("Total number of undelivered uploads.") + var totalCount: Long = 0, + + @GraphQLDescription("Provides a list of all the uploads that have not been delivered. This means, the upload started, but according to the upload status reports we did not receive 100% of the expected chunks.") + var pendingUploads: List = listOf() +) \ No newline at end of file diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UploadStats.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UploadStats.kt index ce2b2739..55b6b504 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UploadStats.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/models/query/UploadStats.kt @@ -24,5 +24,8 @@ data class UploadStats( var duplicateFilenames: List = listOf(), @GraphQLDescription("Provides a list of all the uploads that have not been delivered. This means, the upload started, but according to the upload status reports we did not receive 100% of the expected chunks.") - var unDeliveredUploads: UnDeliveredUploadCounts = UnDeliveredUploadCounts() + var unDeliveredUploads: UnDeliveredUploadCounts = UnDeliveredUploadCounts(), + + @GraphQLDescription("Provides a list of all the uploads that are pending. This means, the upload started, but according to the upload status reports we did not receive 100% of the expected chunks.") + var pendingUploads: PendingUploadCounts = PendingUploadCounts() ) From 609581a3fbc49d2e027c094c3e9c9c339519ddb8 Mon Sep 17 00:00:00 2001 From: VZE3 Date: Mon, 30 Sep 2024 18:05:31 -0400 Subject: [PATCH 09/11] Add "Pending Uploads" and the respective counts to the response --- .../loaders/UploadStatsLoader.kt | 124 +++++++++++++++--- 1 file changed, 106 insertions(+), 18 deletions(-) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index 0d0073b5..bd6edc0a 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -148,16 +148,19 @@ class UploadStatsLoader: CosmosLoader() { private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { try{ //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy - val undeliveredUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "upload-completed", "blob-file-copy", timeRangeWhereClause) + //val undeliveredUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "upload-completed", "blob-file-copy", timeRangeWhereClause) + //getUploadsWithIssues(dataStreamId, dataStreamRoute, timeRangeWhereClause, expectedAction = "upload-completed", missingAction = "blob-file-copy") + val undeliveredUploadsForBlobFileCopy = getUploadsWithIssues(dataStreamId, dataStreamRoute, timeRangeWhereClause, expectedAction = "upload-completed", missingAction = "blob-file-copy") + //Build Query - Get all the uploads with an item for blob-file-copy and status of failure val unDeliveredUploadIdsQuery = buildBlobFileCopyFailureQuery(dataStreamId, dataStreamRoute, timeRangeWhereClause) logger.info("UploadsStats, fetch uploadIds query = $unDeliveredUploadIdsQuery") - val undeliveredUploadIdsForBlobFileCopyWithFailure = executeUndeliveredUploadsQuery(unDeliveredUploadIdsQuery) + val uploadsWithFailures = executeUndeliveredUploadsQuery(unDeliveredUploadIdsQuery) //All the uploadIds that can be categorized as undelivered - val uploadIds = undeliveredUploadIdsForBlobFileCopyWithFailure + undeliveredUploadIdsForBlobFileCopy - val quotedIds = uploadIds.joinToString("\",\"", "\"", "\"") + // val uploadIds = uploadsWithFailures + val quotedIds = uploadsWithFailures.joinToString("\",\"", "\"", "\"") //Query to get the respective filenames for the above uploadIds with the select criteria val unDeliveredUploadsQuery = ( @@ -181,7 +184,7 @@ class UploadStatsLoader: CosmosLoader() { else listOf() - return undeliveredUploads + return undeliveredUploads + undeliveredUploadsForBlobFileCopy }catch (e: ContentException) { logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) @@ -196,6 +199,19 @@ class UploadStatsLoader: CosmosLoader() { } } +/* + @Throws(ContentException::class, BadRequestException::class) + private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { + return getUploadsWithIssues( + dataStreamId, + dataStreamRoute, + timeRangeWhereClause, + expectedAction = "upload-completed", + missingAction = "blob-file-copy" + ) + } +*/ + /** * Searches the reports by uploadId to find undelivered uploads. @@ -206,7 +222,7 @@ class UploadStatsLoader: CosmosLoader() { * @return A list of [UnDeliveredUpload] objects representing the undelivered uploads. * @throws BadRequestException If an error occurs while fetching the undelivered uploads. */ - @Throws(ContentException::class, BadRequestException :: class) +/* @Throws(ContentException::class, BadRequestException :: class) private fun getPendingUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { try{ //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy @@ -250,8 +266,60 @@ class UploadStatsLoader: CosmosLoader() { logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") } + }*/ + + @Throws(ContentException::class, BadRequestException::class) + private fun getPendingUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { + return getUploadsWithIssues( + dataStreamId, + dataStreamRoute, + timeRangeWhereClause, + expectedAction = "metadata-verify", + missingAction = "upload-completed" + ) + } + + + @Throws(ContentException::class, BadRequestException::class) + private fun getUploadsWithIssues( + dataStreamId: String, + dataStreamRoute: String, + timeRangeWhereClause: String, + expectedAction: String, + missingAction: String + ): List { + try { + val uploadIdsWithIssues = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, expectedAction, missingAction, timeRangeWhereClause) + + if (uploadIdsWithIssues.isEmpty()) { + return emptyList() + } + + val quotedIds = uploadIdsWithIssues.joinToString("\",\"", "\"", "\"") + val uploadsWithIssuesQuery = buildUploadsWithIssuesQuery(dataStreamId, dataStreamRoute, quotedIds, timeRangeWhereClause) + + logger.info("UploadsStats, fetch uploads with issues query = $uploadsWithIssuesQuery") + + return reportsContainer?.queryItems( + uploadsWithIssuesQuery, + CosmosQueryRequestOptions(), + UnDeliveredUpload::class.java + )?.toList() ?: emptyList() + + }catch (e: ContentException) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw ContentException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + + }catch (e: BadRequestException) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + }catch (e: Exception) { + logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) + throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + } } + /** * Fetches the unmatched uploadIds for all the items without an associated item for blob-file-copy. * @@ -262,34 +330,40 @@ class UploadStatsLoader: CosmosLoader() { * @throws ContentException, BadRequestException If an error occurs while fetching the undelivered upload IDs for blob-file-copy. */ @Throws(ContentException::class, BadRequestException :: class, Exception:: class) - private fun getUploadIdsWithIssues(dataStreamId: String, dataStreamRoute: String, action1: String, action2: String, timeRangeWhereClause: String): List { + private fun getUploadIdsWithIssues( + dataStreamId: String, + dataStreamRoute: String, + expectedAction: String, + missingAction: String, + timeRangeWhereClause: String): List { + try{ // Query to retrieve the count of uploads with 'metadata-verify' with the provided search criteria - val uploadsWithMetadataVerifyCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, action1, timeRangeWhereClause) + val expectedActionCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, expectedAction, timeRangeWhereClause) // Query to retrieve the uploads with 'metadata-verify' with the provided search criteria - val uploadsWithMetadataVerifyQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, action1, timeRangeWhereClause) + val expectedActionQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, expectedAction, timeRangeWhereClause) // Query to retrieve the count of uploads with 'blob-file-copy' with the provided search criteria - val uploadsWithBlobFileCopyCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, action2, timeRangeWhereClause) + val missingActionCountQuery = buildCountQuery(dataStreamId, dataStreamRoute, missingAction, timeRangeWhereClause) // Query to retrieve the uploads with 'blob-file-copy' with the provided search criteria - val uploadsWithBlobFileCopyQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, action2, timeRangeWhereClause) + val missingActionQuery = buildUploadByActionQuery(dataStreamId, dataStreamRoute, missingAction, timeRangeWhereClause) // Query: Count of all uploads with 'metadata-verify' - val uploadsWithMetadataCount = executeUndeliveredUploadsCountsQuery(uploadsWithMetadataVerifyCountQuery) + val expectedActionCount = executeUndeliveredUploadsCountsQuery(expectedActionCountQuery) // Query: Count of all uploads with 'blob-file-copy' - val uploadsWithBlobFileCopyCount = executeUndeliveredUploadsCountsQuery(uploadsWithBlobFileCopyCountQuery) + val missingActionCount = executeUndeliveredUploadsCountsQuery(missingActionCountQuery) // Fetch the list of undelivered uploadIds when the counts do not match - if (uploadsWithMetadataCount != uploadsWithBlobFileCopyCount) { + if (expectedActionCount != missingActionCount) { // Get the list of uploadIds with metadata-verify that do not have an entry with an uploadId exist in blob-file-copy - val metadataVerifyIds = executeUndeliveredUploadsQuery(uploadsWithMetadataVerifyQuery) - val blobFileCopyIds = executeUndeliveredUploadsQuery(uploadsWithBlobFileCopyQuery) + val expectedActionIds = executeUndeliveredUploadsQuery(expectedActionQuery) + val missingActionIds = executeUndeliveredUploadsQuery(missingActionQuery) - val finalResults = (metadataVerifyIds - blobFileCopyIds).toList() - logger.info("Total number of undelivered uploads for stage, 'blob-file-copy' without any associated reports: " + finalResults.count()) + val finalResults = (expectedActionIds - missingActionIds).toList() + logger.info("Total number of uploads with stage, $expectedAction, without any associated reports: " + finalResults.count()) return finalResults }else { @@ -409,5 +483,19 @@ class UploadStatsLoader: CosmosLoader() { + "$timeRangeWhereClause " ) } + + + private fun buildUploadsWithIssuesQuery(dataStreamId: String, dataStreamRoute: String, quotedIds: String, timeRangeWhereClause: String): String { + return """ + SELECT r.uploadId, r.content.filename + FROM r + WHERE r.dataStreamId = '$dataStreamId' + AND r.dataStreamRoute = '$dataStreamRoute' + AND r.stageInfo.action = 'metadata-verify' + AND r.uploadId IN ($quotedIds) + AND $timeRangeWhereClause + """.trimIndent() + } + } From eea1b96bf41e5b6c43d02f4be12d95d18d9341ed Mon Sep 17 00:00:00 2001 From: VZE3 Date: Mon, 30 Sep 2024 20:45:55 -0400 Subject: [PATCH 10/11] Update Query --- .../loaders/UploadStatsLoader.kt | 125 ++++++++---------- 1 file changed, 53 insertions(+), 72 deletions(-) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index bd6edc0a..f9c82423 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -148,10 +148,7 @@ class UploadStatsLoader: CosmosLoader() { private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { try{ //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy - //val undeliveredUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "upload-completed", "blob-file-copy", timeRangeWhereClause) - //getUploadsWithIssues(dataStreamId, dataStreamRoute, timeRangeWhereClause, expectedAction = "upload-completed", missingAction = "blob-file-copy") - val undeliveredUploadsForBlobFileCopy = getUploadsWithIssues(dataStreamId, dataStreamRoute, timeRangeWhereClause, expectedAction = "upload-completed", missingAction = "blob-file-copy") - + val undeliveredUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "upload-completed", "blob-file-copy", timeRangeWhereClause) //Build Query - Get all the uploads with an item for blob-file-copy and status of failure val unDeliveredUploadIdsQuery = buildBlobFileCopyFailureQuery(dataStreamId, dataStreamRoute, timeRangeWhereClause) @@ -159,12 +156,12 @@ class UploadStatsLoader: CosmosLoader() { val uploadsWithFailures = executeUndeliveredUploadsQuery(unDeliveredUploadIdsQuery) //All the uploadIds that can be categorized as undelivered - // val uploadIds = uploadsWithFailures - val quotedIds = uploadsWithFailures.joinToString("\",\"", "\"", "\"") + val uploadIds = uploadsWithFailures + undeliveredUploadIdsForBlobFileCopy + val quotedIds = uploadIds.joinToString("\",\"", "\"", "\"") //Query to get the respective filenames for the above uploadIds with the select criteria val unDeliveredUploadsQuery = ( - "select r.uploadId, r.content.filename " + "select distinct r.uploadId, r.content.filename " + "from r " + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + "r.stageInfo.action = 'metadata-verify' and " @@ -184,7 +181,7 @@ class UploadStatsLoader: CosmosLoader() { else listOf() - return undeliveredUploads + undeliveredUploadsForBlobFileCopy + return undeliveredUploads }catch (e: ContentException) { logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) @@ -199,19 +196,6 @@ class UploadStatsLoader: CosmosLoader() { } } -/* - @Throws(ContentException::class, BadRequestException::class) - private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { - return getUploadsWithIssues( - dataStreamId, - dataStreamRoute, - timeRangeWhereClause, - expectedAction = "upload-completed", - missingAction = "blob-file-copy" - ) - } -*/ - /** * Searches the reports by uploadId to find undelivered uploads. @@ -222,66 +206,63 @@ class UploadStatsLoader: CosmosLoader() { * @return A list of [UnDeliveredUpload] objects representing the undelivered uploads. * @throws BadRequestException If an error occurs while fetching the undelivered uploads. */ -/* @Throws(ContentException::class, BadRequestException :: class) + @Throws(ContentException::class, BadRequestException::class) private fun getPendingUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { - try{ - //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy - val pendingUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "metadata-verify", "upload-completed", timeRangeWhereClause) + return getUploadsWithIssues(dataStreamId, dataStreamRoute, timeRangeWhereClause, "metadata-verify", "upload-completed") + } - //All the uploadIds that can be categorized as undelivered - val quotedIds = pendingUploadIdsForBlobFileCopy.joinToString("\",\"", "\"", "\"") - //Query to get the respective filenames for the above uploadIds with the select criteria - val unDeliveredUploadsQuery = ( - "select r.uploadId, r.content.filename " - + "from r " - + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " - + "r.stageInfo.action = 'metadata-verify' and " - + "r.uploadId in ($quotedIds) and " - + "$timeRangeWhereClause " - ) - logger.info("UploadsStats, fetch all undelivered uploads query = $unDeliveredUploadsQuery") + @Throws(ContentException::class, BadRequestException::class) + private fun getUploadsWithIssues( + dataStreamId: String, + dataStreamRoute: String, + timeRangeWhereClause: String, + expectedAction: String, + missingAction: String + ): List { + try { + // First, get the uploadIds with issues + val uploadIdsWithIssues = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, expectedAction, missingAction, timeRangeWhereClause) - val unDeliveredUploadsResult = reportsContainer?.queryItems( - unDeliveredUploadsQuery, CosmosQueryRequestOptions(), - UnDeliveredUpload::class.java - ) + if (uploadIdsWithIssues.isEmpty()) { + return emptyList() + } - val undeliveredUploads = - if (unDeliveredUploadsResult != null && unDeliveredUploadsResult.count() > 0) - unDeliveredUploadsResult.toList() - else - listOf() + // Then, fetch only the necessary metadata for these specific uploadIds + val quotedIds = uploadIdsWithIssues.joinToString("\",\"", "\"", "\"") - return undeliveredUploads + val uploadsWithIssuesQuery = buildUploadsWithIssuesQuery(dataStreamId, dataStreamRoute, quotedIds, timeRangeWhereClause) + logger.info("UploadsStats, uploadsWithIssuesQuery query = $uploadsWithIssuesQuery") - }catch (e: ContentException) { - logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) - throw ContentException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") + val metadataResult = reportsContainer?.queryItems( + uploadsWithIssuesQuery, + CosmosQueryRequestOptions(), + UnDeliveredUpload::class.java + )?.toList() ?: emptyList() - } catch (e: BadRequestException) { - logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) - throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") - }catch (e: Exception) { - logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) - throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") - } - }*/ + // Create a map for quick lookup + val metadataMap = metadataResult.associateBy { it.uploadId } - @Throws(ContentException::class, BadRequestException::class) - private fun getPendingUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { - return getUploadsWithIssues( - dataStreamId, - dataStreamRoute, - timeRangeWhereClause, - expectedAction = "metadata-verify", - missingAction = "upload-completed" - ) + // Construct the final list of UnDeliveredUpload objects + return uploadIdsWithIssues.map { uploadId -> + metadataMap[uploadId] ?: UnDeliveredUpload(uploadId, null) + } + + } catch (e: Exception) { + logger.error("Error fetching uploads with issues: ${e.message}", e) + throw when (e) { + is ContentException, is BadRequestException -> e + else -> BadRequestException("Error fetching uploads with issues: ${e.message}") + } + } } - @Throws(ContentException::class, BadRequestException::class) - private fun getUploadsWithIssues( + + + +/* @Throws(ContentException::class, BadRequestException::class) + private fun getUploadsWithIssues2( dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String, @@ -317,7 +298,7 @@ class UploadStatsLoader: CosmosLoader() { logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") } - } + }*/ /** @@ -479,6 +460,7 @@ class UploadStatsLoader: CosmosLoader() { + "where IS_DEFINED(r.content.content_schema_name) and " + "r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " + "r.content.content_schema_name = 'blob-file-copy' and " + + "r.stageInfo.action = 'blob-file-copy' and " + "r.stageInfo.status = 'FAILURE' and " + "$timeRangeWhereClause " ) @@ -487,7 +469,7 @@ class UploadStatsLoader: CosmosLoader() { private fun buildUploadsWithIssuesQuery(dataStreamId: String, dataStreamRoute: String, quotedIds: String, timeRangeWhereClause: String): String { return """ - SELECT r.uploadId, r.content.filename + SELECT distinct r.uploadId, r.content.filename FROM r WHERE r.dataStreamId = '$dataStreamId' AND r.dataStreamRoute = '$dataStreamRoute' @@ -496,6 +478,5 @@ class UploadStatsLoader: CosmosLoader() { AND $timeRangeWhereClause """.trimIndent() } - } From 61dc2d0f34a7d2cf3b9733c6776ced48a1b9e58e Mon Sep 17 00:00:00 2001 From: VZE3 Date: Tue, 1 Oct 2024 07:17:07 -0400 Subject: [PATCH 11/11] Clean up --- .../loaders/UploadStatsLoader.kt | 131 +++++------------- 1 file changed, 34 insertions(+), 97 deletions(-) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt index f9c82423..4fe5942a 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/loaders/UploadStatsLoader.kt @@ -147,40 +147,25 @@ class UploadStatsLoader: CosmosLoader() { @Throws(ContentException::class, BadRequestException :: class) private fun getUndeliveredUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { try{ - //Get the unmatched uploadIds for all the items without an associated item for blob-file-copy + //Get the uploadIds for each upload with 'upload-completed' and no report for blob-file-copy val undeliveredUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "upload-completed", "blob-file-copy", timeRangeWhereClause) - //Build Query - Get all the uploads with an item for blob-file-copy and status of failure + //Query - Get all the uploads with an item for blob-file-copy and status of failure val unDeliveredUploadIdsQuery = buildBlobFileCopyFailureQuery(dataStreamId, dataStreamRoute, timeRangeWhereClause) logger.info("UploadsStats, fetch uploadIds query = $unDeliveredUploadIdsQuery") val uploadsWithFailures = executeUndeliveredUploadsQuery(unDeliveredUploadIdsQuery) //All the uploadIds that can be categorized as undelivered + // (Any upload id where an upload-completed report exists but not a blob-file-copy report + // or if there is the least one report with blob-file-copy report and the status indicates failure.) val uploadIds = uploadsWithFailures + undeliveredUploadIdsForBlobFileCopy val quotedIds = uploadIds.joinToString("\",\"", "\"", "\"") //Query to get the respective filenames for the above uploadIds with the select criteria - val unDeliveredUploadsQuery = ( - "select distinct r.uploadId, r.content.filename " - + "from r " - + "where r.dataStreamId = '$dataStreamId' and r.dataStreamRoute = '$dataStreamRoute' and " - + "r.stageInfo.action = 'metadata-verify' and " - + "r.uploadId in ($quotedIds) and " - + "$timeRangeWhereClause " - ) + val unDeliveredUploadsQuery = buildUploadsQuery(dataStreamId, dataStreamRoute, quotedIds, timeRangeWhereClause) logger.info("UploadsStats, fetch all undelivered uploads query = $unDeliveredUploadsQuery") - val unDeliveredUploadsResult = reportsContainer?.queryItems( - unDeliveredUploadsQuery, CosmosQueryRequestOptions(), - UnDeliveredUpload::class.java - ) - - val undeliveredUploads = - if (unDeliveredUploadsResult != null && unDeliveredUploadsResult.count() > 0) - unDeliveredUploadsResult.toList() - else - listOf() - + val undeliveredUploads = executeUploadsQuery(unDeliveredUploadsQuery) return undeliveredUploads }catch (e: ContentException) { @@ -208,47 +193,24 @@ class UploadStatsLoader: CosmosLoader() { */ @Throws(ContentException::class, BadRequestException::class) private fun getPendingUploads(dataStreamId: String, dataStreamRoute: String, timeRangeWhereClause: String): List { - return getUploadsWithIssues(dataStreamId, dataStreamRoute, timeRangeWhereClause, "metadata-verify", "upload-completed") - } - - - @Throws(ContentException::class, BadRequestException::class) - private fun getUploadsWithIssues( - dataStreamId: String, - dataStreamRoute: String, - timeRangeWhereClause: String, - expectedAction: String, - missingAction: String - ): List { - try { - // First, get the uploadIds with issues - val uploadIdsWithIssues = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, expectedAction, missingAction, timeRangeWhereClause) + try{ + //Get the uploadIds for each upload with 'metadata-verify' and no report for 'upload-completed' + val pendingUploadIdsForBlobFileCopy = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, "metadata-verify", "upload-completed", timeRangeWhereClause) - if (uploadIdsWithIssues.isEmpty()) { + if (pendingUploadIdsForBlobFileCopy.isEmpty()) { return emptyList() } // Then, fetch only the necessary metadata for these specific uploadIds - val quotedIds = uploadIdsWithIssues.joinToString("\",\"", "\"", "\"") + val quotedIds = pendingUploadIdsForBlobFileCopy.joinToString("\",\"", "\"", "\"") - val uploadsWithIssuesQuery = buildUploadsWithIssuesQuery(dataStreamId, dataStreamRoute, quotedIds, timeRangeWhereClause) + val uploadsWithIssuesQuery = buildUploadsQuery(dataStreamId, dataStreamRoute, quotedIds, timeRangeWhereClause) logger.info("UploadsStats, uploadsWithIssuesQuery query = $uploadsWithIssuesQuery") - val metadataResult = reportsContainer?.queryItems( - uploadsWithIssuesQuery, - CosmosQueryRequestOptions(), - UnDeliveredUpload::class.java - )?.toList() ?: emptyList() - - // Create a map for quick lookup - val metadataMap = metadataResult.associateBy { it.uploadId } + val uploadsWithIssues = executeUploadsQuery(uploadsWithIssuesQuery) + return uploadsWithIssues - // Construct the final list of UnDeliveredUpload objects - return uploadIdsWithIssues.map { uploadId -> - metadataMap[uploadId] ?: UnDeliveredUpload(uploadId, null) - } - - } catch (e: Exception) { + }catch (e: Exception) { logger.error("Error fetching uploads with issues: ${e.message}", e) throw when (e) { is ContentException, is BadRequestException -> e @@ -258,49 +220,6 @@ class UploadStatsLoader: CosmosLoader() { } - - - -/* @Throws(ContentException::class, BadRequestException::class) - private fun getUploadsWithIssues2( - dataStreamId: String, - dataStreamRoute: String, - timeRangeWhereClause: String, - expectedAction: String, - missingAction: String - ): List { - try { - val uploadIdsWithIssues = getUploadIdsWithIssues(dataStreamId, dataStreamRoute, expectedAction, missingAction, timeRangeWhereClause) - - if (uploadIdsWithIssues.isEmpty()) { - return emptyList() - } - - val quotedIds = uploadIdsWithIssues.joinToString("\",\"", "\"", "\"") - val uploadsWithIssuesQuery = buildUploadsWithIssuesQuery(dataStreamId, dataStreamRoute, quotedIds, timeRangeWhereClause) - - logger.info("UploadsStats, fetch uploads with issues query = $uploadsWithIssuesQuery") - - return reportsContainer?.queryItems( - uploadsWithIssuesQuery, - CosmosQueryRequestOptions(), - UnDeliveredUpload::class.java - )?.toList() ?: emptyList() - - }catch (e: ContentException) { - logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) - throw ContentException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") - - }catch (e: BadRequestException) { - logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) - throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") - }catch (e: Exception) { - logger.error("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}", e) - throw BadRequestException("Error fetching undelivered upload IDs for blob-file-copy: ${e.message}") - } - }*/ - - /** * Fetches the unmatched uploadIds for all the items without an associated item for blob-file-copy. * @@ -467,7 +386,7 @@ class UploadStatsLoader: CosmosLoader() { } - private fun buildUploadsWithIssuesQuery(dataStreamId: String, dataStreamRoute: String, quotedIds: String, timeRangeWhereClause: String): String { + private fun buildUploadsQuery(dataStreamId: String, dataStreamRoute: String, quotedIds: String, timeRangeWhereClause: String): String { return """ SELECT distinct r.uploadId, r.content.filename FROM r @@ -478,5 +397,23 @@ class UploadStatsLoader: CosmosLoader() { AND $timeRangeWhereClause """.trimIndent() } + + /** + * Executes an undelivered upload query and returns the set of uploadIds. + * + * @param query The SQL query to execute. + * @return A set of undelivered uploadIds. + * @throws ContentException If an error occurs while executing the undelivered upload query. + */ + @Throws(ContentException::class) + private fun executeUploadsQuery(query: String): List { + try{ + return reportsContainer?.queryItems(query, CosmosQueryRequestOptions(), UnDeliveredUpload::class.java) + ?.toList() ?: emptyList() + }catch (e: ContentException) { + logger.error("Error executing undelivered uploads counts query: ${e.message}", e) + throw ContentException("Error executing undelivered uploads counts query: ${e.message}") + } + } }