From 39a100ebe2395a1f34323f089daba340acfe9af3 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Thu, 23 May 2024 16:55:57 +0800 Subject: [PATCH 01/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../storage/core/AbstractFileStorage.kt | 11 +++ .../bkrepo/common/storage/core/FileStorage.kt | 9 ++ .../storage/filesystem/FileSystemClient.kt | 6 +- .../storage/filesystem/FileSystemStorage.kt | 6 ++ .../storage/innercos/InnerCosFileStorage.kt | 11 +++ .../storage/innercos/client/CosClient.kt | 29 ++++-- .../innercos/http/HttpResponseHandler.kt | 9 +- .../innercos/request/ListObjectsRequest.kt | 52 +++++++++++ .../storage/innercos/response/Content.kt | 18 ++++ .../innercos/response/ListObjectsResponse.kt | 24 +++++ .../CompleteMultipartUploadResponseHandler.kt | 4 +- .../handler/CopyObjectResponseHandler.kt | 2 +- .../InitiateMultipartUploadResponseHandler.kt | 2 +- .../handler/ListObjects0ResponseHandler.kt | 11 +++ .../handler/ListObjectsResponseHandler.kt | 50 ++++++++++ .../batch/task/storage/StorageReconcileJob.kt | 93 +++++++++++++++++++ .../bkrepo/job/batch/utils/NodeCommonUtils.kt | 14 +++ .../StorageReconcileJobProperties.kt | 8 ++ .../tencent/bkrepo/job/batch/JobBaseTest.kt | 17 +++- .../task/storage/StorageReconcileJobTest.kt | 89 ++++++++++++++++++ 20 files changed, 448 insertions(+), 17 deletions(-) create mode 100644 src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/ListObjectsRequest.kt create mode 100644 src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/Content.kt create mode 100644 src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/ListObjectsResponse.kt create mode 100644 src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjects0ResponseHandler.kt create mode 100644 src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjectsResponseHandler.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt create mode 100644 src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt create mode 100644 src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/AbstractFileStorage.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/AbstractFileStorage.kt index a2fe402e72..9696921958 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/AbstractFileStorage.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/AbstractFileStorage.kt @@ -47,6 +47,8 @@ import org.springframework.retry.support.RetryTemplate import java.io.File import java.io.IOException import java.io.InputStream +import java.nio.file.Path +import java.util.stream.Stream /** * 文件存储抽象模板类 @@ -192,6 +194,11 @@ abstract class AbstractFileStorage : F restore(path, name, days, tier, client) } + override fun listAll(path: String, storageCredentials: StorageCredentials): Stream { + val client = getClient(storageCredentials) + return listAll(path, client) + } + private fun getClient(storageCredentials: StorageCredentials): Client { return if (storageCredentials == storageProperties.defaultStorageCredentials()) { defaultClient @@ -229,6 +236,10 @@ abstract class AbstractFileStorage : F throw UnsupportedOperationException("Restore operation unsupported") } + open fun listAll(path: String, client: Client): Stream { + throw UnsupportedOperationException("ListAll operation unsupported") + } + companion object { private val logger = LoggerFactory.getLogger(AbstractFileStorage::class.java) private const val MAX_CACHE_CLIENT = 10L diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/FileStorage.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/FileStorage.kt index 88625bc7aa..5a8dde3f88 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/FileStorage.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/core/FileStorage.kt @@ -35,6 +35,8 @@ import com.tencent.bkrepo.common.artifact.stream.Range import com.tencent.bkrepo.common.storage.credentials.StorageCredentials import java.io.File import java.io.InputStream +import java.nio.file.Path +import java.util.stream.Stream /** * 文件存储接口 @@ -145,4 +147,11 @@ interface FileStorage { * @param storageCredentials 存储凭证 */ fun getTempPath(storageCredentials: StorageCredentials): String = System.getProperty("java.io.tmpdir") + + /** + * 列出指定目录下的所有文件 + * @param path 目录路径 + * @param storageCredentials 存储实例 + * */ + fun listAll(path: String, storageCredentials: StorageCredentials): Stream } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemClient.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemClient.kt index bb838e9050..627a98b867 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemClient.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemClient.kt @@ -40,6 +40,7 @@ import java.nio.file.FileAlreadyExistsException import java.nio.file.Files import java.nio.file.Path import java.nio.file.Paths +import java.util.stream.Stream /** * 本地文件存储客户端 @@ -140,7 +141,6 @@ class FileSystemClient(val root: String) { * 使用channel拷贝 * */ private fun copyByChannel(src: Path, target: Path) { - if (!Files.exists(src)) { throw IOException("src[$src] file not exist") } @@ -337,6 +337,10 @@ class FileSystemClient(val root: String) { return Files.size(filePath) } + fun walk(path: String): Stream { + return Files.walk(Paths.get(root, path)) + } + private fun transfer( input: ReadableByteChannel, output: FileChannel, diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemStorage.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemStorage.kt index f8ba8f32dd..1aeb22b379 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemStorage.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/filesystem/FileSystemStorage.kt @@ -40,8 +40,10 @@ import com.tencent.bkrepo.common.storage.credentials.StorageCredentials import java.io.File import java.io.InputStream import java.nio.file.Files +import java.nio.file.Path import java.nio.file.Paths import java.nio.file.StandardCopyOption +import java.util.stream.Stream /** * 文件系统存储 @@ -91,4 +93,8 @@ open class FileSystemStorage : AbstractEncryptorFileStorage { + return client.walk(path).filter { Files.isRegularFile(it) } + } } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/InnerCosFileStorage.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/InnerCosFileStorage.kt index c320ad988e..d56f878b12 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/InnerCosFileStorage.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/InnerCosFileStorage.kt @@ -31,6 +31,7 @@ package com.tencent.bkrepo.common.storage.innercos +import com.tencent.bkrepo.common.api.constant.StringPool import com.tencent.bkrepo.common.artifact.stream.Range import com.tencent.bkrepo.common.storage.core.AbstractEncryptorFileStorage import com.tencent.bkrepo.common.storage.credentials.InnerCosCredentials @@ -38,12 +39,16 @@ import com.tencent.bkrepo.common.storage.innercos.client.CosClient import com.tencent.bkrepo.common.storage.innercos.request.CheckObjectExistRequest import com.tencent.bkrepo.common.storage.innercos.request.DeleteObjectRequest import com.tencent.bkrepo.common.storage.innercos.request.GetObjectRequest +import com.tencent.bkrepo.common.storage.innercos.request.ListObjectsRequest import com.tencent.bkrepo.common.storage.innercos.request.MigrateObjectRequest import com.tencent.bkrepo.common.storage.innercos.request.RestoreObjectRequest import org.slf4j.LoggerFactory import java.io.File import java.io.IOException import java.io.InputStream +import java.nio.file.Path +import java.nio.file.Paths +import java.util.stream.Stream /** * 内部cos文件存储实现类 @@ -100,6 +105,12 @@ open class InnerCosFileStorage : AbstractEncryptorFileStorage { + val keyPrefix = if (path == StringPool.ROOT) null else path + val listObjectsRequest = ListObjectsRequest(prefix = keyPrefix) + return client.listObjects(listObjectsRequest).map { Paths.get(it) } + } + override fun onCreateClient(credentials: InnerCosCredentials): CosClient { require(credentials.secretId.isNotBlank()) require(credentials.secretKey.isNotBlank()) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt index af5f6fc9c8..3ab48ca4af 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt @@ -61,6 +61,7 @@ import com.tencent.bkrepo.common.storage.innercos.request.FileCleanupChunkedFutu import com.tencent.bkrepo.common.storage.innercos.request.GetObjectRequest import com.tencent.bkrepo.common.storage.innercos.request.HeadObjectRequest import com.tencent.bkrepo.common.storage.innercos.request.InitiateMultipartUploadRequest +import com.tencent.bkrepo.common.storage.innercos.request.ListObjectsRequest import com.tencent.bkrepo.common.storage.innercos.request.MigrateObjectRequest import com.tencent.bkrepo.common.storage.innercos.request.PartETag import com.tencent.bkrepo.common.storage.innercos.request.PutObjectRequest @@ -71,6 +72,7 @@ import com.tencent.bkrepo.common.storage.innercos.request.UploadPartRequest import com.tencent.bkrepo.common.storage.innercos.request.UploadPartRequestFactory import com.tencent.bkrepo.common.storage.innercos.response.CopyObjectResponse import com.tencent.bkrepo.common.storage.innercos.response.CosObject +import com.tencent.bkrepo.common.storage.innercos.response.ListObjectsResponse import com.tencent.bkrepo.common.storage.innercos.response.PutObjectResponse import com.tencent.bkrepo.common.storage.innercos.response.handler.CheckArchiveObjectExistResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.CheckObjectRestoreResponseHandler @@ -79,6 +81,8 @@ import com.tencent.bkrepo.common.storage.innercos.response.handler.CopyObjectRes import com.tencent.bkrepo.common.storage.innercos.response.handler.GetObjectResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.HeadObjectResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.InitiateMultipartUploadResponseHandler +import com.tencent.bkrepo.common.storage.innercos.response.handler.ListObjects0ResponseHandler +import com.tencent.bkrepo.common.storage.innercos.response.handler.ListObjectsResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.PutObjectResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.SlowLogHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.UploadPartResponseHandler @@ -99,6 +103,7 @@ import java.util.concurrent.PriorityBlockingQueue import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger +import java.util.stream.Stream import kotlin.math.ceil import kotlin.math.max @@ -114,7 +119,6 @@ class CosClient(val credentials: InnerCosCredentials) { */ private val uploadThreadPool = Executors.newFixedThreadPool(config.uploadWorkers) - /** * 分块下载使用的执行器。可以为null,为null则不使用分块下载 * */ @@ -144,11 +148,14 @@ class CosClient(val credentials: InnerCosCredentials) { null } + init { + this.listObjects(ListObjectsRequest()) + } + private val useChunkedLoad = (watchDog != null) && (downloadThreadPool != null) private val fastFallbackTimeout = config.timeout shr 1 - fun headObject(cosRequest: HeadObjectRequest): CosObject { val httpRequest = buildHttpRequest(cosRequest) return CosHttpClient.execute(httpRequest, HeadObjectResponseHandler()) @@ -291,7 +298,7 @@ class CosClient(val credentials: InnerCosCredentials) { return putObject(PutObjectRequest(key, StringPool.EMPTY.byteInputStream(), length)) } val partSize = calculateOptimalPartSize(length, true) - val factory = DownloadPartRequestFactory(key, partSize, 0, length-1) + val factory = DownloadPartRequestFactory(key, partSize, 0, length - 1) while (factory.hasMoreRequests()) { val getObjectRequest = factory.nextDownloadPartRequest() val downloadRequest = fromClient.buildHttpRequest(getObjectRequest) @@ -320,11 +327,21 @@ class CosClient(val credentials: InnerCosCredentials) { } } + fun listObjects(cosRequest: ListObjectsRequest): Stream { + val httpRequest = buildHttpRequest(cosRequest) + return CosHttpClient.execute(httpRequest, ListObjectsResponseHandler(this, cosRequest)) + } + + fun listObjects0(cosRequest: ListObjectsRequest): ListObjectsResponse { + val httpRequest = buildHttpRequest(cosRequest) + return CosHttpClient.execute(httpRequest, ListObjects0ResponseHandler()) + } + private fun multipartMigrate( key: String, uploadId: String, partNumber: Int, - downloadRequest: Request + downloadRequest: Request, ): Callable { return Callable { retry(RETRY_COUNT) { @@ -335,8 +352,8 @@ class CosClient(val credentials: InnerCosCredentials) { uploadId = uploadId, partNumber = partNumber, partSize = it.header(HttpHeaders.CONTENT_LENGTH)!!.toLong(), - inputStream = it.body!!.byteStream() - ) + inputStream = it.body!!.byteStream(), + ), ) PartETag(partNumber, CosHttpClient.execute(putObjectRequest, UploadPartResponseHandler()).eTag) } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/HttpResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/HttpResponseHandler.kt index 795740e75e..84c11fbcb5 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/HttpResponseHandler.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/HttpResponseHandler.kt @@ -41,10 +41,13 @@ abstract class HttpResponseHandler { open fun keepConnection(response: Response): Boolean = false companion object { - private val xmlMapper: XmlMapper = XmlMapper() + val xmlMapper: XmlMapper = XmlMapper() - fun readXmlValue(response: Response): Map<*, *> { - return xmlMapper.readValue(response.body?.string(), Map::class.java) + fun readXmlToMap(response: Response): Map<*, *> { + return readXmlValue(response) + } + inline fun readXmlValue(response: Response): T { + return xmlMapper.readValue(response.body?.string(), T::class.java) } } } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/ListObjectsRequest.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/ListObjectsRequest.kt new file mode 100644 index 0000000000..c7440367f5 --- /dev/null +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/request/ListObjectsRequest.kt @@ -0,0 +1,52 @@ +/* + * Tencent is pleased to support the open source community by making BK-CI 蓝鲸持续集成平台 available. + * + * Copyright (C) 2020 THL A29 Limited, a Tencent company. All rights reserved. + * + * BK-CI 蓝鲸持续集成平台 is licensed under the MIT license. + * + * A copy of the MIT License is included in this file. + * + * + * Terms of the MIT License: + * --------------------------------------------------- + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +package com.tencent.bkrepo.common.storage.innercos.request + +import com.tencent.bkrepo.common.api.constant.StringPool.ROOT +import com.tencent.bkrepo.common.storage.innercos.http.HttpMethod +import okhttp3.RequestBody + +data class ListObjectsRequest( + val prefix: String? = null, + val marker: String? = null, + val maxKeys: Int = 1000, +) : CosRequest(HttpMethod.GET, ROOT) { + + init { + check(maxKeys <= 1000) + prefix?.let { parameters["prefix"] = prefix } + marker?.let { parameters["marker"] = marker } + parameters["max-keys"] = maxKeys.toString() + } + + override fun buildRequestBody(): RequestBody? = null +} diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/Content.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/Content.kt new file mode 100644 index 0000000000..304600ad89 --- /dev/null +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/Content.kt @@ -0,0 +1,18 @@ +package com.tencent.bkrepo.common.storage.innercos.response + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty + +data class Content( + @JacksonXmlProperty(localName = "Key") + var key: String = "", + @JacksonXmlProperty(localName = "LastModified") + var lastModified: String = "", + @JacksonXmlProperty(localName = "Created") + var created: String = "", + @JacksonXmlProperty(localName = "ETag") + var etag: String = "", + @JacksonXmlProperty(localName = "Size") + var size: Long = 0, + @JacksonXmlProperty(localName = "Forbid") + var forbid: Int = 0, +) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/ListObjectsResponse.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/ListObjectsResponse.kt new file mode 100644 index 0000000000..418d772f62 --- /dev/null +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/ListObjectsResponse.kt @@ -0,0 +1,24 @@ +package com.tencent.bkrepo.common.storage.innercos.response + +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlElementWrapper +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty +import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlRootElement + +@JacksonXmlRootElement(localName = "ListBucketResult") +data class ListObjectsResponse( + @JacksonXmlProperty(localName = "Name") + var name: String = "", + @JacksonXmlProperty(localName = "Prefix") + var prefix: String = "", + @JacksonXmlProperty(localName = "Marker") + var marker: String = "", + @JacksonXmlProperty(localName = "IsTruncated") + var sTruncated: Boolean = false, + @JacksonXmlProperty(localName = "MaxKeys") + var maxKeys: Int = 0, + @JacksonXmlProperty(localName = "NextMarker") + var nextMarker: String = "", + @JacksonXmlElementWrapper(useWrapping = false) + @JacksonXmlProperty(localName = "Contents") + var contents: List = mutableListOf(), +) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CompleteMultipartUploadResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CompleteMultipartUploadResponseHandler.kt index 6ac1699b5e..827a411c7d 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CompleteMultipartUploadResponseHandler.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CompleteMultipartUploadResponseHandler.kt @@ -40,8 +40,8 @@ import okhttp3.Response class CompleteMultipartUploadResponseHandler : HttpResponseHandler() { override fun handle(response: Response): PutObjectResponse { return PutObjectResponse( - readXmlValue(response)[ETAG].toString(), - response.header(RESPONSE_CRC64) + readXmlToMap(response)[ETAG].toString(), + response.header(RESPONSE_CRC64), ) } } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CopyObjectResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CopyObjectResponseHandler.kt index abdf3fd73d..4e03e6d91a 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CopyObjectResponseHandler.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CopyObjectResponseHandler.kt @@ -39,7 +39,7 @@ import okhttp3.Response class CopyObjectResponseHandler : HttpResponseHandler() { override fun handle(response: Response): CopyObjectResponse { - val result = readXmlValue(response) + val result = readXmlToMap(response) val eTag = (result[ETAG].toString()).trim('"') val lastModified = result[RESPONSE_LAST_MODIFIED].toString() return CopyObjectResponse(eTag, lastModified) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/InitiateMultipartUploadResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/InitiateMultipartUploadResponseHandler.kt index 0dd4916102..544103a918 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/InitiateMultipartUploadResponseHandler.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/InitiateMultipartUploadResponseHandler.kt @@ -37,6 +37,6 @@ import okhttp3.Response class InitiateMultipartUploadResponseHandler : HttpResponseHandler() { override fun handle(response: Response): String { - return readXmlValue(response)[RESPONSE_UPLOAD_ID].toString() + return readXmlToMap(response)[RESPONSE_UPLOAD_ID].toString() } } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjects0ResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjects0ResponseHandler.kt new file mode 100644 index 0000000000..c73abeae96 --- /dev/null +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjects0ResponseHandler.kt @@ -0,0 +1,11 @@ +package com.tencent.bkrepo.common.storage.innercos.response.handler + +import com.tencent.bkrepo.common.storage.innercos.http.HttpResponseHandler +import com.tencent.bkrepo.common.storage.innercos.response.ListObjectsResponse +import okhttp3.Response + +class ListObjects0ResponseHandler : HttpResponseHandler() { + override fun handle(response: Response): ListObjectsResponse { + return readXmlValue(response) + } +} diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjectsResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjectsResponseHandler.kt new file mode 100644 index 0000000000..f604d4d2d8 --- /dev/null +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjectsResponseHandler.kt @@ -0,0 +1,50 @@ +package com.tencent.bkrepo.common.storage.innercos.response.handler + +import com.tencent.bkrepo.common.storage.innercos.client.CosClient +import com.tencent.bkrepo.common.storage.innercos.http.HttpResponseHandler +import com.tencent.bkrepo.common.storage.innercos.request.ListObjectsRequest +import com.tencent.bkrepo.common.storage.innercos.response.ListObjectsResponse +import okhttp3.Response +import org.slf4j.LoggerFactory +import java.util.stream.Stream +import kotlin.streams.asStream + +class ListObjectsResponseHandler(val client: CosClient, val req: ListObjectsRequest) : + HttpResponseHandler>() { + override fun handle(response: Response): Stream { + val listRes = readXmlValue(response) + return Itr(listRes, client, req).asSequence().asStream() + } + + private class Itr(var response: ListObjectsResponse, val client: CosClient, val req: ListObjectsRequest) : + Iterator { + + private var it = response.contents.iterator() + private var nextMarker = response.nextMarker + override fun hasNext(): Boolean { + if (!it.hasNext() && nextMarker.isNotEmpty()) { + load() + } + return it.hasNext() + } + + override fun next(): String { + return it.next().key + } + + private fun load() { + val listObjectsRequest = ListObjectsRequest(prefix = req.prefix, marker = response.nextMarker, req.maxKeys) + response = client.listObjects0(listObjectsRequest) + val size = response.contents.size + if (logger.isDebugEnabled && size > 0) { + logger.debug("load $size objects.") + } + it = response.contents.iterator() + nextMarker = response.nextMarker + } + } + + companion object { + private val logger = LoggerFactory.getLogger(ListObjectsResponseHandler::class.java) + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt new file mode 100644 index 0000000000..fd2237d550 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt @@ -0,0 +1,93 @@ +package com.tencent.bkrepo.job.batch.task.storage + +import com.google.common.hash.BloomFilter +import com.google.common.hash.Funnels +import com.tencent.bkrepo.common.api.constant.StringPool +import com.tencent.bkrepo.common.service.cluster.properties.ClusterProperties +import com.tencent.bkrepo.common.storage.core.FileStorage +import com.tencent.bkrepo.common.storage.core.StorageProperties +import com.tencent.bkrepo.common.storage.credentials.StorageCredentials +import com.tencent.bkrepo.job.CREDENTIALS +import com.tencent.bkrepo.job.SHA256 +import com.tencent.bkrepo.job.batch.base.DefaultContextJob +import com.tencent.bkrepo.job.batch.base.JobContext +import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils +import com.tencent.bkrepo.job.config.properties.FileReferenceCleanupJobProperties +import com.tencent.bkrepo.job.config.properties.StorageReconcileJobProperties +import com.tencent.bkrepo.repository.api.FileReferenceClient +import com.tencent.bkrepo.repository.api.StorageCredentialsClient +import org.slf4j.LoggerFactory +import org.springframework.boot.context.properties.EnableConfigurationProperties +import org.springframework.data.mongodb.core.query.Criteria +import org.springframework.data.mongodb.core.query.Query +import org.springframework.data.mongodb.core.query.isEqualTo +import org.springframework.stereotype.Component +import java.nio.charset.StandardCharsets + +/** + * 设计目标 + * 1. 找出未引用的cos + * */ +@Component +@EnableConfigurationProperties(StorageReconcileJobProperties::class) +@Suppress("UnstableApiUsage") +class StorageReconcileJob( + private val properties: StorageReconcileJobProperties, + private val bloomFilterProp: FileReferenceCleanupJobProperties, + private val fileStorage: FileStorage, + private val clusterProperties: ClusterProperties, + private val storageCredentialsClient: StorageCredentialsClient, + private val storageProperties: StorageProperties, + private val fileReferenceClient: FileReferenceClient, +) : DefaultContextJob(properties) { + override fun doStart0(jobContext: JobContext) { + // 校验默认存储 + reconcile(storageProperties.defaultStorageCredentials()) + + // 校验其他存储 + storageCredentialsClient.list(clusterProperties.region).data?.forEach { + reconcile(it) + } + } + + private fun reconcile(storageCredentials: StorageCredentials) { + logger.info("Start reconcile storage [${storageCredentials.key}]") + val bf = buildBloomFilter(storageCredentials) + var total = 0L + var deleted = 0L + fileStorage.listAll(StringPool.ROOT, storageCredentials).map { it.toFile().name }.forEach { + total++ + if (!bf.mightContain(it)) { + logger.info("File [$it] miss ref.") + fileReferenceClient.increment(it, storageCredentials.key, 0) + deleted++ + } + } + logger.info("Reconcile storage [${storageCredentials.key}] successful, deleted[$deleted], total[$total].") + } + + private fun buildBloomFilter(storageCredentials: StorageCredentials): BloomFilter { + logger.info("Start build bloom filter.") + val bf = BloomFilter.create( + Funnels.stringFunnel(StandardCharsets.UTF_8), + bloomFilterProp.expectedNodes, + bloomFilterProp.fpp, + ) + val query = Query(Criteria.where(CREDENTIALS).isEqualTo(storageCredentials.key)) + query.fields().include(SHA256) + NodeCommonUtils.forEachRefByCollectionParallel(query) { + val sha256 = it[SHA256]?.toString() + if (sha256 != null) { + bf.put(sha256) + } + } + val count = "${bf.approximateElementCount()}/${bloomFilterProp.expectedNodes}" + val fpp = bf.expectedFpp() + logger.info("Build bloom filter successful,key: ${storageCredentials.key},count: $count,fpp: $fpp") + return bf + } + + companion object { + private val logger = LoggerFactory.getLogger(StorageReconcileJob::class.java) + } +} diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt index 5a1786bb04..bc2af70139 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/utils/NodeCommonUtils.kt @@ -55,6 +55,7 @@ class NodeCommonUtils( lateinit var separationTaskService: SeparationTaskService private const val COLLECTION_NAME_PREFIX = "node_" private const val SEPARATION_COLLECTION_NAME_PREFIX = "separation_node_" + private const val FILE_REFERENCE_COLLECTION_NAME_PREFIX = "file_reference_" private val workPool = ThreadPoolExecutor( Runtime.getRuntime().availableProcessors(), Runtime.getRuntime().availableProcessors(), @@ -138,6 +139,19 @@ class NodeCommonUtils( futures.forEach { it.get() } } + fun forEachRefByCollectionParallel( + query: Query, + batchSize: Int = BATCH_SIZE, + consumer: Consumer>, + ) { + val futures = mutableListOf>() + for (i in 0 until SHARDING_COUNT) { + val collection = FILE_REFERENCE_COLLECTION_NAME_PREFIX.plus(i) + futures.add(workPool.submit { findByCollection(query, batchSize, collection, consumer) }) + } + futures.forEach { it.get() } + } + fun forEachColdNodeByCollectionParallel( query: Query, batchSize: Int = BATCH_SIZE, diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt new file mode 100644 index 0000000000..6ad09479a2 --- /dev/null +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt @@ -0,0 +1,8 @@ +package com.tencent.bkrepo.job.config.properties + +import org.springframework.boot.context.properties.ConfigurationProperties + +@ConfigurationProperties("job.storage-reconcile") +class StorageReconcileJobProperties( + override var cron: String = "0 0 0 1 */1 ?", +) : BatchJobProperties() diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/JobBaseTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/JobBaseTest.kt index c355f98eb9..7c308ca6c9 100644 --- a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/JobBaseTest.kt +++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/JobBaseTest.kt @@ -31,7 +31,9 @@ import com.tencent.bkrepo.common.job.JobAutoConfiguration import com.tencent.bkrepo.common.service.cluster.properties.ClusterProperties import com.tencent.bkrepo.common.service.util.SpringContextUtils import com.tencent.bkrepo.common.storage.StorageAutoConfiguration +import com.tencent.bkrepo.job.batch.file.ExpireFileResolverConfig import com.tencent.bkrepo.job.config.JobConfig +import com.tencent.bkrepo.job.config.ScheduledTaskConfigurer import io.mockk.every import io.mockk.mockk import io.mockk.mockkObject @@ -45,6 +47,7 @@ import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfigurati import org.springframework.cloud.sleuth.Tracer import org.springframework.cloud.sleuth.otel.bridge.OtelTracer import org.springframework.context.annotation.ComponentScan +import org.springframework.context.annotation.FilterType import org.springframework.context.annotation.Import import org.springframework.test.context.TestPropertySource @@ -60,10 +63,18 @@ import org.springframework.test.context.TestPropertySource @TestPropertySource( locations = [ "classpath:bootstrap-ut.properties", - "classpath:job-ut.properties" - ] + "classpath:job-ut.properties", + ], +) +@ComponentScan( + basePackages = ["com.tencent.bkrepo.job"], + excludeFilters = [ + ComponentScan.Filter( + type = FilterType.ASSIGNABLE_TYPE, + value = [ScheduledTaskConfigurer::class, ExpireFileResolverConfig::class], + ), + ], ) -@ComponentScan(basePackages = ["com.tencent.bkrepo.job"]) @SpringBootConfiguration @EnableAutoConfiguration @TestInstance(TestInstance.Lifecycle.PER_CLASS) diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt new file mode 100644 index 0000000000..5842a786dc --- /dev/null +++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt @@ -0,0 +1,89 @@ +package com.tencent.bkrepo.job.batch.task.storage + +import com.tencent.bkrepo.common.artifact.api.ArtifactFile +import com.tencent.bkrepo.common.artifact.api.FileSystemArtifactFile +import com.tencent.bkrepo.common.service.util.ResponseBuilder +import com.tencent.bkrepo.common.storage.core.StorageProperties +import com.tencent.bkrepo.common.storage.core.StorageService +import com.tencent.bkrepo.common.storage.credentials.FileSystemCredentials +import com.tencent.bkrepo.common.storage.util.toPath +import com.tencent.bkrepo.job.batch.JobBaseTest +import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils +import com.tencent.bkrepo.job.migrate.MigrateRepoStorageService +import com.tencent.bkrepo.repository.api.FileReferenceClient +import com.tencent.bkrepo.repository.api.RepositoryClient +import com.tencent.bkrepo.repository.api.StorageCredentialsClient +import org.junit.jupiter.api.AfterEach +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.DisplayName +import org.junit.jupiter.api.Test +import org.mockito.Mockito.anyLong +import org.mockito.Mockito.anyString +import org.mockito.Mockito.isNull +import org.mockito.Mockito.`when` +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest +import org.springframework.boot.test.mock.mockito.MockBean +import kotlin.random.Random + +@DisplayName("存储对账Job测试") +@DataMongoTest +class StorageReconcileJobTest @Autowired constructor( + private val storageReconcileJob: StorageReconcileJob, + private val storageService: StorageService, + private val storageProperties: StorageProperties, +) : JobBaseTest() { + + @MockBean + lateinit var migrateRepoStorageService: MigrateRepoStorageService + + @MockBean + lateinit var storageCredentialsClient: StorageCredentialsClient + + @MockBean + lateinit var fileReferenceClient: FileReferenceClient + + @MockBean + lateinit var repositoryClient: RepositoryClient + + @Autowired + lateinit var nodeCommonUtils: NodeCommonUtils + private val cred = storageProperties.defaultStorageCredentials() as FileSystemCredentials + + init { + cred.path = System.getProperty("java.io.tmpdir").plus("/ut-test") + } + + @BeforeEach + fun before() { + `when`(storageCredentialsClient.list(null)).thenReturn(ResponseBuilder.success(emptyList())) + } + + @AfterEach + fun afterEach() { + cred.path.toPath().toFile().deleteRecursively() + } + + @Test + fun reconcileTest() { + var checked = 0 + `when`(fileReferenceClient.increment(anyString(), isNull(), anyLong())).then { + checked++ + ResponseBuilder.success(true) + } + repeat(10) { + val file = createTempArtifactFile() + storageService.store(file.getFileSha256(), file, null) + } + storageReconcileJob.start() + Assertions.assertEquals(10, checked) + } + + private fun createTempArtifactFile(): ArtifactFile { + val data = Random.nextBytes(Random.nextInt(1024, 1 shl 20)) + val tempFile = createTempFile() + tempFile.writeBytes(data) + return FileSystemArtifactFile(tempFile) + } +} From 4818b680dcece38f5e1ce1df76a5b605da146db4 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Wed, 24 Jul 2024 14:40:24 +0800 Subject: [PATCH 02/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bkrepo/job/batch/task/storage/StorageReconcileJob.kt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt index fd2237d550..4ecb748fb7 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt @@ -57,7 +57,7 @@ class StorageReconcileJob( var deleted = 0L fileStorage.listAll(StringPool.ROOT, storageCredentials).map { it.toFile().name }.forEach { total++ - if (!bf.mightContain(it)) { + if (it.length == SHA256_LEN && !bf.mightContain(it)) { logger.info("File [$it] miss ref.") fileReferenceClient.increment(it, storageCredentials.key, 0) deleted++ @@ -89,5 +89,6 @@ class StorageReconcileJob( companion object { private val logger = LoggerFactory.getLogger(StorageReconcileJob::class.java) + private const val SHA256_LEN = 64 } } From 7b9ba3109a6d871a61c06ab9d5524563bfafd228 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Wed, 24 Jul 2024 15:30:48 +0800 Subject: [PATCH 03/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bkrepo/common/storage/innercos/http/CosHttpClient.kt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/CosHttpClient.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/CosHttpClient.kt index f29f8df58a..8395af6696 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/CosHttpClient.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/CosHttpClient.kt @@ -100,7 +100,11 @@ object CosHttpClient { .append(response.code) .appendLine("[${response.message}]") .appendLine(response.headers) - .appendLine(response.body?.bytes()?.toString(Charset.forName("GB2312"))) + try { + builder.appendLine(response.body?.bytes()?.toString(Charset.forName("GB2312"))) + } catch (e: Exception) { + logger.warn("read body error", e) + } } val message = builder.toString() logger.warn(message) From bf87f7e38868d85ebf3e54f6e8f0586da4ad0d2f Mon Sep 17 00:00:00 2001 From: felixncheng Date: Wed, 24 Jul 2024 16:04:35 +0800 Subject: [PATCH 04/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../tencent/bkrepo/common/storage/innercos/response/Content.kt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/Content.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/Content.kt index 304600ad89..d3257a8880 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/Content.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/Content.kt @@ -1,7 +1,9 @@ package com.tencent.bkrepo.common.storage.innercos.response +import com.fasterxml.jackson.annotation.JsonIgnoreProperties import com.fasterxml.jackson.dataformat.xml.annotation.JacksonXmlProperty +@JsonIgnoreProperties(ignoreUnknown = true) data class Content( @JacksonXmlProperty(localName = "Key") var key: String = "", From fb3e7a28f47eedc09511a011febca4420d2cc359 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Wed, 24 Jul 2024 16:20:17 +0800 Subject: [PATCH 05/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/storage/innercos/response/ListObjectsResponse.kt | 2 +- .../innercos/response/handler/ListObjectsResponseHandler.kt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/ListObjectsResponse.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/ListObjectsResponse.kt index 418d772f62..b56305dcb9 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/ListObjectsResponse.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/ListObjectsResponse.kt @@ -17,7 +17,7 @@ data class ListObjectsResponse( @JacksonXmlProperty(localName = "MaxKeys") var maxKeys: Int = 0, @JacksonXmlProperty(localName = "NextMarker") - var nextMarker: String = "", + var nextMarker: String? = null, @JacksonXmlElementWrapper(useWrapping = false) @JacksonXmlProperty(localName = "Contents") var contents: List = mutableListOf(), diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjectsResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjectsResponseHandler.kt index f604d4d2d8..f4362e6937 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjectsResponseHandler.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/ListObjectsResponseHandler.kt @@ -22,7 +22,7 @@ class ListObjectsResponseHandler(val client: CosClient, val req: ListObjectsRequ private var it = response.contents.iterator() private var nextMarker = response.nextMarker override fun hasNext(): Boolean { - if (!it.hasNext() && nextMarker.isNotEmpty()) { + if (!it.hasNext() && nextMarker?.isNotEmpty() == true) { load() } return it.hasNext() From 40f85c9da014a4802c94ad6c32b488c302eb6afe Mon Sep 17 00:00:00 2001 From: felixncheng Date: Thu, 25 Jul 2024 11:01:48 +0800 Subject: [PATCH 06/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../batch/task/storage/StorageReconcileJob.kt | 35 ++++++++++++++++--- .../StorageReconcileJobProperties.kt | 1 + 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt index 4ecb748fb7..105dbd0c19 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt @@ -25,8 +25,9 @@ import org.springframework.stereotype.Component import java.nio.charset.StandardCharsets /** - * 设计目标 - * 1. 找出未引用的cos + * 实际存储与文件引用进行对账,删除存储中多余的文件。 + * 支持安全模式,在安全模式下,不区分数据库中的存储实例, + * 只要数据库存在文件引用,就不会删除实际存储。 * */ @Component @EnableConfigurationProperties(StorageReconcileJobProperties::class) @@ -50,15 +51,32 @@ class StorageReconcileJob( } } + /** + * 存储对账,只会处理sha256为文件名的文件 + * 通过二次确认,以保证不会存在误删 + * 1. 先加载引用快照 + * 2. 比对实际存储中的文件,筛选出待删除文件列表 + * 3. 再次加载引用快照,从待删除文件列表中确定需要删除的文件。 + * */ private fun reconcile(storageCredentials: StorageCredentials) { logger.info("Start reconcile storage [${storageCredentials.key}]") - val bf = buildBloomFilter(storageCredentials) + val firstRefSnapshot = buildBloomFilter(storageCredentials) var total = 0L var deleted = 0L + val pendingDeleteList = mutableSetOf() fileStorage.listAll(StringPool.ROOT, storageCredentials).map { it.toFile().name }.forEach { total++ - if (it.length == SHA256_LEN && !bf.mightContain(it)) { + if (it.length == SHA256_LEN && !firstRefSnapshot.mightContain(it)) { + // 准备删除 logger.info("File [$it] miss ref.") + pendingDeleteList.add(it) + } + } + // 二次确认 + val secondRefSnapshot = buildBloomFilter(storageCredentials) + pendingDeleteList.forEach { + if (!secondRefSnapshot.mightContain(it)) { + logger.info("Delete file [$it]") fileReferenceClient.increment(it, storageCredentials.key, 0) deleted++ } @@ -73,7 +91,14 @@ class StorageReconcileJob( bloomFilterProp.expectedNodes, bloomFilterProp.fpp, ) - val query = Query(Criteria.where(CREDENTIALS).isEqualTo(storageCredentials.key)) + val criteria = Criteria.where(CREDENTIALS).apply { + if (!properties.safeMode) { + isEqualTo(storageCredentials.key) + } else { + logger.info("Work in safe mode") + } + } + val query = Query(criteria) query.fields().include(SHA256) NodeCommonUtils.forEachRefByCollectionParallel(query) { val sha256 = it[SHA256]?.toString() diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt index 6ad09479a2..e0b458962d 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt @@ -5,4 +5,5 @@ import org.springframework.boot.context.properties.ConfigurationProperties @ConfigurationProperties("job.storage-reconcile") class StorageReconcileJobProperties( override var cron: String = "0 0 0 1 */1 ?", + var safeMode: Boolean = true, // 安全模式下,只要数据库存在引用,即保留存储。 ) : BatchJobProperties() From f173a2cbfb70999cc7aae63dc41c6871fe5e5970 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Thu, 25 Jul 2024 11:14:25 +0800 Subject: [PATCH 07/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bkrepo/job/batch/task/storage/StorageReconcileJob.kt | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt index 105dbd0c19..2098a218c4 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt @@ -19,6 +19,7 @@ import com.tencent.bkrepo.repository.api.StorageCredentialsClient import org.slf4j.LoggerFactory import org.springframework.boot.context.properties.EnableConfigurationProperties import org.springframework.data.mongodb.core.query.Criteria +import org.springframework.data.mongodb.core.query.Criteria.where import org.springframework.data.mongodb.core.query.Query import org.springframework.data.mongodb.core.query.isEqualTo import org.springframework.stereotype.Component @@ -91,9 +92,9 @@ class StorageReconcileJob( bloomFilterProp.expectedNodes, bloomFilterProp.fpp, ) - val criteria = Criteria.where(CREDENTIALS).apply { + val criteria = Criteria().apply { if (!properties.safeMode) { - isEqualTo(storageCredentials.key) + where(CREDENTIALS).isEqualTo(storageCredentials.key) } else { logger.info("Work in safe mode") } From 8712f876d4922066d9477f862e94a5bd345e279c Mon Sep 17 00:00:00 2001 From: felixncheng Date: Thu, 25 Jul 2024 11:47:23 +0800 Subject: [PATCH 08/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt index 5842a786dc..e926ca3b88 100644 --- a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt +++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt @@ -28,7 +28,7 @@ import org.springframework.boot.test.mock.mockito.MockBean import kotlin.random.Random @DisplayName("存储对账Job测试") -@DataMongoTest +@DataMongoTest(properties = ["job.file-reference-cleanup.expectedNodes=100"]) class StorageReconcileJobTest @Autowired constructor( private val storageReconcileJob: StorageReconcileJob, private val storageService: StorageService, From 122969255c9ba5c49904eb316d527178713590fc Mon Sep 17 00:00:00 2001 From: felixncheng Date: Mon, 29 Jul 2024 19:57:56 +0800 Subject: [PATCH 09/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../common/storage/innercos/client/CosClient.kt | 4 ---- .../job/batch/task/storage/StorageReconcileJob.kt | 13 +++++++------ 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt index 3ab48ca4af..0104df1aae 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/client/CosClient.kt @@ -148,10 +148,6 @@ class CosClient(val credentials: InnerCosCredentials) { null } - init { - this.listObjects(ListObjectsRequest()) - } - private val useChunkedLoad = (watchDog != null) && (downloadThreadPool != null) private val fastFallbackTimeout = config.timeout shr 1 diff --git a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt index 2098a218c4..d1e414fa6a 100644 --- a/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt +++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJob.kt @@ -60,7 +60,8 @@ class StorageReconcileJob( * 3. 再次加载引用快照,从待删除文件列表中确定需要删除的文件。 * */ private fun reconcile(storageCredentials: StorageCredentials) { - logger.info("Start reconcile storage [${storageCredentials.key}]") + val credentialsKey = storageCredentials.key + logger.info("Start reconcile storage [$credentialsKey]") val firstRefSnapshot = buildBloomFilter(storageCredentials) var total = 0L var deleted = 0L @@ -73,16 +74,16 @@ class StorageReconcileJob( pendingDeleteList.add(it) } } - // 二次确认 - val secondRefSnapshot = buildBloomFilter(storageCredentials) + // 二次确认,因为待确认的文件数量远低于总体数量,所以这里采用直接查表,效率更高些。 pendingDeleteList.forEach { - if (!secondRefSnapshot.mightContain(it)) { + val exists = fileReferenceClient.exists(it, credentialsKey).data ?: true + if (!exists) { logger.info("Delete file [$it]") - fileReferenceClient.increment(it, storageCredentials.key, 0) + fileReferenceClient.increment(it, credentialsKey, 0) deleted++ } } - logger.info("Reconcile storage [${storageCredentials.key}] successful, deleted[$deleted], total[$total].") + logger.info("Reconcile storage [$credentialsKey] successful, deleted[$deleted], total[$total].") } private fun buildBloomFilter(storageCredentials: StorageCredentials): BloomFilter { From 67db001223436dfe0f11f876f0d87653de372333 Mon Sep 17 00:00:00 2001 From: felixncheng Date: Mon, 29 Jul 2024 20:26:31 +0800 Subject: [PATCH 10/10] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=AF=B9=E8=B4=A6=E5=8A=9F=E8=83=BD=20#2091?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt index e926ca3b88..da46b102a9 100644 --- a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt +++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt @@ -72,6 +72,9 @@ class StorageReconcileJobTest @Autowired constructor( checked++ ResponseBuilder.success(true) } + `when`(fileReferenceClient.exists(anyString(), isNull())).then { + ResponseBuilder.success(false) + } repeat(10) { val file = createTempArtifactFile() storageService.store(file.getFileSha256(), file, null)