diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/DataStreamTopErrorsNotificationSubscriptionMutationService.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/DataStreamTopErrorsNotificationSubscriptionMutationService.kt new file mode 100644 index 00000000..b8533b37 --- /dev/null +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/DataStreamTopErrorsNotificationSubscriptionMutationService.kt @@ -0,0 +1,141 @@ +package gov.cdc.ocio.processingstatusapi.mutations + +import com.expediagroup.graphql.generator.annotations.GraphQLDescription +import com.expediagroup.graphql.server.operations.Mutation +import gov.cdc.ocio.processingstatusapi.mutations.models.NotificationSubscriptionResult +import gov.cdc.ocio.processingstatusapi.mutations.response.SubscriptionResponse +import io.ktor.client.* +import io.ktor.client.plugins.* +import io.ktor.client.request.* +import io.ktor.client.plugins.contentnegotiation.* +import io.ktor.client.plugins.logging.* +import io.ktor.serialization.kotlinx.json.* +import io.ktor.http.* +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable + + +/** + * DataStream Subscription for digest counts and top5 errors data class which is serialized back and forth which is in turn subscribed in to the MemCache + * @param dataStreamId String + * @param dataStreamRoute String + * @param jurisdiction String + * @param daysToRun List + * @param deliveryReference String + */ +@Serializable +data class DataStreamTopErrorsNotificationSubscription( val dataStreamId: String, + val dataStreamRoute: String, + val jurisdiction: String, + val daysToRun: List, + val timeToRun: String, + val deliveryReference: String) + +/** + * DataStream UnSubscription data class which is serialized back and forth which is in turn used for unsubscribing from the db for digest counts and the top errors and their + * frequency within an upload + * @param subscriptionId + */ +@Serializable +data class DataStreamTopErrorsNotificationUnSubscription(val subscriptionId:String) + +/** + * The graphQL mutation class for dataStream Subscription for digest counts and top5 errors and their frequencies + */ + +class DataStreamTopErrorsNotificationSubscriptionMutationService : Mutation { + private val dataStreamTopErrorsNotificationSubscriptionUrl: String = System.getenv("PSTATUS_WORKFLOW_NOTIFICATIONS_BASE_URL") + private val serviceUnavailable = + "DeadlineCheckSubscription service is unavailable and no connection has been established. Make sure the service is running" + private val client = HttpClient { + install(ContentNegotiation) { + json() + } + install(Logging) { + logger = Logger.DEFAULT + level = LogLevel.INFO + } + install(HttpTimeout) { + requestTimeoutMillis = 10000 + connectTimeoutMillis = 10000 + socketTimeoutMillis = 10000 + } + } + + /** + * The mutation function which invokes the data stream top errors and digest counts microservice route to subscribe + * @param dataStreamId String + * @param dataStreamRoute String + * @param jurisdiction String + * @param daysToRun List + * @param deliveryReference String + */ + + @GraphQLDescription("Subscribe data stream top errors lets you subscribe to get notifications for top data stream errors and its frequency during an upload") + @Suppress("unused") + fun subscribeDataStreamTopErrorsNotification( + dataStreamId: String, + dataStreamRoute: String, + jurisdiction: String, + daysToRun: List, + timeToRun: String, + deliveryReference: String + ): NotificationSubscriptionResult { + val url = "$dataStreamTopErrorsNotificationSubscriptionUrl/subscribe/dataStreamTopErrorsNotification" + + return runBlocking { + try { + val response = client.post(url) { + contentType(ContentType.Application.Json) + setBody( + DataStreamTopErrorsNotificationSubscription( + dataStreamId, + dataStreamRoute, + jurisdiction, + daysToRun, + timeToRun, + deliveryReference + ) + ) + } + return@runBlocking SubscriptionResponse.ProcessNotificationResponse(response) + } catch (e: Exception) { + if (e.message!!.contains("Status:")) { + SubscriptionResponse.ProcessErrorCodes(url, e, null) + } + throw Exception(serviceUnavailable) + } + } + } + + /** + * The mutation function which invokes the data stream top errors and digest counts microservice route to unsubscribe + * @param subscriptionId String + */ + + @GraphQLDescription("UnSubscribe data stream top errors lets you unsubscribe from getting notifications for top data stream errors and its frequency during an upload") + @Suppress("unused") + fun unsubscribesDataStreamTopErrorsNotification( + subscriptionId: String + ): NotificationSubscriptionResult { + val url = "$dataStreamTopErrorsNotificationSubscriptionUrl/unsubscribe/dataStreamTopErrorsNotification" + + return runBlocking { + try { + val response = client.post(url) { + contentType(ContentType.Application.Json) + setBody( + DataStreamTopErrorsNotificationUnSubscription(subscriptionId) + ) + } + return@runBlocking SubscriptionResponse.ProcessNotificationResponse(response) + } catch (e: Exception) { + if (e.message!!.contains("Status:")) { + SubscriptionResponse.ProcessErrorCodes(url, e, null) + } + throw Exception(serviceUnavailable) + } + } + } + +} \ No newline at end of file diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/DeadlineCheckNotificationSubscriptionMutationService.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/DeadlineCheckNotificationSubscriptionMutationService.kt new file mode 100644 index 00000000..8499ed53 --- /dev/null +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/DeadlineCheckNotificationSubscriptionMutationService.kt @@ -0,0 +1,140 @@ +package gov.cdc.ocio.processingstatusapi.mutations + +import gov.cdc.ocio.processingstatusapi.mutations.models.NotificationSubscriptionResult +import com.expediagroup.graphql.generator.annotations.GraphQLDescription +import com.expediagroup.graphql.server.operations.Mutation +import gov.cdc.ocio.processingstatusapi.mutations.response.SubscriptionResponse +import io.ktor.client.* +import io.ktor.client.plugins.* +import io.ktor.client.request.* +import io.ktor.client.plugins.contentnegotiation.* +import io.ktor.client.plugins.logging.* +import io.ktor.serialization.kotlinx.json.* +import io.ktor.http.* +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable + + +/** + * Deadline Check Subscription data class which is serialized back and forth which is in turn subscribed in to the MemCache + * @param dataStreamId String + * @param dataStreamRoute String + * @param jurisdiction String + * @param daysToRun List + * @param deliveryReference String + */ +@Serializable +data class DeadlineCheckSubscription( val dataStreamId: String, + val dataStreamRoute: String, + val jurisdiction: String, + val daysToRun: List, + val timeToRun: String, + val deliveryReference: String) + +/** + * Deadline check unSubscription data class which is serialized back and forth which is in turn used for unsubscribing from the cache for emails and webhooks using the given subscriberId + * @param subscriptionId + */ +@Serializable +data class DeadlineCheckUnSubscription(val subscriptionId:String) + +/** + * The graphQL mutation service class for deadline check subscription/unSubscription + */ + +class DeadlineCheckSubscriptionMutationService : Mutation { + private val deadlineCheckSubscriptionUrl: String = System.getenv("PSTATUS_WORKFLOW_NOTIFICATIONS_BASE_URL") + private val serviceUnavailable = + "DeadlineCheckSubscription service is unavailable and no connection has been established. Make sure the service is running" + private val client = HttpClient { + install(ContentNegotiation) { + json() + } + install(Logging) { + logger = Logger.DEFAULT + level = LogLevel.INFO + } + install(HttpTimeout) { + requestTimeoutMillis = 10000 + connectTimeoutMillis = 10000 + socketTimeoutMillis = 10000 + } + } + + /** + * The mutation function which invokes the upload deadline check microservice route to subscribe + * @param dataStreamId String + * @param dataStreamRoute String + * @param jurisdiction String + * @param daysToRun List + * @param deliveryReference String + */ + + @GraphQLDescription("Subscribe Deadline Check lets you get notifications when an upload from jurisdictions has not happened by 12pm") + @Suppress("unused") + fun subscribeDeadlineCheck( + dataStreamId: String, + dataStreamRoute: String, + jurisdiction: String, + daysToRun: List, + timeToRun: String, + deliveryReference: String + ): NotificationSubscriptionResult { + val url = "$deadlineCheckSubscriptionUrl/subscribe/deadlineCheck" + + return runBlocking { + try { + val response = client.post(url) { + contentType(ContentType.Application.Json) + setBody( + DeadlineCheckSubscription( + dataStreamId, + dataStreamRoute, + jurisdiction, + daysToRun, + timeToRun, + deliveryReference + ) + ) + } + return@runBlocking SubscriptionResponse.ProcessNotificationResponse(response) + } catch (e: Exception) { + if (e.message!!.contains("Status:")) { + SubscriptionResponse.ProcessErrorCodes(url, e, null) + } + throw Exception(serviceUnavailable) + } + } + } + + /** + * The mutation function which invokes the upload deadline check microservice route to unsubscribe + * @param subscriptionId String + */ + + @GraphQLDescription("UnSubscribe Deadline Check lets you unsubscribe from getting notifications when an upload from jurisdictions has not happened by 12pm") + @Suppress("unused") + fun unsubscribeDeadlineCheck( + subscriptionId: String + ): NotificationSubscriptionResult { + val url = "$deadlineCheckSubscriptionUrl/unsubscribe/deadlineCheck" + + return runBlocking { + try { + val response = client.post(url) { + contentType(ContentType.Application.Json) + setBody( + DeadlineCheckUnSubscription(subscriptionId) + ) + } + return@runBlocking SubscriptionResponse.ProcessNotificationResponse(response) + } catch (e: Exception) { + if (e.message!!.contains("Status:")) { + SubscriptionResponse.ProcessErrorCodes(url, e, null) + } + throw Exception(serviceUnavailable) + } + } + } + +} \ No newline at end of file diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/NotificationsMutationService.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/NotificationsMutationService.kt index 01b71fea..f7dc318e 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/NotificationsMutationService.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/NotificationsMutationService.kt @@ -59,7 +59,7 @@ data class SubscriptionResult( */ class NotificationsMutationService : Mutation { - private val notificationsRouteBaseUrl: String =System.getenv("PSTATUS_NOTIFICATIONS_BASE_URL") + private val notificationsRouteBaseUrl: String =System.getenv("PSTATUS_WORKFLOW_NOTIFICATIONS_BASE_URL") private val serviceUnavailable ="Notification service is unavailable and no connection has been established. Make sure the service is running" private val client = HttpClient { install(ContentNegotiation) { diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/UploadErrorsNotificationSubscriptionMutationService.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/UploadErrorsNotificationSubscriptionMutationService.kt new file mode 100644 index 00000000..32140828 --- /dev/null +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/UploadErrorsNotificationSubscriptionMutationService.kt @@ -0,0 +1,146 @@ +package gov.cdc.ocio.processingstatusapi.mutations + +import com.expediagroup.graphql.generator.annotations.GraphQLDescription +import com.expediagroup.graphql.server.operations.Mutation +import gov.cdc.ocio.processingstatusapi.mutations.models.NotificationSubscriptionResult +import gov.cdc.ocio.processingstatusapi.mutations.response.SubscriptionResponse +import io.ktor.client.* +import io.ktor.client.plugins.* +import io.ktor.client.request.* +import io.ktor.client.plugins.contentnegotiation.* +import io.ktor.client.plugins.logging.* +import io.ktor.serialization.kotlinx.json.* +import io.ktor.http.* +import kotlinx.coroutines.runBlocking +import kotlinx.serialization.Serializable + + +/** + * Upload errors subscription data class which is serialized back and forth which is in turn subscribed in to the MemCache + * @param dataStreamId String + * @param dataStreamRoute String + * @param jurisdiction String + * @param daysToRun List + * @param deliveryReference String + */ +@Serializable +data class UploadErrorsNotificationSubscription( val dataStreamId: String, + val dataStreamRoute: String, + val jurisdiction: String, + val daysToRun: List, + val timeToRun: String, + val deliveryReference: String) + +/** + * Upload errors unSubscription data class which is serialized back and forth which is in turn used for unsubscribing from the cache for emails and webhooks using the given subscriberId + * @param subscriptionId + */ +@Serializable +data class UploadErrorsNotificationUnSubscription(val subscriptionId:String) + + +/** + * The graphQL mutation service class for upload errors notification subscription/unSubscription + */ + +class UploadErrorsNotificationSubscriptionMutationService : Mutation { + private val uploadErrorsNotificationSubscriptionUrl: String = System.getenv("PSTATUS_WORKFLOW_NOTIFICATIONS_BASE_URL") + private val serviceUnavailable = + "DeadlineCheckSubscription service is unavailable and no connection has been established. Make sure the service is running" + private val client = HttpClient { + install(ContentNegotiation) { + json() + } + install(Logging) { + logger = Logger.DEFAULT + level = LogLevel.INFO + } + install(HttpTimeout) { + requestTimeoutMillis = 10000 + connectTimeoutMillis = 10000 + socketTimeoutMillis = 10000 + } + } + + /** + * The mutation function which invokes the upload errors notification microservice route to subscribe to it + * @param dataStreamId String + * @param dataStreamRoute String + * @param jurisdiction String + * @param daysToRun List + * @param deliveryReference String + */ + + @GraphQLDescription("Subscribe upload errors lets you get notifications when there are errors in an upload") + @Suppress("unused") + fun subscribeUploadErrorsNotification( + dataStreamId: String, + dataStreamRoute: String, + jurisdiction: String, + daysToRun: List, + timeToRun: String, + deliveryReference: String + ): NotificationSubscriptionResult { + val url = "$uploadErrorsNotificationSubscriptionUrl/subscribe/uploadErrorsNotification" + + return runBlocking { + try { + val response = client.post(url) { + contentType(ContentType.Application.Json) + setBody( + UploadErrorsNotificationSubscription( + dataStreamId, + dataStreamRoute, + jurisdiction, + daysToRun, + timeToRun, + deliveryReference + ) + ) + } + return@runBlocking SubscriptionResponse.ProcessNotificationResponse(response) + } catch (e: Exception) { + if (e.message!!.contains("Status:")) { + SubscriptionResponse.ProcessErrorCodes(url, e, null) + } + throw Exception(serviceUnavailable) + } + } + } + + /** + * The mutation function which invokes the upload errors in the upload microservice route to unsubscribe + * @param subscriptionId String + */ + + @GraphQLDescription("UnSubscribe upload errors lets you unsubscribe from getting notifications when there are errors during an upload") + @Suppress("unused") + fun unsubscribeUploadErrorsNotification( + subscriptionId: String + ): NotificationSubscriptionResult { + val url = "$uploadErrorsNotificationSubscriptionUrl/unsubscribe/uploadErrorsNotification" + + return runBlocking { + try { + val response = client.post(url) { + contentType(ContentType.Application.Json) + setBody( + UploadErrorsNotificationUnSubscription(subscriptionId) + ) + } + return@runBlocking SubscriptionResponse.ProcessNotificationResponse(response) + } catch (e: Exception) { + if (e.message!!.contains("Status:")) { + SubscriptionResponse.ProcessErrorCodes(url, e, null) + } + throw Exception(serviceUnavailable) + } + } + } + + + + + + +} \ No newline at end of file diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/models/NotificationSubscriptionResult.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/models/NotificationSubscriptionResult.kt new file mode 100644 index 00000000..6a0ddaa6 --- /dev/null +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/models/NotificationSubscriptionResult.kt @@ -0,0 +1,17 @@ +package gov.cdc.ocio.processingstatusapi.mutations.models + +import kotlinx.serialization.Serializable + +/** + * NotificationSubscriptionResult is the response class which is serialized back and forth which is in turn used for getting the response which contains the subscriberId , message and the status of subscribe/unsubscribe operations + * @param subscriptionId + * @param message + * @param deliveryReference + + */ +@Serializable +data class NotificationSubscriptionResult( + var subscriptionId: String? = null, + var message: String? = "", + var deliveryReference:String +) diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/response/Response.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/response/Response.kt new file mode 100644 index 00000000..c9cde9a2 --- /dev/null +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/mutations/response/Response.kt @@ -0,0 +1,43 @@ +package gov.cdc.ocio.processingstatusapi.mutations.response + + +import gov.cdc.ocio.processingstatusapi.mutations.models.NotificationSubscriptionResult +import io.ktor.client.call.* +import io.ktor.client.statement.* +import io.ktor.http.* + +object SubscriptionResponse{ + + + /** + * Function to process the http response coming from notifications service + * @param response HttpResponse + */ + @JvmStatic + suspend fun ProcessNotificationResponse(response: HttpResponse): NotificationSubscriptionResult { + if (response.status == HttpStatusCode.OK) { + return response.body() + } else { + throw Exception("Notification service is unavailable. Status:${response.status}") + } + } + + @Throws(Exception::class) + /** + * Function to process the http response codes and throw exception accordingly + * @param url String + * @param e Exception + * @param subscriptionId String? + */ + fun ProcessErrorCodes(url: String, e: Exception, subscriptionId: String?) { + val error = e.message!!.substringAfter("Status:").substringBefore(" ") + when (error) { + "500" -> throw Exception("Subscription with subscriptionId = ${subscriptionId} does not exist in the cache") + "400" -> throw Exception("Bad Request: Please check the request and retry") + "401" -> throw Exception("Unauthorized access to notifications service") + "403" -> throw Exception("Access to notifications service is forbidden") + "404" -> throw Exception("${url} not found") + else -> throw Exception(e.message) + } + } +} diff --git a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt index 53d911d3..41587ab7 100644 --- a/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt +++ b/pstatus-graphql-ktor/src/main/kotlin/gov/cdc/ocio/processingstatusapi/plugins/GraphQL.kt @@ -6,7 +6,10 @@ import com.expediagroup.graphql.dataloader.KotlinDataLoaderRegistryFactory import com.expediagroup.graphql.server.ktor.* import gov.cdc.ocio.processingstatusapi.dataloaders.ReportDataLoader import gov.cdc.ocio.processingstatusapi.dataloaders.ReportDeadLetterDataLoader +import gov.cdc.ocio.processingstatusapi.mutations.DataStreamTopErrorsNotificationSubscriptionMutationService +import gov.cdc.ocio.processingstatusapi.mutations.DeadlineCheckSubscriptionMutationService import gov.cdc.ocio.processingstatusapi.mutations.NotificationsMutationService +import gov.cdc.ocio.processingstatusapi.mutations.UploadErrorsNotificationSubscriptionMutationService import gov.cdc.ocio.processingstatusapi.mutations.ReportMutation import gov.cdc.ocio.processingstatusapi.queries.* import io.ktor.http.* @@ -97,7 +100,11 @@ fun Application.graphQLModule() { ) mutations= listOf( NotificationsMutationService(), + DataStreamTopErrorsNotificationSubscriptionMutationService(), + DeadlineCheckSubscriptionMutationService(), + UploadErrorsNotificationSubscriptionMutationService() ReportMutation() + ) // subscriptions = listOf( // ErrorSubscriptionService() diff --git a/pstatus-graphql-ktor/src/main/resources/application.conf b/pstatus-graphql-ktor/src/main/resources/application.conf index 83b0bd9c..2a490716 100644 --- a/pstatus-graphql-ktor/src/main/resources/application.conf +++ b/pstatus-graphql-ktor/src/main/resources/application.conf @@ -1,6 +1,6 @@ ktor { deployment { - port = 8080 + port = 8082 host = 0.0.0.0 }