/**
 * Lists the files found under [path] in the storage identified by [storageCredentials].
 *
 * Resolves the client for the given credentials and delegates to the client-level overload.
 *
 * @param path directory path to list
 * @param storageCredentials credentials selecting the storage instance
 * @return stream of file paths produced by the concrete storage implementation
 */
override fun listAll(path: String, storageCredentials: StorageCredentials): Stream<Path> =
    listAll(path, getClient(storageCredentials))

/**
 * Client-level listing hook.
 *
 * The base implementation always throws; storage types that can enumerate their
 * content override it.
 *
 * @throws UnsupportedOperationException when the storage type does not override this hook
 */
open fun listAll(path: String, client: Client): Stream<Path> {
    throw UnsupportedOperationException("ListAll operation unsupported")
}
/**
 * Lists all files under the given directory.
 *
 * @param path directory path
 * @param storageCredentials credentials of the storage instance to list
 * @return stream of file paths; NOTE(review): some implementations back the stream
 *   with open resources, so callers should close it when done
 */
fun listAll(path: String, storageCredentials: StorageCredentials): Stream<Path>
/**
 * Lists the regular files found beneath [path], dropping directories and any
 * other non-regular entries reported by the file-tree walk.
 *
 * @param path directory to walk, resolved by the client
 * @param client file-system client performing the walk
 * @return lazily evaluated stream of regular-file paths
 */
override fun listAll(path: String, client: FileSystemClient): Stream<Path> {
    val entries = client.walk(path)
    return entries.filter { entry -> Files.isRegularFile(entry) }
}
/**
 * Lists object keys in the bucket and exposes each key as a [Path].
 *
 * When [path] is the root marker no prefix is sent, so the whole bucket is listed;
 * otherwise [path] is used verbatim as the key prefix.
 *
 * @param path root marker or key prefix
 * @param client COS client used to issue the listing requests
 * @return stream of object keys converted with [Paths.get]
 */
override fun listAll(path: String, client: CosClient): Stream<Path> {
    val keyPrefix = path.takeUnless { it == StringPool.ROOT }
    val keys = client.listObjects(ListObjectsRequest(prefix = keyPrefix))
    return keys.map { key -> Paths.get(key) }
}
com.tencent.bkrepo.common.storage.innercos.request.FileCleanupChunkedFutu import com.tencent.bkrepo.common.storage.innercos.request.GetObjectRequest import com.tencent.bkrepo.common.storage.innercos.request.HeadObjectRequest import com.tencent.bkrepo.common.storage.innercos.request.InitiateMultipartUploadRequest +import com.tencent.bkrepo.common.storage.innercos.request.ListObjectsRequest import com.tencent.bkrepo.common.storage.innercos.request.MigrateObjectRequest import com.tencent.bkrepo.common.storage.innercos.request.PartETag import com.tencent.bkrepo.common.storage.innercos.request.PutObjectRequest @@ -71,6 +72,7 @@ import com.tencent.bkrepo.common.storage.innercos.request.UploadPartRequest import com.tencent.bkrepo.common.storage.innercos.request.UploadPartRequestFactory import com.tencent.bkrepo.common.storage.innercos.response.CopyObjectResponse import com.tencent.bkrepo.common.storage.innercos.response.CosObject +import com.tencent.bkrepo.common.storage.innercos.response.ListObjectsResponse import com.tencent.bkrepo.common.storage.innercos.response.PutObjectResponse import com.tencent.bkrepo.common.storage.innercos.response.handler.CheckArchiveObjectExistResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.CheckObjectRestoreResponseHandler @@ -79,6 +81,8 @@ import com.tencent.bkrepo.common.storage.innercos.response.handler.CopyObjectRes import com.tencent.bkrepo.common.storage.innercos.response.handler.GetObjectResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.HeadObjectResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.InitiateMultipartUploadResponseHandler +import com.tencent.bkrepo.common.storage.innercos.response.handler.ListObjects0ResponseHandler +import com.tencent.bkrepo.common.storage.innercos.response.handler.ListObjectsResponseHandler import com.tencent.bkrepo.common.storage.innercos.response.handler.PutObjectResponseHandler import 
/**
 * Lists object keys matching [cosRequest] as a stream.
 *
 * The first page is fetched by this call; the response handler receives this client
 * and the request so it can fetch further pages through [listObjects0].
 *
 * @param cosRequest prefix / marker / page-size of the listing
 * @return stream of object keys
 */
