8 changes: 8 additions & 0 deletions be/src/io/fs/file_writer.h
@@ -32,6 +32,14 @@ struct FileCacheAllocatorBuilder;

// Only affects remote file writers
struct FileWriterOptions {
// The S3 committer starts multipart uploads for all files on the BE side
// and then completes those multipart uploads on the FE side.
// A file whose multipart upload is never completed does not become visible,
// so the atomicity of a single file is guaranteed. It still cannot
// guarantee atomicity across multiple files.
// Because Hive committers have best-effort semantics,
// this only shortens the window of inconsistency.
bool used_by_s3_committer = false;
bool write_file_cache = false;
bool is_cold_data = false;
bool sync_file_data = true; // Whether flush data into storage system
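The flag is opt-in per writer. A minimal sketch of how a caller can request the S3-committer path when creating a remote writer; the option field and the three-argument create_file() call mirror this patch, while `fs`, `path`, and `_file_writer` are placeholders for whatever the caller already holds:

```cpp
// Illustrative only: opting into the S3-committer path for one writer.
// `fs`, `path` and `_file_writer` are assumed to come from the caller
// (e.g. FileFactory::create_fs, as in vhive_partition_writer.cpp below).
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
RETURN_IF_ERROR(fs->create_file(path, &_file_writer, &file_writer_options));
// From here on S3FileWriter always uses multipart upload (even for data
// below the usual 5 MB PutObject threshold) and never calls
// CompleteMultipartUpload itself; the FE finishes the upload later using
// the part ETags the writer collected.
```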
130 changes: 77 additions & 53 deletions be/src/io/fs/s3_file_writer.cpp
@@ -87,7 +87,8 @@ S3FileWriter::S3FileWriter(std::shared_ptr<Aws::S3::S3Client> client, std::strin
: _path(fmt::format("s3://{}/{}", bucket, key)),
_bucket(std::move(bucket)),
_key(std::move(key)),
_client(std::move(client)) {
_client(std::move(client)),
_used_by_s3_committer(opts ? opts->used_by_s3_committer : false) {
s3_file_writer_total << 1;
s3_file_being_written << 1;
Aws::Http::SetCompliantRfc3986Encoding(true);
@@ -195,10 +196,7 @@ Status S3FileWriter::close() {
Defer defer {[this] { _closed = true; }};

if (_upload_id.empty() && _pending_buf) {
// It might be one file less than 5MB, and call close without finalize
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}

if (_bytes_appended == 0) {
@@ -225,6 +223,13 @@ Status S3FileWriter::close() {
RETURN_IF_ERROR(builder.build(&_pending_buf));
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
if (_used_by_s3_committer) {
buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
});
DCHECK(_cur_part_num == 1);
RETURN_IF_ERROR(_create_multi_upload_request());
}
}

if (_pending_buf != nullptr) {
@@ -392,56 +397,61 @@ Status S3FileWriter::_complete() {
_wait_until_finish("PutObject");
return _st;
}
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);

// Wait multipart load and finish.
_wait_until_finish("Complete");
DBUG_EXECUTE_IF("s3_file_writer::_complete:1", { _cur_part_num++; });
if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status {}, complete parts {}, cur part num {}, whole parts {}", _st,
_completed_parts.size(), _cur_part_num, _dump_completed_part());
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
{ _completed_parts.back()->SetPartNumber(10 * _completed_parts.size()); });
CompletedMultipartUpload completed_upload;
for (size_t i = 0; i < _completed_parts.size(); i++) {
if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
auto st = Status::InternalError(
"error status {}, part num not continous, expected num {}, actual num {}, "
"whole parts {}",
_st, i + 1, _completed_parts[i]->GetPartNumber(), _dump_completed_part());
LOG(WARNING) << st;
_st = st;
return st;
if (!_used_by_s3_committer) { // The S3 committer completes the multipart upload on the FE side.
CompleteMultipartUploadRequest complete_request;
complete_request.WithBucket(_bucket).WithKey(_key).WithUploadId(_upload_id);

if (_failed || _completed_parts.size() != _cur_part_num) {
_st = Status::InternalError(
"error status {}, complete parts {}, cur part num {}, whole parts {}", _st,
_completed_parts.size(), _cur_part_num, _dump_completed_part());
LOG(WARNING) << _st;
return _st;
}
// make sure _completed_parts are ascending order
std::sort(_completed_parts.begin(), _completed_parts.end(),
[](auto& p1, auto& p2) { return p1->GetPartNumber() < p2->GetPartNumber(); });
DBUG_EXECUTE_IF("s3_file_writer::_complete:2",
{ _completed_parts.back()->SetPartNumber(10 * _completed_parts.size()); });
CompletedMultipartUpload completed_upload;
for (size_t i = 0; i < _completed_parts.size(); i++) {
if (_completed_parts[i]->GetPartNumber() != i + 1) [[unlikely]] {
auto st = Status::InternalError(
"error status {}, part num not continous, expected num {}, actual num {}, "
"whole parts {}",
_st, i + 1, _completed_parts[i]->GetPartNumber(), _dump_completed_part());
LOG(WARNING) << st;
_st = st;
return st;
}
completed_upload.AddParts(*_completed_parts[i]);
}
completed_upload.AddParts(*_completed_parts[i]);
}

complete_request.WithMultipartUpload(completed_upload);
complete_request.WithMultipartUpload(completed_upload);

DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
auto s = Status::IOError(
"failed to create complete multi part upload (bucket={}, key={}): injected error",
_bucket, _path.native());
LOG_WARNING(s.to_string());
return s;
});
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto complete_outcome = _client->CompleteMultipartUpload(complete_request);
DBUG_EXECUTE_IF("s3_file_writer::_complete:3", {
auto s = Status::IOError(
"failed to create complete multi part upload (bucket={}, key={}): injected "
"error",
_bucket, _path.native());
LOG_WARNING(s.to_string());
return s;
});
SCOPED_BVAR_LATENCY(s3_bvar::s3_multi_part_upload_latency);
auto complete_outcome = _client->CompleteMultipartUpload(complete_request);

if (!complete_outcome.IsSuccess()) {
_st = s3fs_error(
complete_outcome.GetError(),
fmt::format("failed to complete multi part upload {}, upload_id={}, whole parts={}",
if (!complete_outcome.IsSuccess()) {
_st = s3fs_error(
complete_outcome.GetError(),
fmt::format(
"failed to complete multi part upload {}, upload_id={}, whole parts={}",
_path.native(), _upload_id, _dump_completed_part()));
LOG(WARNING) << _st;
return _st;
LOG(WARNING) << _st;
return _st;
}
}
s3_file_created_total << 1;
return Status::OK();
@@ -457,12 +467,8 @@ Status S3FileWriter::finalize() {
// submit pending buf if it's not nullptr
// it's the last buf, we can submit it right now
if (_pending_buf != nullptr) {
// if we only need to upload one file less than 5MB, we can just
// call PutObject to reduce the network IO
if (_upload_id.empty()) {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
RETURN_IF_ERROR(_set_upload_to_remote_less_than_buffer_size());
}
_countdown_event.add_count();
RETURN_IF_ERROR(_pending_buf->submit(std::move(_pending_buf)));
Expand All @@ -472,6 +478,24 @@ Status S3FileWriter::finalize() {
return _st;
}

Status S3FileWriter::_set_upload_to_remote_less_than_buffer_size() {
auto* buf = dynamic_cast<UploadFileBuffer*>(_pending_buf.get());
DCHECK(buf != nullptr);
if (_used_by_s3_committer) {
// If used_by_s3_committer is set, we always use multipart upload.
buf->set_upload_to_remote([part_num = _cur_part_num, this](UploadFileBuffer& buf) {
_upload_one_part(part_num, buf);
});
DCHECK(_cur_part_num == 1);
RETURN_IF_ERROR(_create_multi_upload_request());
} else {
// if we only need to upload one file less than 5MB, we can just
// call PutObject to reduce the network IO
buf->set_upload_to_remote([this](UploadFileBuffer& b) { _put_object(b); });
}
return Status::OK();
}

void S3FileWriter::_put_object(UploadFileBuffer& buf) {
DCHECK(!closed());
Aws::S3::Model::PutObjectRequest request;
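With the flag set, `_complete()` above only waits for the part uploads; the `CompleteMultipartUploadRequest` is no longer sent from the BE. The FE performs that step instead, outside this patch. Purely as an illustration of what the reported bucket/key/upload_id/ETag metadata is used for, the completion step looks roughly like this with the AWS C++ SDK (a sketch, not the FE implementation):

```cpp
#include <aws/s3/S3Client.h>
#include <aws/s3/model/CompleteMultipartUploadRequest.h>
#include <aws/s3/model/CompletedMultipartUpload.h>
#include <aws/s3/model/CompletedPart.h>

#include <map>

// Sketch of the commit step for one pending upload; `etags` maps
// part number -> ETag, which is what TS3MPUPendingUpload carries.
bool complete_pending_upload(Aws::S3::S3Client& client, const Aws::String& bucket,
                             const Aws::String& key, const Aws::String& upload_id,
                             const std::map<int, Aws::String>& etags) {
    Aws::S3::Model::CompletedMultipartUpload completed_upload;
    for (const auto& [part_num, etag] : etags) { // std::map keeps part numbers ordered
        Aws::S3::Model::CompletedPart part;
        part.SetPartNumber(part_num);
        part.SetETag(etag);
        completed_upload.AddParts(std::move(part));
    }
    Aws::S3::Model::CompleteMultipartUploadRequest request;
    request.WithBucket(bucket).WithKey(key).WithUploadId(upload_id).WithMultipartUpload(
            completed_upload);
    auto outcome = client.CompleteMultipartUpload(request);
    return outcome.IsSuccess(); // on failure the committer would abort the upload instead
}
```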
18 changes: 18 additions & 0 deletions be/src/io/fs/s3_file_writer.h
@@ -62,12 +62,21 @@ class S3FileWriter final : public FileWriter {
return _cache_builder == nullptr ? nullptr : _cache_builder.get();
}

const std::vector<std::unique_ptr<Aws::S3::Model::CompletedPart>>& completed_parts() const {
return _completed_parts;
}

const std::string& key() const { return _key; }
const std::string& bucket() const { return _bucket; }
const std::string& upload_id() const { return _upload_id; }

private:
Status _abort();
[[nodiscard]] std::string _dump_completed_part() const;
void _wait_until_finish(std::string_view task_name);
Status _complete();
Status _create_multi_upload_request();
Status _set_upload_to_remote_less_than_buffer_size();
void _put_object(UploadFileBuffer& buf);
void _upload_one_part(int64_t part_num, UploadFileBuffer& buf);

@@ -95,6 +104,15 @@ class S3FileWriter final : public FileWriter {
std::shared_ptr<FileBuffer> _pending_buf;
std::unique_ptr<FileCacheAllocatorBuilder>
_cache_builder; // nullptr if disable write file cache

// The S3 committer starts multipart uploads for all files on the BE side
// and then completes those multipart uploads on the FE side.
// A file whose multipart upload is never completed does not become visible,
// so the atomicity of a single file is guaranteed. It still cannot
// guarantee atomicity across multiple files.
// Because Hive committers have best-effort semantics,
// this only shortens the window of inconsistency.
bool _used_by_s3_committer;
};

} // namespace io
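The new accessors (`completed_parts()`, `key()`, `bucket()`, `upload_id()`) exist so a sink can extract everything the FE needs to later complete, or abort, the upload. A hedged sketch of that extraction follows; the `PendingUpload` struct is illustrative only, and the real code below fills the Thrift struct `TS3MPUPendingUpload` in `_build_partition_update()`:

```cpp
#include <aws/s3/model/CompletedPart.h>

#include <map>
#include <string>

#include "io/fs/s3_file_writer.h"

// Illustrative container only; the production code populates TS3MPUPendingUpload.
struct PendingUpload {
    std::string bucket;
    std::string key;
    std::string upload_id;
    std::map<int, std::string> etags; // part number -> ETag
};

PendingUpload collect_pending_upload(const doris::io::S3FileWriter& writer) {
    PendingUpload pending;
    pending.bucket = writer.bucket();
    pending.key = writer.key();
    pending.upload_id = writer.upload_id();
    for (const auto& completed_part : writer.completed_parts()) {
        pending.etags.emplace(completed_part->GetPartNumber(), completed_part->GetETag());
    }
    return pending;
}
```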
24 changes: 22 additions & 2 deletions be/src/vec/sink/writer/vhive_partition_writer.cpp
@@ -17,7 +17,10 @@

#include "vhive_partition_writer.h"

#include <aws/s3/model/CompletedPart.h>

#include "io/file_factory.h"
#include "io/fs/s3_file_writer.h"
#include "runtime/runtime_state.h"
#include "vec/columns/column_map.h"
#include "vec/core/materialize_block.h"
@@ -55,7 +58,8 @@ Status VHivePartitionWriter::open(RuntimeState* state, RuntimeProfile* profile)
io::FileDescription file_description = {
.path = fmt::format("{}/{}", _write_info.write_path, _get_target_file_name())};
_fs = DORIS_TRY(FileFactory::create_fs(fs_properties, file_description));
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer));
io::FileWriterOptions file_writer_options = {.used_by_s3_committer = true};
RETURN_IF_ERROR(_fs->create_file(file_description.path, &_file_writer, &file_writer_options));

std::vector<std::string> column_names;
column_names.reserve(_columns.size());
@@ -189,12 +193,28 @@ THivePartitionUpdate VHivePartitionWriter::_build_partition_update() {
hive_partition_update.__set_name(_partition_name);
hive_partition_update.__set_update_mode(_update_mode);
THiveLocationParams location;
location.__set_write_path(_write_info.write_path);
location.__set_write_path(_write_info.original_write_path);
location.__set_target_path(_write_info.target_path);
hive_partition_update.__set_location(location);
hive_partition_update.__set_file_names({_get_target_file_name()});
hive_partition_update.__set_row_count(_row_count);
hive_partition_update.__set_file_size(_input_size_in_bytes);

if (_write_info.file_type == TFileType::FILE_S3) {
doris::io::S3FileWriter* s3_mpu_file_writer =
dynamic_cast<doris::io::S3FileWriter*>(_file_writer.get());
TS3MPUPendingUpload s3_mpu_pending_upload;
s3_mpu_pending_upload.__set_bucket(s3_mpu_file_writer->bucket());
s3_mpu_pending_upload.__set_key(s3_mpu_file_writer->key());
s3_mpu_pending_upload.__set_upload_id(s3_mpu_file_writer->upload_id());

std::map<int, std::string> etags;
for (auto& completed_part : s3_mpu_file_writer->completed_parts()) {
etags.insert({completed_part->GetPartNumber(), completed_part->GetETag()});
}
s3_mpu_pending_upload.__set_etags(etags);
hive_partition_update.__set_s3_mpu_pending_uploads({s3_mpu_pending_upload});
}
return hive_partition_update;
}

1 change: 1 addition & 0 deletions be/src/vec/sink/writer/vhive_partition_writer.h
@@ -43,6 +43,7 @@ class VHivePartitionWriter {
public:
struct WriteInfo {
std::string write_path;
std::string original_write_path;
std::string target_path;
TFileType::type file_type;
};
31 changes: 20 additions & 11 deletions be/src/vec/sink/writer/vhive_table_writer.cpp
@@ -256,26 +256,30 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
if (existing_table == false) { // new table
update_mode = TUpdateMode::NEW;
if (_partition_columns_input_index.empty()) { // new unpartitioned table
write_info = {write_location.write_path, write_location.target_path,
write_location.file_type};
write_info = {write_location.write_path, write_location.original_write_path,
write_location.target_path, write_location.file_type};
} else { // a new partition in a new partitioned table
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path),
write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
}
} else { // a new partition in an existing partitioned table, or an existing unpartitioned table
if (_partition_columns_input_index.empty()) { // an existing unpartitioned table
update_mode =
!hive_table_sink.overwrite ? TUpdateMode::APPEND : TUpdateMode::OVERWRITE;
write_info = {write_location.write_path, write_location.target_path,
write_location.file_type};
write_info = {write_location.write_path, write_location.original_write_path,
write_location.target_path, write_location.file_type};
} else { // a new partition in an existing partitioned table
update_mode = TUpdateMode::NEW;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path),
write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
}
// need to get schema from existing table ?
}
@@ -285,16 +289,21 @@ std::shared_ptr<VHivePartitionWriter> VHiveTableWriter::_create_partition_writer
if (!hive_table_sink.overwrite) {
update_mode = TUpdateMode::APPEND;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}", existing_partition->location.target_path);
write_info = {std::move(write_path), std::move(target_path),
existing_partition->location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), existing_partition->location.file_type};
file_format_type = existing_partition->file_format;
write_compress_type = hive_table_sink.compression_type;
} else {
update_mode = TUpdateMode::OVERWRITE;
auto write_path = fmt::format("{}/{}", write_location.write_path, partition_name);
auto original_write_path =
fmt::format("{}/{}", write_location.original_write_path, partition_name);
auto target_path = fmt::format("{}/{}", write_location.target_path, partition_name);
write_info = {std::move(write_path), std::move(target_path), write_location.file_type};
write_info = {std::move(write_path), std::move(original_write_path),
std::move(target_path), write_location.file_type};
file_format_type = hive_table_sink.file_format;
write_compress_type = hive_table_sink.compression_type;
// need to get schema from existing table ?