Skip to content

Commit

Permalink
feat: 兼容accessdate更新历史逻辑 #2200
Browse files Browse the repository at this point in the history
  • Loading branch information
zacYL committed May 31, 2024
1 parent 8e18aae commit 1ffbf9a
Showing 1 changed file with 78 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@

package com.tencent.bkrepo.common.artifact.event.listener

import com.google.common.cache.Cache
import com.google.common.cache.CacheBuilder
import com.google.common.util.concurrent.ThreadFactoryBuilder
import com.tencent.bkrepo.common.api.constant.HttpHeaders
import com.tencent.bkrepo.common.api.constant.StringPool
import com.tencent.bkrepo.common.artifact.event.ArtifactDownloadedEvent
Expand All @@ -37,22 +40,56 @@ import com.tencent.bkrepo.common.artifact.repository.context.ArtifactClient
import com.tencent.bkrepo.common.artifact.repository.context.ArtifactContextHolder
import com.tencent.bkrepo.common.artifact.repository.context.ArtifactDownloadContext
import com.tencent.bkrepo.common.operate.api.OperateLogService
import com.tencent.bkrepo.common.service.otel.util.AsyncUtils.trace
import com.tencent.bkrepo.common.service.util.HttpContextHolder
import com.tencent.bkrepo.common.stream.event.supplier.MessageSupplier
import com.tencent.bkrepo.repository.constant.SYSTEM_USER
import com.tencent.bkrepo.repository.pojo.node.NodeDetail
import com.tencent.bkrepo.repository.pojo.node.NodeInfo
import com.tencent.bkrepo.repository.pojo.node.service.NodeUpdateAccessDateRequest
import org.slf4j.LoggerFactory
import org.springframework.context.event.EventListener
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter
import java.util.concurrent.ArrayBlockingQueue
import java.util.concurrent.ThreadPoolExecutor
import java.util.concurrent.TimeUnit

class ArtifactDownloadListener(
private val artifactClient: ArtifactClient,
private val operateLogService: OperateLogService,
private val artifactEventProperties: ArtifactEventProperties,
private val messageSupplier: MessageSupplier,
) {
// 更新节点访问时间任务线程池
private val updateAccessDateExecutor = ThreadPoolExecutor(
4,
8,
60,
TimeUnit.SECONDS,
ArrayBlockingQueue(1000),
ThreadFactoryBuilder().setNameFormat("update-access-date-%d").build(),
ThreadPoolExecutor.CallerRunsPolicy()
)

// 节点下载后将节点信息放入缓存,当缓存失效时更新accessDate
private val cache: Cache<Triple<String, String, String>, LocalDateTime> = CacheBuilder.newBuilder()
.maximumSize(1000)
.concurrencyLevel(1)
.expireAfterAccess(30, TimeUnit.MINUTES)
.removalListener<Triple<String, String, String>, LocalDateTime> {
updateAccessDateExecutor.execute(
Runnable {
updateNodeLastAccessDate(
projectId = it.key!!.first,
repoName = it.key!!.second,
fullPath = it.key!!.third,
accessDate = it.value!!
)
}.trace()
)
}
.build()

@EventListener(ArtifactDownloadedEvent::class)
fun listen(event: ArtifactDownloadedEvent) {
Expand Down Expand Up @@ -84,24 +121,50 @@ class ArtifactDownloadListener(
return
}
}
if (!artifactEventProperties.reportAccessDateEvent) {
logger.info(
"mock update node [${nodeInfo.fullPath}] " +
"access time [${nodeInfo.projectId}/${nodeInfo.repoName}]"
if (artifactEventProperties.updateAccessDate) {
// 兼容之前逻辑
val key = Triple(nodeInfo.projectId, nodeInfo.repoName, nodeInfo.fullPath)
if (cache.getIfPresent(key) == null) {
cache.put(key, LocalDateTime.now())
}
} else {
if (!artifactEventProperties.reportAccessDateEvent) {
logger.info(
"mock update node [${nodeInfo.fullPath}] " +
"access time [${nodeInfo.projectId}/${nodeInfo.repoName}]"
)
return
}
if (artifactEventProperties.topic.isNullOrEmpty()) return
val event = buildNodeUpdateAccessDateEvent(
projectId = nodeInfo.projectId,
repoName = nodeInfo.repoName,
id = nodeInfo.id!!,
)
messageSupplier.delegateToSupplier(
data = event,
topic = artifactEventProperties.topic!!,
key = event.getFullResourceKey(),
)
}
}

private fun updateNodeLastAccessDate(
projectId: String,
repoName: String,
fullPath: String,
accessDate: LocalDateTime
) {
if (!artifactEventProperties.updateAccessDate) {
logger.info("mock update node access time [$projectId/$repoName$fullPath]")
return
}
if (artifactEventProperties.topic.isNullOrEmpty()) return
val event = buildNodeUpdateAccessDateEvent(
projectId = nodeInfo.projectId,
repoName = nodeInfo.repoName,
id = nodeInfo.id!!,
)
messageSupplier.delegateToSupplier(
data = event,
topic = artifactEventProperties.topic!!,
key = event.getFullResourceKey(),
)
val updateRequest = NodeUpdateAccessDateRequest(projectId, repoName, fullPath, SYSTEM_USER, accessDate)
try {
artifactClient.updateAccessDate(updateRequest)
} catch (ignore: Exception) {
logger.warn("update node access time [$updateRequest] error, ${ignore.message}")
}
}

private fun durationCheck(accessDate: String?, lastModifiedDate: String): Boolean {
Expand Down

0 comments on commit 1ffbf9a

Please sign in to comment.