fun listObjects(cosRequest: ListObjectsRequest): Stream<String> {
    val handler = ListObjectsResponseHandler(this, cosRequest)
    return CosHttpClient.execute(buildHttpRequest(cosRequest), handler)
}

/**
 * Fetches exactly one page of a listing and returns the parsed response.
 *
 * @param cosRequest prefix / marker / page-size of the page to fetch
 * @return the parsed listing response for that single page
 */
fun listObjects0(cosRequest: ListObjectsRequest): ListObjectsResponse =
    CosHttpClient.execute(buildHttpRequest(cosRequest), ListObjects0ResponseHandler())
multipartMigrate( key: String, uploadId: String, partNumber: Int, - downloadRequest: Request + downloadRequest: Request, ): Callable { return Callable { retry(RETRY_COUNT) { @@ -335,8 +348,8 @@ class CosClient(val credentials: InnerCosCredentials) { uploadId = uploadId, partNumber = partNumber, partSize = it.header(HttpHeaders.CONTENT_LENGTH)!!.toLong(), - inputStream = it.body!!.byteStream() - ) + inputStream = it.body!!.byteStream(), + ), ) PartETag(partNumber, CosHttpClient.execute(putObjectRequest, UploadPartResponseHandler()).eTag) } diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/CosHttpClient.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/CosHttpClient.kt index f29f8df58a..8395af6696 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/CosHttpClient.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/CosHttpClient.kt @@ -100,7 +100,11 @@ object CosHttpClient { .append(response.code) .appendLine("[${response.message}]") .appendLine(response.headers) - .appendLine(response.body?.bytes()?.toString(Charset.forName("GB2312"))) + try { + builder.appendLine(response.body?.bytes()?.toString(Charset.forName("GB2312"))) + } catch (e: Exception) { + logger.warn("read body error", e) + } } val message = builder.toString() logger.warn(message) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/HttpResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/http/HttpResponseHandler.kt index 795740e75e..84c11fbcb5 100644 --- 
companion object {
    // Public because the public inline function below references it;
    // a public inline function cannot access a private member.
    val xmlMapper: XmlMapper = XmlMapper()

    /**
     * Parses the XML response body into an untyped map.
     */
    fun readXmlToMap(response: Response): Map<*, *> {
        return readXmlValue(response)
    }

    /**
     * Parses the XML response body into an instance of [T].
     *
     * Reading the body consumes it. A missing body is passed to the mapper as null,
     * so the failure mode in that case is whatever the mapper raises for null content.
     */
    inline fun <reified T> readXmlValue(response: Response): T {
        return xmlMapper.readValue(response.body?.string(), T::class.java)
    }
}
/**
 * GET request against the bucket root that lists objects.
 *
 * Only the query parameters that were actually supplied are sent; `max-keys` is always sent.
 *
 * @property prefix optional key prefix filter
 * @property marker optional key after which the listing starts
 * @property maxKeys page size; must not exceed 1000
 */
data class ListObjectsRequest(
    val prefix: String? = null,
    val marker: String? = null,
    val maxKeys: Int = 1000,
) : CosRequest(HttpMethod.GET, ROOT) {

    init {
        check(maxKeys <= 1000)
        if (prefix != null) {
            parameters["prefix"] = prefix
        }
        if (marker != null) {
            parameters["marker"] = marker
        }
        parameters["max-keys"] = "$maxKeys"
    }

    /** A listing request carries no body. */
    override fun buildRequestBody(): RequestBody? {
        return null
    }
}
/**
 * Parsed `ListBucketResult` XML document returned by an object listing.
 *
 * Unknown elements are ignored, matching [Content], so that elements not modelled
 * here do not make deserialization fail.
 *
 * NOTE(review): `sTruncated` looks like a typo for `isTruncated`; the name is kept
 * for source compatibility and the XML mapping is driven by `localName`.
 */
