From 1ffbf9af43b2302e138e3a88cba8296e917a804f Mon Sep 17 00:00:00 2001 From: zacYL <100330102+zacYL@users.noreply.github.com> Date: Fri, 31 May 2024 17:18:56 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=85=BC=E5=AE=B9accessdate=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E5=8E=86=E5=8F=B2=E9=80=BB=E8=BE=91=20#2200?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../listener/ArtifactDownloadListener.kt | 93 ++++++++++++++++--- 1 file changed, 78 insertions(+), 15 deletions(-) diff --git a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/event/listener/ArtifactDownloadListener.kt b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/event/listener/ArtifactDownloadListener.kt index ed74d11723..74d0b0517c 100644 --- a/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/event/listener/ArtifactDownloadListener.kt +++ b/src/backend/common/common-artifact/artifact-service/src/main/kotlin/com/tencent/bkrepo/common/artifact/event/listener/ArtifactDownloadListener.kt @@ -27,6 +27,9 @@ package com.tencent.bkrepo.common.artifact.event.listener +import com.google.common.cache.Cache +import com.google.common.cache.CacheBuilder +import com.google.common.util.concurrent.ThreadFactoryBuilder import com.tencent.bkrepo.common.api.constant.HttpHeaders import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.common.artifact.event.ArtifactDownloadedEvent @@ -37,15 +40,20 @@ import com.tencent.bkrepo.common.artifact.repository.context.ArtifactClient import com.tencent.bkrepo.common.artifact.repository.context.ArtifactContextHolder import com.tencent.bkrepo.common.artifact.repository.context.ArtifactDownloadContext import com.tencent.bkrepo.common.operate.api.OperateLogService +import com.tencent.bkrepo.common.service.otel.util.AsyncUtils.trace import com.tencent.bkrepo.common.service.util.HttpContextHolder import com.tencent.bkrepo.common.stream.event.supplier.MessageSupplier import com.tencent.bkrepo.repository.constant.SYSTEM_USER import com.tencent.bkrepo.repository.pojo.node.NodeDetail import com.tencent.bkrepo.repository.pojo.node.NodeInfo +import com.tencent.bkrepo.repository.pojo.node.service.NodeUpdateAccessDateRequest import org.slf4j.LoggerFactory import org.springframework.context.event.EventListener import java.time.LocalDateTime import java.time.format.DateTimeFormatter +import java.util.concurrent.ArrayBlockingQueue +import java.util.concurrent.ThreadPoolExecutor +import java.util.concurrent.TimeUnit class ArtifactDownloadListener( private val artifactClient: ArtifactClient, @@ -53,6 +61,35 @@ class ArtifactDownloadListener( private val artifactEventProperties: ArtifactEventProperties, private val messageSupplier: MessageSupplier, ) { + // 更新节点访问时间任务线程池 + private val updateAccessDateExecutor = ThreadPoolExecutor( + 4, + 8, + 60, + TimeUnit.SECONDS, + ArrayBlockingQueue(1000), + ThreadFactoryBuilder().setNameFormat("update-access-date-%d").build(), + ThreadPoolExecutor.CallerRunsPolicy() + ) + + // 节点下载后将节点信息放入缓存,当缓存失效时更新accessDate + private val cache: Cache, LocalDateTime> = CacheBuilder.newBuilder() + .maximumSize(1000) + .concurrencyLevel(1) + .expireAfterAccess(30, TimeUnit.MINUTES) + .removalListener, LocalDateTime> { + updateAccessDateExecutor.execute( + Runnable { + updateNodeLastAccessDate( + projectId = it.key!!.first, + repoName = it.key!!.second, + fullPath = it.key!!.third, + accessDate = it.value!! + ) + }.trace() + ) + } + .build() @EventListener(ArtifactDownloadedEvent::class) fun listen(event: ArtifactDownloadedEvent) { @@ -84,24 +121,50 @@ class ArtifactDownloadListener( return } } - if (!artifactEventProperties.reportAccessDateEvent) { - logger.info( - "mock update node [${nodeInfo.fullPath}] " + - "access time [${nodeInfo.projectId}/${nodeInfo.repoName}]" + if (artifactEventProperties.updateAccessDate) { + // 兼容之前逻辑 + val key = Triple(nodeInfo.projectId, nodeInfo.repoName, nodeInfo.fullPath) + if (cache.getIfPresent(key) == null) { + cache.put(key, LocalDateTime.now()) + } + } else { + if (!artifactEventProperties.reportAccessDateEvent) { + logger.info( + "mock update node [${nodeInfo.fullPath}] " + + "access time [${nodeInfo.projectId}/${nodeInfo.repoName}]" + ) + return + } + if (artifactEventProperties.topic.isNullOrEmpty()) return + val event = buildNodeUpdateAccessDateEvent( + projectId = nodeInfo.projectId, + repoName = nodeInfo.repoName, + id = nodeInfo.id!!, ) + messageSupplier.delegateToSupplier( + data = event, + topic = artifactEventProperties.topic!!, + key = event.getFullResourceKey(), + ) + } + } + + private fun updateNodeLastAccessDate( + projectId: String, + repoName: String, + fullPath: String, + accessDate: LocalDateTime + ) { + if (!artifactEventProperties.updateAccessDate) { + logger.info("mock update node access time [$projectId/$repoName$fullPath]") return } - if (artifactEventProperties.topic.isNullOrEmpty()) return - val event = buildNodeUpdateAccessDateEvent( - projectId = nodeInfo.projectId, - repoName = nodeInfo.repoName, - id = nodeInfo.id!!, - ) - messageSupplier.delegateToSupplier( - data = event, - topic = artifactEventProperties.topic!!, - key = event.getFullResourceKey(), - ) + val updateRequest = NodeUpdateAccessDateRequest(projectId, repoName, fullPath, SYSTEM_USER, accessDate) + try { + artifactClient.updateAccessDate(updateRequest) + } catch (ignore: Exception) { + logger.warn("update node access time [$updateRequest] error, ${ignore.message}") + } } private fun durationCheck(accessDate: String?, lastModifiedDate: String): Boolean {