Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bolt/connectors/hive/HiveDataSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "bolt/connectors/hive/HiveConfig.h"
#include "bolt/connectors/hive/HivePartitionFunction.h"
#include "bolt/connectors/hive/TableHandle.h"
#include "bolt/connectors/hive/storage_adapters/hdfs/HdfsUtil.h"
#include "bolt/core/ITypedExpr.h"
#include "bolt/dwio/common/SortingWriter.h"
#include "bolt/exec/OperatorUtils.h"
Expand Down Expand Up @@ -410,6 +411,8 @@ HiveDataSink::HiveDataSink(
fileOptions_.values[key] = value.value();
}
}
filesystems::setHdfsOpenFileOptionsFromConfig(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

θΏ™ι‡Œε―δ»₯η›΄ζŽ₯把 key εŠ θΏ› HiveConfig::hms_session_key

connectorQueryCtx_->sessionProperties(), fileOptions_);

if (!isBucketed()) {
return;
Expand Down
3 changes: 3 additions & 0 deletions bolt/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "bolt/connectors/hive/PaimonMiscHelpers.h"
#include "bolt/connectors/hive/PaimonSplitReader.h"
#include "bolt/connectors/hive/SplitReader.h"
#include "bolt/connectors/hive/storage_adapters/hdfs/HdfsUtil.h"
#include "bolt/dwio/common/ReaderFactory.h"
#include "bolt/dwio/common/exception/Exception.h"
#include "bolt/expression/FieldReference.h"
Expand Down Expand Up @@ -75,6 +76,8 @@ HiveDataSource::HiveDataSource(
fsSessionConfig_.values[key] = value.value();
}
}
filesystems::setHdfsOpenFileOptionsFromConfig(
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same as above.

connectorQueryCtx_->sessionProperties(), fsSessionConfig_);
fsSessionConfig_.bufferSize = static_cast<size_t>(hiveConfig_->loadQuantum());
native_cache_enabled = queryConfig.isNativeCacheEnabled();
IgnoreCorruptFileHelper::globalInitialize(
Expand Down
17 changes: 13 additions & 4 deletions bolt/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "bolt/connectors/hive/storage_adapters/hdfs/HdfsFileSystem.h"
#include "bolt/common/config/Config.h"
#include "bolt/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h"
#include "bolt/connectors/hive/storage_adapters/hdfs/HdfsUtil.h"
#include "bolt/connectors/hive/storage_adapters/hdfs/HdfsWriteFile.h"
#include "bolt/external/hdfs/ArrowHdfsInternal.h"
namespace bytedance::bolt::filesystems {
Expand Down Expand Up @@ -123,23 +124,31 @@ std::string HdfsFileSystem::name() const {

std::unique_ptr<ReadFile> HdfsFileSystem::openFileForRead(
std::string_view path,
const FileOptions& /*unused*/) {
const FileOptions& options) {
// Only remove the scheme for hdfs path.
if (path.find(kScheme) == 0) {
path.remove_prefix(kScheme.length());
if (auto index = path.find('/')) {
path.remove_prefix(index);
}
}
const auto bufferSize =
parseHdfsOpenFileIntOption(options, HdfsOpenFileOptions::kBufferSize);
return std::make_unique<HdfsReadFile>(
impl_->hdfsShim(), impl_->hdfsClient(), path);
impl_->hdfsShim(), impl_->hdfsClient(), path, bufferSize);
}

std::unique_ptr<WriteFile> HdfsFileSystem::openFileForWrite(
std::string_view path,
const FileOptions& /*unused*/) {
const FileOptions& options) {
const auto hdfsOptions = getHdfsOpenFileOptions(options);
return std::make_unique<HdfsWriteFile>(
impl_->hdfsShim(), impl_->hdfsClient(), path);
impl_->hdfsShim(),
impl_->hdfsClient(),
path,
hdfsOptions.bufferSize,
hdfsOptions.replication,
hdfsOptions.blockSize);
}

void HdfsFileSystem::close() {
Expand Down
21 changes: 14 additions & 7 deletions bolt/connectors/hive/storage_adapters/hdfs/HdfsReadFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ struct HdfsFile {
void open(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS client,
const std::string& path) {
const std::string& path,
int bufferSize) {
driver_ = driver;
client_ = client;
handle_ = driver->OpenFile(client, path.data(), O_RDONLY, 0, 0, 0);
handle_ = driver->OpenFile(client, path.data(), O_RDONLY, bufferSize, 0, 0);
BOLT_CHECK_NOT_NULL(
handle_,
"Unable to open file {}. got error: {}",
Expand Down Expand Up @@ -78,8 +79,12 @@ class HdfsReadFile::Impl {
Impl(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfs,
const std::string_view path)
: driver_(driver), hdfsClient_(hdfs), filePath_(path) {
const std::string_view path,
int bufferSize)
: driver_(driver),
hdfsClient_(hdfs),
filePath_(path),
bufferSize_(bufferSize) {
fileInfo_ = driver_->GetPathInfo(hdfsClient_, filePath_.data());
if (fileInfo_ == nullptr) {
auto error = fmt::format(
Expand All @@ -105,7 +110,7 @@ class HdfsReadFile::Impl {
void preadInternal(uint64_t offset, uint64_t length, char* pos) const {
checkFileReadParameters(offset, length);
if (!file_->handle_) {
file_->open(driver_, hdfsClient_, filePath_);
file_->open(driver_, hdfsClient_, filePath_, bufferSize_);
}
file_->seek(offset);
uint64_t totalBytesRead = 0;
Expand Down Expand Up @@ -160,15 +165,17 @@ class HdfsReadFile::Impl {
filesystems::arrow::io::internal::LibHdfsShim* driver_;
hdfsFS hdfsClient_;
std::string filePath_;
int bufferSize_;
hdfsFileInfo* fileInfo_;
folly::ThreadLocal<HdfsFile> file_;
};

HdfsReadFile::HdfsReadFile(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfs,
const std::string_view path)
: pImpl(std::make_unique<Impl>(driver, hdfs, path)) {}
const std::string_view path,
int bufferSize)
: pImpl(std::make_unique<Impl>(driver, hdfs, path, bufferSize)) {}

HdfsReadFile::~HdfsReadFile() = default;

Expand Down
3 changes: 2 additions & 1 deletion bolt/connectors/hive/storage_adapters/hdfs/HdfsReadFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ class HdfsReadFile final : public ReadFile {
explicit HdfsReadFile(
filesystems::arrow::io::internal::LibHdfsShim* driver,
hdfsFS hdfs,
std::string_view path);
std::string_view path,
int bufferSize = 0);
~HdfsReadFile() override;

std::string_view pread(uint64_t offset, uint64_t length, void* buf)
Expand Down
85 changes: 85 additions & 0 deletions bolt/connectors/hive/storage_adapters/hdfs/HdfsUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,28 @@

#pragma once

#include <cstdint>
#include <limits>
#include <string>
#include <string_view>

#include <folly/Conv.h>

#include "bolt/common/config/Config.h"
#include "bolt/common/file/FileSystems.h"

namespace bytedance::bolt::filesystems {

// Parsed HDFS open-file tuning options extracted from a generic FileOptions
// string map. A value of 0 means "unset"; callers forward these values to
// libhdfs, which then applies its own defaults.
struct HdfsOpenFileOptions {
  // Config/session-property keys under which the raw string values live.
  static constexpr const char* kBufferSize = "bolt.io.file.buffer.size";
  static constexpr const char* kReplication = "bolt.dfs.replication";
  static constexpr const char* kBlockSize = "bolt.dfs.blocksize";

  int bufferSize{0}; // I/O buffer size in bytes (passed to hdfs open calls).
  short replication{0}; // Replication factor for newly written files.
  int blockSize{0}; // HDFS block size in bytes for newly written files.
};

inline std::string getHdfsPath(
const std::string& filePath,
const std::string_view& kScheme) {
Expand All @@ -46,4 +65,70 @@ inline std::string getHdfsPath(
return std::string(filePath.substr(endOfAuthority));
}

// Reads the string stored under `key` in `options` and converts it to a
// non-negative int.
//
// Returns 0 when the key is absent (callers treat 0 as "use the default").
// Throws via the BOLT_CHECK macros when the value is not an integer, is
// negative, or does not fit in an int.
inline int parseHdfsOpenFileIntOption(
    const FileOptions& options,
    const char* key) {
  const auto entry = options.values.find(key);
  if (entry == options.values.end()) {
    // Option not provided; leave it unset.
    return 0;
  }

  const std::string& raw = entry->second;
  const auto parsed = folly::tryTo<int64_t>(raw);
  BOLT_CHECK(
      parsed.hasValue(),
      "Invalid HDFS open file option '{}': '{}'. Expected an integer.",
      key,
      raw);
  const int64_t parsedValue = parsed.value();
  BOLT_CHECK_GE(
      parsedValue,
      0,
      "Invalid HDFS open file option '{}': '{}'. Expected a non-negative integer.",
      key,
      raw);
  BOLT_CHECK_LE(
      parsedValue,
      std::numeric_limits<int>::max(),
      "Invalid HDFS open file option '{}': '{}'. Exceeds int range.",
      key,
      raw);
  return static_cast<int>(parsedValue);
}

// Materializes an HdfsOpenFileOptions struct from the generic FileOptions
// string map. Missing keys yield 0 ("unset"). The replication value gets an
// extra range check before narrowing from int to short.
inline HdfsOpenFileOptions getHdfsOpenFileOptions(const FileOptions& options) {
  HdfsOpenFileOptions result;
  result.bufferSize =
      parseHdfsOpenFileIntOption(options, HdfsOpenFileOptions::kBufferSize);
  result.blockSize =
      parseHdfsOpenFileIntOption(options, HdfsOpenFileOptions::kBlockSize);

  const int replicationValue =
      parseHdfsOpenFileIntOption(options, HdfsOpenFileOptions::kReplication);
  BOLT_CHECK_LE(
      replicationValue,
      std::numeric_limits<short>::max(),
      "Invalid HDFS open file option '{}': '{}'. Exceeds short range.",
      HdfsOpenFileOptions::kReplication,
      replicationValue);
  result.replication = static_cast<short>(replicationValue);
  return result;
}

// Copies the HDFS open-file keys (buffer size, replication, block size) from
// `config` into `options.values`, leaving keys that are not present in the
// config untouched. A null config is a no-op.
inline void setHdfsOpenFileOptionsFromConfig(
    const config::ConfigBase* config,
    FileOptions& options) {
  if (config == nullptr) {
    return;
  }

  static constexpr const char* kHdfsKeys[] = {
      HdfsOpenFileOptions::kBufferSize,
      HdfsOpenFileOptions::kReplication,
      HdfsOpenFileOptions::kBlockSize};
  for (const char* key : kHdfsKeys) {
    // Only override when the config actually defines the key.
    if (auto value = config->get<std::string>(key); value.hasValue()) {
      options.values[key] = value.value();
    }
  }
}

} // namespace bytedance::bolt::filesystems
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,11 @@ hdfsWriteFileSinkGenerator() {
getHdfsPath(fileURI, HdfsFileSystem::kScheme);
auto fileSystem =
filesystems::getFileSystem(fileURI, options.connectorProperties);
const auto fileOptions = options.fileOptions == nullptr
? FileOptions{}
: *options.fileOptions;
return std::make_unique<dwio::common::WriteFileSink>(
fileSystem->openFileForWrite(pathSuffix),
fileSystem->openFileForWrite(pathSuffix, fileOptions),
fileURI,
options.metricLogger,
options.stats);
Expand Down
62 changes: 62 additions & 0 deletions bolt/connectors/hive/storage_adapters/hdfs/tests/HdfsUtilTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@

#include "bolt/connectors/hive/storage_adapters/hdfs/HdfsUtil.h"

#include "bolt/common/base/Exceptions.h"
#include "bolt/common/file/FileSystems.h"

#include "gtest/gtest.h"
using namespace bytedance::bolt::filesystems;

Expand All @@ -46,3 +49,62 @@ TEST(HdfsUtilTest, getHdfsPath) {
std::string path3 = getHdfsPath("hdfs:///user/hive/a.txt", kScheme);
EXPECT_EQ("/user/hive/a.txt", path3);
}

// Pins the public config-key strings so an accidental rename of the
// constants is caught by the test suite.
TEST(HdfsUtilTest, hdfsOpenFileOptionKeys) {
  EXPECT_STREQ(HdfsOpenFileOptions::kBufferSize, "bolt.io.file.buffer.size");
  EXPECT_STREQ(HdfsOpenFileOptions::kReplication, "bolt.dfs.replication");
  EXPECT_STREQ(HdfsOpenFileOptions::kBlockSize, "bolt.dfs.blocksize");
}

// Verifies getHdfsOpenFileOptions: absent keys default to 0, and present
// keys are parsed into the matching struct fields.
TEST(HdfsUtilTest, getHdfsOpenFileOptions) {
  // No keys set: every field stays at its "unset" default of 0.
  FileOptions defaultOptions;
  const auto defaultHdfsOptions = getHdfsOpenFileOptions(defaultOptions);
  EXPECT_EQ(defaultHdfsOptions.bufferSize, 0);
  EXPECT_EQ(defaultHdfsOptions.replication, 0);
  EXPECT_EQ(defaultHdfsOptions.blockSize, 0);

  // All three keys set as strings, as they arrive from session properties.
  FileOptions options;
  options.values[HdfsOpenFileOptions::kBufferSize] = "4096";
  options.values[HdfsOpenFileOptions::kReplication] = "2";
  options.values[HdfsOpenFileOptions::kBlockSize] = "134217728";

  const auto hdfsOptions = getHdfsOpenFileOptions(options);
  EXPECT_EQ(hdfsOptions.bufferSize, 4096);
  EXPECT_EQ(hdfsOptions.replication, 2);
  EXPECT_EQ(hdfsOptions.blockSize, 134217728);
}

// Verifies that invalid option values (negative, out of short range, or
// non-numeric) are rejected with a BoltException rather than silently used.
TEST(HdfsUtilTest, getHdfsOpenFileOptionsRejectsInvalidValues) {
  // Negative replication fails the non-negative check.
  FileOptions negativeReplication;
  negativeReplication.values[HdfsOpenFileOptions::kReplication] = "-1";
  EXPECT_THROW(
      getHdfsOpenFileOptions(negativeReplication),
      bytedance::bolt::BoltException);

  // 32768 == SHRT_MAX + 1: exceeds the short-typed replication field.
  FileOptions largeReplication;
  largeReplication.values[HdfsOpenFileOptions::kReplication] = "32768";
  EXPECT_THROW(
      getHdfsOpenFileOptions(largeReplication), bytedance::bolt::BoltException);

  // Non-numeric strings fail integer parsing.
  FileOptions invalidBufferSize;
  invalidBufferSize.values[HdfsOpenFileOptions::kBufferSize] = "not-a-number";
  EXPECT_THROW(
      getHdfsOpenFileOptions(invalidBufferSize),
      bytedance::bolt::BoltException);
}

// Verifies that setHdfsOpenFileOptionsFromConfig copies each HDFS key's raw
// string value from a ConfigBase into the FileOptions value map.
TEST(HdfsUtilTest, setHdfsOpenFileOptionsFromConfig) {
  auto config = std::make_shared<const bytedance::bolt::config::ConfigBase>(
      std::unordered_map<std::string, std::string>{
          {HdfsOpenFileOptions::kBufferSize, "4096"},
          {HdfsOpenFileOptions::kReplication, "2"},
          {HdfsOpenFileOptions::kBlockSize, "134217728"}});

  FileOptions fileOptions;
  setHdfsOpenFileOptionsFromConfig(config.get(), fileOptions);

  // Values are copied verbatim as strings; parsing happens later in
  // getHdfsOpenFileOptions.
  EXPECT_EQ(fileOptions.values.at(HdfsOpenFileOptions::kBufferSize), "4096");
  EXPECT_EQ(fileOptions.values.at(HdfsOpenFileOptions::kReplication), "2");
  EXPECT_EQ(
      fileOptions.values.at(HdfsOpenFileOptions::kBlockSize), "134217728");
}