@com.fasterxml.jackson.annotation.JsonIgnoreProperties(ignoreUnknown = true)
@JacksonXmlRootElement(localName = "ListBucketResult")
data class ListObjectsResponse(
    @JacksonXmlProperty(localName = "Name")
    var name: String = "",
    @JacksonXmlProperty(localName = "Prefix")
    var prefix: String = "",
    @JacksonXmlProperty(localName = "Marker")
    var marker: String = "",
    @JacksonXmlProperty(localName = "IsTruncated")
    var sTruncated: Boolean = false,
    @JacksonXmlProperty(localName = "MaxKeys")
    var maxKeys: Int = 0,
    @JacksonXmlProperty(localName = "NextMarker")
    var nextMarker: String? = null,
    // Repeated <Contents> elements sit directly under the root, hence no wrapper element.
    @JacksonXmlElementWrapper(useWrapping = false)
    @JacksonXmlProperty(localName = "Contents")
    var contents: List<Content> = mutableListOf(),
)
a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CopyObjectResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CopyObjectResponseHandler.kt index abdf3fd73d..4e03e6d91a 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CopyObjectResponseHandler.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/CopyObjectResponseHandler.kt @@ -39,7 +39,7 @@ import okhttp3.Response class CopyObjectResponseHandler : HttpResponseHandler() { override fun handle(response: Response): CopyObjectResponse { - val result = readXmlValue(response) + val result = readXmlToMap(response) val eTag = (result[ETAG].toString()).trim('"') val lastModified = result[RESPONSE_LAST_MODIFIED].toString() return CopyObjectResponse(eTag, lastModified) diff --git a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/InitiateMultipartUploadResponseHandler.kt b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/InitiateMultipartUploadResponseHandler.kt index 0dd4916102..544103a918 100644 --- a/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/InitiateMultipartUploadResponseHandler.kt +++ b/src/backend/common/common-storage/storage-service/src/main/kotlin/com/tencent/bkrepo/common/storage/innercos/response/handler/InitiateMultipartUploadResponseHandler.kt @@ -37,6 +37,6 @@ import okhttp3.Response class InitiateMultipartUploadResponseHandler : HttpResponseHandler() { override fun handle(response: Response): String { - return 
/**
 * Response handler that parses a single listing page into a [ListObjectsResponse].
 */
class ListObjects0ResponseHandler : HttpResponseHandler<ListObjectsResponse>() {
    override fun handle(response: Response): ListObjectsResponse = readXmlValue(response)
}
/**
 * Response handler that turns an object listing into a stream of object keys.
 *
 * The first page comes from the handled response; later pages are fetched on demand
 * through [CosClient.listObjects0] while the stream is consumed.
 *
 * @property client client used to fetch follow-up pages
 * @property req original request; its prefix and page size are reused for every page
 */
class ListObjectsResponseHandler(val client: CosClient, val req: ListObjectsRequest) :
    HttpResponseHandler<Stream<String>>() {
    override fun handle(response: Response): Stream<String> {
        val listRes = readXmlValue<ListObjectsResponse>(response)
        return Itr(listRes, client, req).asSequence().asStream()
    }

    /**
     * Iterator over the keys of the current page that loads the next page once the
     * current one is exhausted and a continuation marker is available.
     */
    private class Itr(
        private var response: ListObjectsResponse,
        private val client: CosClient,
        private val req: ListObjectsRequest,
    ) : Iterator<String> {

        private var pageIterator = response.contents.iterator()
        private var nextMarker = response.nextMarker

        override fun hasNext(): Boolean {
            // Keep loading while pages come back empty but a continuation marker exists;
            // loading only once would end the listing early on an empty page.
            while (!pageIterator.hasNext() && !nextMarker.isNullOrEmpty()) {
                load()
            }
            return pageIterator.hasNext()
        }

        override fun next(): String {
            // Route through hasNext() so that paging also happens for callers
            // that invoke next() directly at a page boundary.
            if (!hasNext()) {
                throw NoSuchElementException()
            }
            return pageIterator.next().key
        }

        private fun load() {
            val requestedMarker = nextMarker
            val listObjectsRequest = ListObjectsRequest(
                prefix = req.prefix,
                marker = requestedMarker,
                maxKeys = req.maxKeys,
            )
            response = client.listObjects0(listObjectsRequest)
            val size = response.contents.size
            if (logger.isDebugEnabled && size > 0) {
                logger.debug("load $size objects.")
            }
            pageIterator = response.contents.iterator()
            // A marker identical to the one just requested would make no progress;
            // treat it as the end of the listing instead of looping forever.
            nextMarker = response.nextMarker?.takeIf { it != requestedMarker }
        }
    }

    companion object {
        private val logger = LoggerFactory.getLogger(ListObjectsResponseHandler::class.java)
    }
}
/**
 * Reconciles the physical storage against the file references and removes files
 * that exist in storage but are no longer referenced.
 *
 * Supports a safe mode: in safe mode the storage instance recorded in the database
 * is not taken into account, so a file is kept as long as any reference to it exists.
 */
@Component
@EnableConfigurationProperties(StorageReconcileJobProperties::class)
@Suppress("UnstableApiUsage")
class StorageReconcileJob(
    private val properties: StorageReconcileJobProperties,
    private val bloomFilterProp: FileReferenceCleanupJobProperties,
    private val fileStorage: FileStorage,
    private val clusterProperties: ClusterProperties,
    private val storageCredentialsClient: StorageCredentialsClient,
    private val storageProperties: StorageProperties,
    private val fileReferenceClient: FileReferenceClient,
) : DefaultContextJob(properties) {
    override fun doStart0(jobContext: JobContext) {
        // Reconcile the default storage first.
        reconcile(storageProperties.defaultStorageCredentials())

        // Then every other storage registered for this region.
        storageCredentialsClient.list(clusterProperties.region).data?.forEach { reconcile(it) }
    }

    /**
     * Reconciles one storage instance. Only files whose name has the length of a
     * sha256 hex digest are considered.
     *
     * A two-step confirmation guards against deleting live files:
     * 1. load a snapshot of the references into a bloom filter;
     * 2. compare it with the files in storage to collect deletion candidates;
     * 3. check every candidate against the reference service again before acting.
     */
    private fun reconcile(storageCredentials: StorageCredentials) {
        val credentialsKey = storageCredentials.key
        logger.info("Start reconcile storage [$credentialsKey]")
        val firstRefSnapshot = buildBloomFilter(storageCredentials)
        var total = 0L
        var deleted = 0L
        val pendingDeleteList = mutableSetOf<String>()
        // The listing may be backed by an open directory walk, so close it when done.
        fileStorage.listAll(StringPool.ROOT, storageCredentials).use { files ->
            files.map { it.toFile().name }.forEach { fileName ->
                total++
                if (fileName.length == SHA256_LEN && !firstRefSnapshot.mightContain(fileName)) {
                    // Candidate for deletion.
                    logger.info("File [$fileName] miss ref.")
                    pendingDeleteList.add(fileName)
                }
            }
        }
        // Second confirmation: candidates are far fewer than the total, so a direct
        // lookup per candidate is cheaper than building another snapshot.
        pendingDeleteList.forEach { sha256 ->
            // An unknown answer is treated as "still referenced".
            val exists = fileReferenceClient.exists(sha256, credentialsKey).data ?: true
            if (!exists) {
                logger.info("Delete file [$sha256]")
                // NOTE(review): presumably registers a zero-count reference so the
                // reference cleanup job removes the file — confirm against that job.
                fileReferenceClient.increment(sha256, credentialsKey, 0)
                deleted++
            }
        }
        logger.info("Reconcile storage [$credentialsKey] successful, deleted[$deleted], total[$total].")
    }

    /**
     * Builds a bloom filter holding the sha256 of every file reference.
     *
     * Outside safe mode only references belonging to [storageCredentials] are loaded;
     * in safe mode all references are loaded regardless of storage instance.
     */
    private fun buildBloomFilter(storageCredentials: StorageCredentials): BloomFilter<CharSequence> {
        logger.info("Start build bloom filter.")
        val bf = BloomFilter.create(
            Funnels.stringFunnel(StandardCharsets.UTF_8),
            bloomFilterProp.expectedNodes,
            bloomFilterProp.fpp,
        )
        // The static where(...) must become the query criteria itself: calling it inside
        // Criteria().apply { } only builds a separate Criteria that is thrown away.
        val criteria = if (properties.safeMode) {
            logger.info("Work in safe mode")
            Criteria()
        } else {
            where(CREDENTIALS).isEqualTo(storageCredentials.key)
        }
        val query = Query(criteria)
        query.fields().include(SHA256)
        NodeCommonUtils.forEachRefByCollectionParallel(query) {
            val sha256 = it[SHA256]?.toString()
            if (sha256 != null) {
                bf.put(sha256)
            }
        }
        val count = "${bf.approximateElementCount()}/${bloomFilterProp.expectedNodes}"
        val fpp = bf.expectedFpp()
        logger.info("Build bloom filter successful,key: ${storageCredentials.key},count: $count,fpp: $fpp")
        return bf
    }

    companion object {
        private val logger = LoggerFactory.getLogger(StorageReconcileJob::class.java)
        private const val SHA256_LEN = 64
    }
}
b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt
new file mode 100644
index 0000000000..e0b458962d
--- /dev/null
+++ b/src/backend/job/biz-job/src/main/kotlin/com/tencent/bkrepo/job/config/properties/StorageReconcileJobProperties.kt
@@ -0,0 +1,9 @@
+package com.tencent.bkrepo.job.config.properties
+
+import org.springframework.boot.context.properties.ConfigurationProperties
+
+@ConfigurationProperties("job.storage-reconcile") // Settings of the storage reconcile job
+class StorageReconcileJobProperties(
+    override var cron: String = "0 0 0 1 */1 ?", // Schedule; read as a Spring cron: 00:00:00 on day 1 of every month
+    var safeMode: Boolean = true, // Safe mode: a stored file is kept as long as the database holds any reference to it.
+) : BatchJobProperties()
diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/JobBaseTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/JobBaseTest.kt
index c355f98eb9..7c308ca6c9 100644
--- a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/JobBaseTest.kt
+++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/JobBaseTest.kt
@@ -31,7 +31,9 @@ import com.tencent.bkrepo.common.job.JobAutoConfiguration
 import com.tencent.bkrepo.common.service.cluster.properties.ClusterProperties
 import com.tencent.bkrepo.common.service.util.SpringContextUtils
 import com.tencent.bkrepo.common.storage.StorageAutoConfiguration
+import com.tencent.bkrepo.job.batch.file.ExpireFileResolverConfig
 import com.tencent.bkrepo.job.config.JobConfig
+import com.tencent.bkrepo.job.config.ScheduledTaskConfigurer
 import io.mockk.every
 import io.mockk.mockk
 import io.mockk.mockkObject
@@ -45,6 +47,7 @@ import org.springframework.boot.autoconfigure.task.TaskSchedulingAutoConfigurati
 import org.springframework.cloud.sleuth.Tracer
 import org.springframework.cloud.sleuth.otel.bridge.OtelTracer
 import org.springframework.context.annotation.ComponentScan
+import org.springframework.context.annotation.FilterType
 import org.springframework.context.annotation.Import
 import
org.springframework.test.context.TestPropertySource
 
@@ -60,10 +63,18 @@ import org.springframework.test.context.TestPropertySource
 @TestPropertySource(
     locations = [
         "classpath:bootstrap-ut.properties",
-        "classpath:job-ut.properties"
-    ]
+        "classpath:job-ut.properties",
+    ],
+)
+@ComponentScan(
+    basePackages = ["com.tencent.bkrepo.job"],
+    excludeFilters = [
+        ComponentScan.Filter(
+            type = FilterType.ASSIGNABLE_TYPE,
+            value = [ScheduledTaskConfigurer::class, ExpireFileResolverConfig::class],
+        ),
+    ],
 )
-@ComponentScan(basePackages = ["com.tencent.bkrepo.job"])
 @SpringBootConfiguration
 @EnableAutoConfiguration
 @TestInstance(TestInstance.Lifecycle.PER_CLASS)
diff --git a/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt
new file mode 100644
index 0000000000..da46b102a9
--- /dev/null
+++ b/src/backend/job/biz-job/src/test/kotlin/com/tencent/bkrepo/job/batch/task/storage/StorageReconcileJobTest.kt
@@ -0,0 +1,104 @@
+package com.tencent.bkrepo.job.batch.task.storage
+
+import com.tencent.bkrepo.common.artifact.api.ArtifactFile
+import com.tencent.bkrepo.common.artifact.api.FileSystemArtifactFile
+import com.tencent.bkrepo.common.service.util.ResponseBuilder
+import com.tencent.bkrepo.common.storage.core.StorageProperties
+import com.tencent.bkrepo.common.storage.core.StorageService
+import com.tencent.bkrepo.common.storage.credentials.FileSystemCredentials
+import com.tencent.bkrepo.common.storage.util.toPath
+import com.tencent.bkrepo.job.batch.JobBaseTest
+import com.tencent.bkrepo.job.batch.utils.NodeCommonUtils
+import com.tencent.bkrepo.job.migrate.MigrateRepoStorageService
+import com.tencent.bkrepo.repository.api.FileReferenceClient
+import com.tencent.bkrepo.repository.api.RepositoryClient
+import com.tencent.bkrepo.repository.api.StorageCredentialsClient
+import org.junit.jupiter.api.AfterEach
+import org.junit.jupiter.api.Assertions
+import org.junit.jupiter.api.BeforeEach
+import org.junit.jupiter.api.DisplayName
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.anyLong
+import org.mockito.Mockito.anyString
+import org.mockito.Mockito.isNull
+import org.mockito.Mockito.`when`
+import org.springframework.beans.factory.annotation.Autowired
+import org.springframework.boot.test.autoconfigure.data.mongo.DataMongoTest
+import org.springframework.boot.test.mock.mockito.MockBean
+import java.io.File
+import kotlin.random.Random
+
+/**
+ * Runs [StorageReconcileJob] against a file-system storage rooted in a temporary directory,
+ * with the repository service clients replaced by Mockito mocks.
+ */
+@DisplayName("存储对账Job测试")
+@DataMongoTest(properties = ["job.file-reference-cleanup.expectedNodes=100"])
+class StorageReconcileJobTest @Autowired constructor(
+    private val storageReconcileJob: StorageReconcileJob,
+    private val storageService: StorageService,
+    private val storageProperties: StorageProperties,
+) : JobBaseTest() {
+
+    @MockBean
+    lateinit var migrateRepoStorageService: MigrateRepoStorageService
+
+    @MockBean
+    lateinit var storageCredentialsClient: StorageCredentialsClient
+
+    @MockBean
+    lateinit var fileReferenceClient: FileReferenceClient
+
+    @MockBean
+    lateinit var repositoryClient: RepositoryClient
+
+    @Autowired
+    lateinit var nodeCommonUtils: NodeCommonUtils
+    private val cred = storageProperties.defaultStorageCredentials() as FileSystemCredentials
+
+    init {
+        // Point the default storage at a dedicated directory below the system temp dir
+        cred.path = System.getProperty("java.io.tmpdir").plus("/ut-test")
+    }
+
+    @BeforeEach
+    fun before() {
+        // No additional storages are reported, so only the default storage gets reconciled
+        `when`(storageCredentialsClient.list(null)).thenReturn(ResponseBuilder.success(emptyList()))
+    }
+
+    @AfterEach
+    fun afterEach() {
+        cred.path.toPath().toFile().deleteRecursively()
+    }
+
+    /** Ten stored files without any reference must each be handed to `increment` exactly once. */
+    @Test
+    fun reconcileTest() {
+        var checked = 0
+        `when`(fileReferenceClient.increment(anyString(), isNull(), anyLong())).then {
+            checked++
+            ResponseBuilder.success(true)
+        }
+        // Every lookup reports "no reference", so all stored files pass the second confirmation
+        `when`(fileReferenceClient.exists(anyString(), isNull())).then {
+            ResponseBuilder.success(false)
+        }
+        repeat(10) {
+            val file = createTempArtifactFile()
+            storageService.store(file.getFileSha256(), file, null)
+        }
+        storageReconcileJob.start()
+        Assertions.assertEquals(10, checked)
+    }
+
+    /** Creates a temp file holding 1 KiB up to (not including) 1 MiB of random bytes as an [ArtifactFile]. */
+    private fun createTempArtifactFile(): ArtifactFile {
+        val data = Random.nextBytes(Random.nextInt(1024, 1 shl 20))
+        // File.createTempFile is used instead of the deprecated kotlin.io.createTempFile();
+        // deleteOnExit keeps the system temp dir from accumulating test files.
+        val tempFile = File.createTempFile("tmp", null).apply { deleteOnExit() }
+        tempFile.writeBytes(data)
+        return FileSystemArtifactFile(tempFile)
+    }
+}