From 327c1eff4e2ed1b9a3f1a9223049df4719d5be03 Mon Sep 17 00:00:00 2001 From: Arthur Passos Date: Thu, 30 Apr 2026 09:28:08 -0300 Subject: [PATCH] remove lock inside the task export partition --- src/Core/Settings.cpp | 4 - src/Core/SettingsChangesHistory.cpp | 1 + ...portReplicatedMergeTreePartitionManifest.h | 4 - .../ExportPartFromPartitionExportTask.cpp | 75 --------------- .../ExportPartFromPartitionExportTask.h | 36 ------- .../ExportPartitionTaskScheduler.cpp | 96 +++++-------------- src/Storages/MergeTree/MergeTreeData.h | 1 - src/Storages/StorageReplicatedMergeTree.cpp | 2 - src/Storages/StorageReplicatedMergeTree.h | 1 - 9 files changed, 26 insertions(+), 194 deletions(-) delete mode 100644 src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp delete mode 100644 src/Storages/MergeTree/ExportPartFromPartitionExportTask.h diff --git a/src/Core/Settings.cpp b/src/Core/Settings.cpp index cc4328b8aaa9..92b225bc76e9 100644 --- a/src/Core/Settings.cpp +++ b/src/Core/Settings.cpp @@ -7542,10 +7542,6 @@ Throw an error if there are pending mutations when exporting a merge tree part. )", 0) \ DECLARE(Bool, export_merge_tree_part_throw_on_pending_patch_parts, true, R"( Throw an error if there are pending patch parts when exporting a merge tree part. -)", 0) \ - DECLARE(Bool, export_merge_tree_partition_lock_inside_the_task, false, R"( -Only lock a part when the task is already running. This might help with busy waiting where the scheduler locks a part, but the task ends in the pending list. -On the other hand, there is a chance once the task executes that part has already been locked by another replica and the task will simply early exit. )", 0) \ DECLARE(Bool, export_merge_tree_partition_system_table_prefer_remote_information, false, R"( Controls whether the system.replicated_partition_exports will prefer to query ZooKeeper to get the most up to date information or use the local information. diff --git a/src/Core/SettingsChangesHistory.cpp b/src/Core/SettingsChangesHistory.cpp index 245677b8a86d..10c33532fe1f 100644 --- a/src/Core/SettingsChangesHistory.cpp +++ b/src/Core/SettingsChangesHistory.cpp @@ -121,6 +121,7 @@ const VersionToSettingsChangesMap & getSettingsChangesHistory() // {"iceberg_metadata_staleness_ms", 0, 0, "New setting allowing using cached metadata version at READ operations to prevent fetching from remote catalog"}, {"export_merge_tree_partition_task_timeout_seconds", 0, 3600, "New setting to control the timeout for export partition tasks."}, {"export_merge_tree_partition_manifest_ttl", 180, 86400, "Reasonable default for real usage"}, + {"export_merge_tree_partition_lock_inside_the_task", false, false, "Obsolete. No-op."}, }); addSettingsChanges(settings_changes_history, "26.1", { diff --git a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h index f1c96b120a28..dd5ef9886ded 100644 --- a/src/Storages/ExportReplicatedMergeTreePartitionManifest.h +++ b/src/Storages/ExportReplicatedMergeTreePartitionManifest.h @@ -119,7 +119,6 @@ struct ExportReplicatedMergeTreePartitionManifest size_t max_rows_per_file; MergeTreePartExportManifest::FileAlreadyExistsPolicy file_already_exists_policy; String filename_pattern; - bool lock_inside_the_task; /// todo temporary bool write_full_path_in_iceberg_metadata = false; String iceberg_metadata_json; @@ -154,7 +153,6 @@ struct ExportReplicatedMergeTreePartitionManifest json.set("max_retries", max_retries); json.set("ttl_seconds", ttl_seconds); json.set("task_timeout_seconds", task_timeout_seconds); - json.set("lock_inside_the_task", lock_inside_the_task); json.set("write_full_path_in_iceberg_metadata", write_full_path_in_iceberg_metadata); std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM oss.exceptions(std::ios::failbit); @@ -208,8 +206,6 @@ struct ExportReplicatedMergeTreePartitionManifest /// what to do if it's not a valid value? } - manifest.lock_inside_the_task = json->getValue("lock_inside_the_task"); - manifest.write_full_path_in_iceberg_metadata = json->getValue("write_full_path_in_iceberg_metadata"); return manifest; diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp deleted file mode 100644 index 2a3343e73f13..000000000000 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.cpp +++ /dev/null @@ -1,75 +0,0 @@ -#include -#include -#include - -namespace ProfileEvents -{ - extern const Event ExportPartitionZooKeeperRequests; - extern const Event ExportPartitionZooKeeperGetChildren; - extern const Event ExportPartitionZooKeeperCreate; -} -namespace DB -{ - -ExportPartFromPartitionExportTask::ExportPartFromPartitionExportTask( - StorageReplicatedMergeTree & storage_, - const std::string & key_, - const MergeTreePartExportManifest & manifest_) - : storage(storage_), - key(key_), - manifest(manifest_) -{ - export_part_task = std::make_shared(storage, manifest); -} - -bool ExportPartFromPartitionExportTask::executeStep() -{ - /// Runs on a MergeTreeBackgroundExecutor thread, so it does not inherit any component set by the scheduling task. - auto component_guard = Coordination::setCurrentComponent("ExportPartFromPartitionExportTask::executeStep"); - - const auto zk = storage.getZooKeeper(); - const auto part_name = manifest.data_part->name; - - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Attempting to lock part: {}", part_name); - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); - if (Coordination::Error::ZOK == zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) - { - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Locked part: {}", part_name); - export_part_task->executeStep(); - return false; - } - - std::lock_guard inner_lock(storage.export_manifests_mutex); - storage.export_manifests.erase(manifest); - - LOG_INFO(storage.log, "ExportPartFromPartitionExportTask: Failed to lock part {}, skipping", part_name); - return false; -} - -void ExportPartFromPartitionExportTask::cancel() noexcept -{ - export_part_task->cancel(); -} - -void ExportPartFromPartitionExportTask::onCompleted() -{ - export_part_task->onCompleted(); -} - -StorageID ExportPartFromPartitionExportTask::getStorageID() const -{ - return export_part_task->getStorageID(); -} - -Priority ExportPartFromPartitionExportTask::getPriority() const -{ - return export_part_task->getPriority(); -} - -String ExportPartFromPartitionExportTask::getQueryId() const -{ - return export_part_task->getQueryId(); -} -} diff --git a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h b/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h deleted file mode 100644 index e170b22b470d..000000000000 --- a/src/Storages/MergeTree/ExportPartFromPartitionExportTask.h +++ /dev/null @@ -1,36 +0,0 @@ -#pragma once - -#include -#include -#include -#include - -namespace DB -{ - -/* - Decorator around the ExportPartTask to lock the part inside the task -*/ -class ExportPartFromPartitionExportTask : public IExecutableTask -{ -public: - explicit ExportPartFromPartitionExportTask( - StorageReplicatedMergeTree & storage_, - const std::string & key_, - const MergeTreePartExportManifest & manifest_); - bool executeStep() override; - void onCompleted() override; - StorageID getStorageID() const override; - Priority getPriority() const override; - String getQueryId() const override; - - void cancel() noexcept override; - -private: - StorageReplicatedMergeTree & storage; - std::string key; - MergeTreePartExportManifest manifest; - std::shared_ptr export_part_task; -}; - -} diff --git a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp index a77e2894b4a1..5ee5c42e3412 100644 --- a/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp +++ b/src/Storages/MergeTree/ExportPartitionTaskScheduler.cpp @@ -8,7 +8,6 @@ #include #include "Storages/MergeTree/ExportPartitionUtils.h" #include "Storages/MergeTree/MergeTreePartExportManifest.h" -#include "Storages/MergeTree/ExportPartFromPartitionExportTask.h" #include "Formats/FormatFactory.h" #include @@ -177,90 +176,45 @@ void ExportPartitionTaskScheduler::run() auto context = ExportPartitionUtils::getContextCopyWithTaskSettings(storage.getContext(), manifest); - /// todo arthur this code path does not perform all the validations a simple part export does because we are not calling exportPartToTable directly. - /// the schema and everything else has been validated when the export partition task was created, but nothing prevents the destination table from being - /// recreated with a new schema before the export task is scheduled. - if (manifest.lock_inside_the_task) + try { - LOG_INFO(storage.log, "ExportPartition scheduler task: Locking part export inside the task"); - std::lock_guard part_export_lock(storage.export_manifests_mutex); + LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table"); - MergeTreePartExportManifest part_export_manifest( - destination_storage, - part, + LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name); + + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); + if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) + { + LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); + continue; + } + + LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", zk_part_name); + + storage.exportPartToTable( + part->name, + destination_storage_id, manifest.transaction_id, - manifest.query_id, - context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value, - context->getSettingsCopy(), - storage.getInMemoryMetadataPtr(), + context, manifest.iceberg_metadata_json, + /*allow_outdated_parts*/ true, [this, key, zk_part_name, manifest, destination_storage] (MergeTreePartExportManifest::CompletionCallbackResult result) { handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); }); - part_export_manifest.task = std::make_shared(storage, key, part_export_manifest); - - /// todo arthur this might conflict with the standalone export part. what to do in this case? - if (!storage.export_manifests.emplace(part_export_manifest).second) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Part {} is already being exported, skipping", zk_part_name); - continue; - } - - if (!storage.background_moves_assignee.scheduleMoveTask(part_export_manifest.task)) - { - storage.export_manifests.erase(part_export_manifest); - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to schedule export part task, skipping"); - return; - } - scheduled_exports_count++; } - else + catch (const Exception &) { - try - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Exporting part to table"); - - LOG_INFO(storage.log, "ExportPartition scheduler task: Attempting to lock part: {}", zk_part_name); - - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperCreate); - if (Coordination::Error::ZOK != zk->tryCreate(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name, storage.replica_name, zkutil::CreateMode::Ephemeral)) - { - LOG_INFO(storage.log, "ExportPartition scheduler task: Failed to lock part {}, skipping", zk_part_name); - continue; - } - - LOG_INFO(storage.log, "ExportPartition scheduler task: Locked part: {}", zk_part_name); - - storage.exportPartToTable( - part->name, - destination_storage_id, - manifest.transaction_id, - context, - manifest.iceberg_metadata_json, - /*allow_outdated_parts*/ true, - [this, key, zk_part_name, manifest, destination_storage] - (MergeTreePartExportManifest::CompletionCallbackResult result) - { - handlePartExportCompletion(key, zk_part_name, manifest, destination_storage, result); - }); - - scheduled_exports_count++; - } - catch (const Exception &) - { - tryLogCurrentException(__PRETTY_FUNCTION__); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); - ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove); - zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); - /// we should not increment retry_count because the node might just be full - } + tryLogCurrentException(__PRETTY_FUNCTION__); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRequests); + ProfileEvents::increment(ProfileEvents::ExportPartitionZooKeeperRemove); + zk->tryRemove(fs::path(storage.zookeeper_path) / "exports" / key / "locks" / zk_part_name); + /// we should not increment retry_count because the node might just be full } - } } } diff --git a/src/Storages/MergeTree/MergeTreeData.h b/src/Storages/MergeTree/MergeTreeData.h index 4b923d1f26cc..fe776043c5ca 100644 --- a/src/Storages/MergeTree/MergeTreeData.h +++ b/src/Storages/MergeTree/MergeTreeData.h @@ -1433,7 +1433,6 @@ class MergeTreeData : public WithMutableContext, public IStorage, public IBackgr friend class IMergedBlockOutputStream; // for access to log friend struct DataPartsLock; // for access to shared_parts_list/shared_ranges_in_parts friend class ExportPartTask; - friend class ExportPartFromPartitionExportTask; bool require_part_metadata; diff --git a/src/Storages/StorageReplicatedMergeTree.cpp b/src/Storages/StorageReplicatedMergeTree.cpp index ee4e7811378a..2bb880994999 100644 --- a/src/Storages/StorageReplicatedMergeTree.cpp +++ b/src/Storages/StorageReplicatedMergeTree.cpp @@ -223,7 +223,6 @@ namespace Setting extern const SettingsUInt64 export_merge_tree_part_max_rows_per_file; extern const SettingsBool export_merge_tree_part_throw_on_pending_mutations; extern const SettingsBool export_merge_tree_part_throw_on_pending_patch_parts; - extern const SettingsBool export_merge_tree_partition_lock_inside_the_task; extern const SettingsString export_merge_tree_part_filename_pattern; extern const SettingsBool write_full_path_in_iceberg_metadata; extern const SettingsBool allow_insert_into_iceberg; @@ -8463,7 +8462,6 @@ void StorageReplicatedMergeTree::exportPartitionToTable(const PartitionCommand & manifest.parquet_parallel_encoding = query_context->getSettingsRef()[Setting::output_format_parquet_parallel_encoding]; manifest.max_bytes_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_bytes_per_file]; manifest.max_rows_per_file = query_context->getSettingsRef()[Setting::export_merge_tree_part_max_rows_per_file]; - manifest.lock_inside_the_task = query_context->getSettingsRef()[Setting::export_merge_tree_partition_lock_inside_the_task]; manifest.file_already_exists_policy = query_context->getSettingsRef()[Setting::export_merge_tree_part_file_already_exists_policy].value; manifest.filename_pattern = query_context->getSettingsRef()[Setting::export_merge_tree_part_filename_pattern].value; diff --git a/src/Storages/StorageReplicatedMergeTree.h b/src/Storages/StorageReplicatedMergeTree.h index 094cfb8c5fd1..10381ffd899b 100644 --- a/src/Storages/StorageReplicatedMergeTree.h +++ b/src/Storages/StorageReplicatedMergeTree.h @@ -405,7 +405,6 @@ class StorageReplicatedMergeTree final : public MergeTreeData friend class ReplicatedMergeMutateTaskBase; friend class ExportPartitionManifestUpdatingTask; friend class ExportPartitionTaskScheduler; - friend class ExportPartFromPartitionExportTask; using MergeStrategyPicker = ReplicatedMergeTreeMergeStrategyPicker; using LogEntry = ReplicatedMergeTreeLogEntry;