diff --git a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/configuration/ScannerConfiguration.kt b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/configuration/ScannerConfiguration.kt index a11a7ace61..902b98c4a2 100644 --- a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/configuration/ScannerConfiguration.kt +++ b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/configuration/ScannerConfiguration.kt @@ -30,10 +30,12 @@ package com.tencent.bkrepo.analyst.configuration import com.tencent.bkrepo.analysis.executor.api.ExecutorClient import com.tencent.bkrepo.analyst.dispatcher.SubtaskDispatcherFactory import com.tencent.bkrepo.analyst.dispatcher.SubtaskPoller +import com.tencent.bkrepo.analyst.event.ScanEventConsumer import com.tencent.bkrepo.analyst.service.ExecutionClusterService import com.tencent.bkrepo.analyst.service.ScannerService import com.tencent.bkrepo.analyst.service.impl.OperateLogServiceImpl import com.tencent.bkrepo.analyst.service.impl.ProjectUsageStatisticsServiceImpl +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent import com.tencent.bkrepo.common.operate.api.OperateLogService import com.tencent.bkrepo.common.operate.api.ProjectUsageStatisticsService import com.tencent.bkrepo.common.service.condition.ConditionalOnNotAssembly @@ -45,6 +47,7 @@ import org.springframework.cloud.loadbalancer.annotation.LoadBalancerClients import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor +import java.util.function.Consumer @Configuration(proxyBeanMethods = false) @EnableConfigurationProperties( @@ -84,4 +87,14 @@ class ScannerConfiguration { ): ProjectUsageStatisticsService { return ProjectUsageStatisticsServiceImpl(client) } + + + @Bean("scanEventConsumer") + fun scanEventConsumer( + scanEventConsumer: ScanEventConsumer + ): Consumer { + return Consumer { + scanEventConsumer.accept(it) + } + } } diff --git a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/event/ScanEventConsumer.kt b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/event/ScanEventConsumer.kt index 75246aac62..aea9aa3757 100644 --- a/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/event/ScanEventConsumer.kt +++ b/src/backend/analyst/biz-analyst/src/main/kotlin/com/tencent/bkrepo/analyst/event/ScanEventConsumer.kt @@ -56,7 +56,6 @@ import com.tencent.bkrepo.repository.pojo.packages.PackageSummary import org.slf4j.LoggerFactory import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor import org.springframework.stereotype.Component -import java.util.function.Consumer /** * 构件事件消费者,用于触发制品更新扫描 @@ -71,7 +70,7 @@ class ScanEventConsumer( private val scanPlanDao: ScanPlanDao, private val projectScanConfigurationService: ProjectScanConfigurationService, private val executor: ThreadPoolTaskExecutor -) : Consumer { +) { /** * 允许接收的事件类型 @@ -82,7 +81,7 @@ class ScanEventConsumer( EventType.VERSION_UPDATED ) - override fun accept(event: ArtifactEvent) { + fun accept(event: ArtifactEvent) { if (!acceptTypes.contains(event.type)) { return } diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/config/FsConsumerConfig.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/config/FsConsumerConfig.kt new file mode 100644 index 0000000000..ed63fa7623 --- /dev/null +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/config/FsConsumerConfig.kt @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.fs.server.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.fs.server.listener.NodeModifyListener +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class FsConsumerConfig { + + @Bean("artifactEventFs") + fun nodeModifyListener( + nodeModifyListener: NodeModifyListener + ): Consumer> { + return Consumer { + nodeModifyListener.accept(it) + } + } +} diff --git a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/listener/NodeModifyListener.kt b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/listener/NodeModifyListener.kt index a314c1bfad..3c3bff46f4 100644 --- a/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/listener/NodeModifyListener.kt +++ b/src/backend/fs/boot-fs-server/src/main/kotlin/com/tencent/bkrepo/fs/server/listener/NodeModifyListener.kt @@ -10,15 +10,14 @@ import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component import java.util.concurrent.Executors -import java.util.function.Consumer -@Component("artifactEventFs") +@Component class NodeModifyListener( private val rRepositoryClient: RRepositoryClient, private val fileNodeService: FileNodeService -) : Consumer> { +) { - override fun accept(message: Message) { + fun accept(message: Message) { val event = message.payload val type = event.type // 覆盖创建也会先删除,再创建。所以这里只需关注删除事件即可。 diff --git a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/config/HelmConsumerConfig.kt b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/config/HelmConsumerConfig.kt new file mode 100644 index 0000000000..ccc4b6aa26 --- /dev/null +++ b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/config/HelmConsumerConfig.kt @@ -0,0 +1,62 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.helm.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.helm.listener.consumer.PackageReplicationEventConsumer +import com.tencent.bkrepo.helm.listener.consumer.RemoteRepoEventConsumer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class HelmConsumerConfig { + + @Bean("packageReplication") + fun packageReplicationEventConsumer( + packageReplicationEventConsumer: PackageReplicationEventConsumer + ): Consumer> { + return Consumer { + packageReplicationEventConsumer.accept(it) + } + } + + @Bean("remoteRepo") + fun remoteRepoEventConsumer( + remoteRepoEventConsumer: RemoteRepoEventConsumer + ): Consumer> { + return Consumer { + remoteRepoEventConsumer.accept(it) + } + } +} diff --git a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/PackageReplicationEventConsumer.kt b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/PackageReplicationEventConsumer.kt index 25a9487a51..3f1e95a12f 100644 --- a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/PackageReplicationEventConsumer.kt +++ b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/PackageReplicationEventConsumer.kt @@ -33,16 +33,15 @@ import com.tencent.bkrepo.helm.listener.base.RemoteEventJobExecutor import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component -import java.util.function.Consumer /** * 消费基于MQ传递的事件 * 消费分发同步的Package, 用于更新index文件 */ -@Component("packageReplication") +@Component class PackageReplicationEventConsumer( private val remoteEventJobExecutor: RemoteEventJobExecutor -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -52,7 +51,7 @@ class PackageReplicationEventConsumer( EventType.VERSION_UPDATED, ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/RemoteRepoEventConsumer.kt b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/RemoteRepoEventConsumer.kt index cf42ef9e5b..97a9df2e1a 100644 --- a/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/RemoteRepoEventConsumer.kt +++ b/src/backend/helm/biz-helm/src/main/kotlin/com/tencent/bkrepo/helm/listener/consumer/RemoteRepoEventConsumer.kt @@ -30,7 +30,6 @@ package com.tencent.bkrepo.helm.listener.consumer import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent import com.tencent.bkrepo.common.artifact.event.base.EventType import com.tencent.bkrepo.helm.listener.base.RemoteEventJobExecutor -import java.util.function.Consumer import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component @@ -39,10 +38,10 @@ import org.springframework.stereotype.Component * 构件事件消费者,用于实时同步 * 对应destination为对应ArtifactEvent.topic */ -@Component("remoteRepo") +@Component class RemoteRepoEventConsumer( private val remoteEventJobExecutor: RemoteEventJobExecutor -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -53,7 +52,7 @@ class RemoteRepoEventConsumer( EventType.REPO_REFRESHED ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/JobConfig.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/JobConfig.kt index d8a1a0fef8..b0edcd330d 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/JobConfig.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/JobConfig.kt @@ -27,14 +27,18 @@ package com.tencent.bkrepo.job.config +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent import com.tencent.bkrepo.job.executor.BlockThreadPoolTaskExecutorDecorator import com.tencent.bkrepo.job.migrate.config.MigrateRepoStorageProperties import com.tencent.bkrepo.job.separation.config.DataSeparationConfig +import com.tencent.bkrepo.job.separation.listener.SeparationRecoveryEventConsumer import org.springframework.boot.autoconfigure.task.TaskExecutionProperties import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor +import java.util.function.Consumer /** * Job配置 @@ -57,4 +61,13 @@ class JobConfig { Runtime.getRuntime().availableProcessors() ) } + + @Bean("separationRecovery") + fun separationRecoveryEventConsumer( + separationRecoveryEventConsumer: SeparationRecoveryEventConsumer + ): Consumer> { + return Consumer { + separationRecoveryEventConsumer.accept(it) + } + } } diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/separation/listener/SeparationRecoveryEventConsumer.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/separation/listener/SeparationRecoveryEventConsumer.kt index e37c88fe9a..6882969ddc 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/separation/listener/SeparationRecoveryEventConsumer.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/separation/listener/SeparationRecoveryEventConsumer.kt @@ -49,19 +49,18 @@ import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component import java.time.format.DateTimeFormatter -import java.util.function.Consumer /** * 消费降冷自动恢复事件 */ -@Component("separationRecovery") +@Component class SeparationRecoveryEventConsumer( private val separationTaskService: SeparationTaskService, private val dataSeparationConfig: DataSeparationConfig, private val separationPackageDao: SeparationPackageDao, private val separationPackageVersionDao: SeparationPackageVersionDao, private val separationNodeDao: SeparationNodeDao -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -70,7 +69,7 @@ class SeparationRecoveryEventConsumer( EventType.NODE_SEPARATION_RECOVERY, ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!dataSeparationConfig.enableAutoRecovery) return if (!acceptTypes.contains(message.payload.type)) { return diff --git a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/config/OciConsumerConfig.kt b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/config/OciConsumerConfig.kt new file mode 100644 index 0000000000..2c9470cff8 --- /dev/null +++ b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/config/OciConsumerConfig.kt @@ -0,0 +1,63 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.oci.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.oci.listener.consumer.RemoteImageRepoEventConsumer +import com.tencent.bkrepo.oci.listener.consumer.ThirdPartyReplicationEventConsumer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class OciConsumerConfig { + + // 之前继承Consumer方式框架升级后会报错,https://github.com/spring-cloud/spring-cloud-stream/issues/2704 + @Bean("remoteOciRepo") + fun remoteImageRepoEventConsumer( + remoteImageRepoEventConsumer: RemoteImageRepoEventConsumer + ): Consumer> { + return Consumer { + remoteImageRepoEventConsumer.accept(it) + } + } + + @Bean("thirdPartyReplication") + fun thirdPartyReplicationEventConsumer( + thirdPartyReplicationEventConsumer: ThirdPartyReplicationEventConsumer + ): Consumer> { + return Consumer { + thirdPartyReplicationEventConsumer.accept(it) + } + } +} diff --git a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/RemoteImageRepoEventConsumer.kt b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/RemoteImageRepoEventConsumer.kt index 753b5fd69b..638da05dfb 100644 --- a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/RemoteImageRepoEventConsumer.kt +++ b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/RemoteImageRepoEventConsumer.kt @@ -33,16 +33,15 @@ import com.tencent.bkrepo.oci.listener.base.EventExecutor import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component -import java.util.function.Consumer /** * 构件事件消费者,用于实时同步 * 对应destination为对应ArtifactEvent.topic */ -@Component("remoteOciRepo") +@Component class RemoteImageRepoEventConsumer( private val eventExecutor: EventExecutor -) : Consumer>{ +) { /** * 允许接收的事件类型 @@ -53,7 +52,7 @@ class RemoteImageRepoEventConsumer( EventType.REPO_REFRESHED ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/ThirdPartyReplicationEventConsumer.kt b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/ThirdPartyReplicationEventConsumer.kt index bad477b59f..4820ce423f 100644 --- a/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/ThirdPartyReplicationEventConsumer.kt +++ b/src/backend/oci/biz-oci/src/main/kotlin/com/tencent/bkrepo/oci/listener/consumer/ThirdPartyReplicationEventConsumer.kt @@ -33,17 +33,16 @@ import com.tencent.bkrepo.oci.listener.base.EventExecutor import org.slf4j.LoggerFactory import org.springframework.messaging.Message import org.springframework.stereotype.Component -import java.util.function.Consumer /** * 消费基于MQ传递的事件 * 对应destination为对应ArtifactEvent.topic */ -@Component("thirdPartyReplication") +@Component class ThirdPartyReplicationEventConsumer( private val eventExecutor: EventExecutor -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -52,7 +51,7 @@ class ThirdPartyReplicationEventConsumer( EventType.REPLICATION_THIRD_PARTY ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/config/ReplicationConsumerConfig.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/config/ReplicationConsumerConfig.kt new file mode 100644 index 0000000000..b3efd2a1e3 --- /dev/null +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/config/ReplicationConsumerConfig.kt @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.replication.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.replication.replica.type.event.ArtifactEventConsumer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import java.util.function.Consumer + +@Configuration +class ReplicationConsumerConfig { + + // 之前继承Consumer方式框架升级后会报错,https://github.com/spring-cloud/spring-cloud-stream/issues/2704 + @Bean("artifactEventReplication") + fun artifactEventConsumer( + artifactEventConsumer: ArtifactEventConsumer + ): Consumer { + return Consumer { + artifactEventConsumer.accept(it) + } + } +} diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/ArtifactEventConsumer.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/ArtifactEventConsumer.kt index 8f834e5de2..a261f090dd 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/ArtifactEventConsumer.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/ArtifactEventConsumer.kt @@ -38,7 +38,7 @@ import org.springframework.stereotype.Component * 构件事件消费者,用于实时同步 * 对应binding name为artifactEvent-in-0 */ -@Component("artifactEventReplication") +@Component class ArtifactEventConsumer( private val replicaTaskService: ReplicaTaskService, private val eventBasedReplicaJobExecutor: EventBasedReplicaJobExecutor diff --git a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventConsumer.kt b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventConsumer.kt index f5c63981b6..fcb1017687 100644 --- a/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventConsumer.kt +++ b/src/backend/replication/biz-replication/src/main/kotlin/com/tencent/bkrepo/replication/replica/type/event/EventConsumer.kt @@ -29,19 +29,19 @@ package com.tencent.bkrepo.replication.replica.type.event import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent import com.tencent.bkrepo.common.artifact.event.base.EventType -import java.util.function.Consumer + /** * 构件事件消费者,用于实时同步 * 对应binding name为artifactEvent-in-0 */ -abstract class EventConsumer : Consumer { +open class EventConsumer { /** * 允许接收的事件类型 */ open fun getAcceptTypes(): Set = emptySet() - override fun accept(message: ArtifactEvent) { + fun accept(message: ArtifactEvent) { if (!getAcceptTypes().contains(message.type)) { return } diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/config/RepositoryConsumerConfig.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/config/RepositoryConsumerConfig.kt new file mode 100644 index 0000000000..e86704d91f --- /dev/null +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/config/RepositoryConsumerConfig.kt @@ -0,0 +1,53 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.repository.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.repository.listener.NodeUpdateAccessDateEventListener +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class RepositoryConsumerConfig { + + // 之前继承Consumer方式框架升级后会报错,https://github.com/spring-cloud/spring-cloud-stream/issues/2704 + @Bean("nodeUpdateAccessDate") + fun nodeUpdateAccessDateEventConsumer( + nodeUpdateAccessDateEventListener: NodeUpdateAccessDateEventListener + ): Consumer> { + return Consumer { + nodeUpdateAccessDateEventListener.accept(it) + } + } +} diff --git a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeUpdateAccessDateEventListener.kt b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeUpdateAccessDateEventListener.kt index 41ce9fd18f..8e1119a293 100644 --- a/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeUpdateAccessDateEventListener.kt +++ b/src/backend/repository/biz-repository/src/main/kotlin/com/tencent/bkrepo/repository/listener/NodeUpdateAccessDateEventListener.kt @@ -42,17 +42,16 @@ import org.springframework.messaging.Message import org.springframework.stereotype.Component import java.time.LocalDateTime import java.time.format.DateTimeFormatter -import java.util.function.Consumer /** * 消费基于MQ传递的事件去更新对应access date */ -@Component("nodeUpdateAccessDate") +@Component class NodeUpdateAccessDateEventListener( private val nodeDao: NodeDao, private val artifactEventProperties: ArtifactEventProperties, -) : Consumer> { +) { /** * 允许接收的事件类型 @@ -61,7 +60,7 @@ class NodeUpdateAccessDateEventListener( EventType.NODE_UPDATE_ACCESS_DATE, ) - override fun accept(message: Message) { + fun accept(message: Message) { if (!acceptTypes.contains(message.payload.type)) { return } diff --git a/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/config/WebHookConsumerConfig.kt b/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/config/WebHookConsumerConfig.kt new file mode 100644 index 0000000000..69c3fb482b --- /dev/null +++ b/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/config/WebHookConsumerConfig.kt @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2024 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.webhook.config + +import com.tencent.bkrepo.common.artifact.event.base.ArtifactEvent +import com.tencent.bkrepo.webhook.executor.ArtifactEventConsumer +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.messaging.Message +import java.util.function.Consumer + +@Configuration +class WebHookConsumerConfig { + + @Bean("artifactEventWebhook") + fun artifactEventConsumer( + artifactEventConsumer: ArtifactEventConsumer + ): Consumer> { + return Consumer { + artifactEventConsumer.accept(it) + } + } +} \ No newline at end of file diff --git a/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/executor/ArtifactEventConsumer.kt b/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/executor/ArtifactEventConsumer.kt index cf1624a924..c452d61dc0 100644 --- a/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/executor/ArtifactEventConsumer.kt +++ b/src/backend/webhook/biz-webhook/src/main/kotlin/com/tencent/bkrepo/webhook/executor/ArtifactEventConsumer.kt @@ -43,18 +43,17 @@ import org.springframework.stereotype.Component import java.util.concurrent.LinkedBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit -import java.util.function.Consumer import java.util.regex.Pattern /** * 事件消息消费者 */ -@Component("artifactEventWebhook") +@Component class ArtifactEventConsumer( private val webHookDao: WebHookDao, private val webHookExecutor: WebHookExecutor, private val webHookProperties: WebHookProperties -) : Consumer> { +) { private val executors = ThreadPoolExecutor( 100, @@ -74,7 +73,7 @@ class ArtifactEventConsumer( ) }) - override fun accept(message: Message) { + fun accept(message: Message) { logger.info("accept artifact event: ${message.payload}, header: ${message.headers}") val task = Runnable { triggerWebHooks(message.payload) }.trace() executors.execute(task)