diff --git a/cpp/src/arrow/util/mutex.cc b/cpp/src/arrow/util/mutex.cc index 7456d7889d8e..f0b51a73e965 100644 --- a/cpp/src/arrow/util/mutex.cc +++ b/cpp/src/arrow/util/mutex.cc @@ -50,5 +50,31 @@ Mutex::Guard Mutex::Lock() { Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {} +#ifndef _WIN32 +namespace { + +struct AfterForkState { + // A global instance that will also register the atfork handler when + // constructed. + static AfterForkState instance; + + // The mutex may be used at shutdown, so make it eternal. + // The leak (only in child processes) is a small price to pay for robustness. + Mutex* mutex = nullptr; + + private: + AfterForkState() { + pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr, /*child=*/&AfterFork); + } + + static void AfterFork() { instance.mutex = new Mutex; } +}; + +AfterForkState AfterForkState::instance; +} // namespace + +Mutex* GlobalForkSafeMutex() { return AfterForkState::instance.mutex; } +#endif // _WIN32 + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/mutex.h b/cpp/src/arrow/util/mutex.h index f4fc64181fb1..168d710bd523 100644 --- a/cpp/src/arrow/util/mutex.h +++ b/cpp/src/arrow/util/mutex.h @@ -60,5 +60,27 @@ class ARROW_EXPORT Mutex { std::unique_ptr impl_; }; +#ifndef _WIN32 +/// Return a pointer to a process-wide, process-specific Mutex that can be used +/// at any point in a child process. NULL is returned when called in the parent. +/// +/// The rule is to first check that getpid() corresponds to the parent process pid +/// and, if not, call this function to lock any after-fork reinitialization code. +/// Like this: +/// +/// std::atomic pid{getpid()}; +/// ... +/// if (pid.load() != getpid()) { +/// // In child process +/// auto lock = GlobalForkSafeMutex()->Lock(); +/// if (pid.load() != getpid()) { +/// // Reinitialize internal structures after fork +/// ... +/// pid.store(getpid()); +ARROW_EXPORT +Mutex* GlobalForkSafeMutex(); +#endif + + } // namespace util } // namespace arrow diff --git a/cpp/src/arrow/util/thread_pool.cc b/cpp/src/arrow/util/thread_pool.cc index 37132fe1a9ce..a1387947e3a2 100644 --- a/cpp/src/arrow/util/thread_pool.cc +++ b/cpp/src/arrow/util/thread_pool.cc @@ -28,6 +28,7 @@ #include "arrow/util/io_util.h" #include "arrow/util/logging.h" +#include "arrow/util/mutex.h" namespace arrow { namespace internal { @@ -235,24 +236,28 @@ ThreadPool::~ThreadPool() { void ThreadPool::ProtectAgainstFork() { #ifndef _WIN32 pid_t current_pid = getpid(); - if (pid_ != current_pid) { - // Reinitialize internal state in child process after fork() - // Ideally we would use pthread_at_fork(), but that doesn't allow - // storing an argument, hence we'd need to maintain a list of all - // existing ThreadPools. - int capacity = state_->desired_capacity_; - - auto new_state = std::make_shared(); - new_state->please_shutdown_ = state_->please_shutdown_; - new_state->quick_shutdown_ = state_->quick_shutdown_; - - pid_ = current_pid; - sp_state_ = new_state; - state_ = sp_state_.get(); - - // Launch worker threads anew - if (!state_->please_shutdown_) { - ARROW_UNUSED(SetCapacity(capacity)); + if (pid_.load() != current_pid) { + // Reinitialize internal state in child process after fork(). + { + // Since after-fork reinitialization is triggered when one of the ThreadPool + // methods is called, it can be very well be called from multiple threads + // at once. Therefore, it needs to be guarded with a lock. + auto lock = util::GlobalForkSafeMutex()->Lock(); + + if (pid_.load() != current_pid) { + int capacity = state_->desired_capacity_; + + auto new_state = std::make_shared(); + new_state->please_shutdown_ = state_->please_shutdown_; + new_state->quick_shutdown_ = state_->quick_shutdown_; + + sp_state_ = new_state; + state_ = sp_state_.get(); + pid_ = current_pid; + + // Launch worker threads anew + ARROW_UNUSED(SetCapacity(capacity)); + } } } #endif diff --git a/cpp/src/arrow/util/thread_pool.h b/cpp/src/arrow/util/thread_pool.h index 4ed908d6f297..e3aec1b2edb6 100644 --- a/cpp/src/arrow/util/thread_pool.h +++ b/cpp/src/arrow/util/thread_pool.h @@ -373,7 +373,7 @@ class ARROW_EXPORT ThreadPool : public Executor { State* state_; bool shutdown_on_destroy_; #ifndef _WIN32 - pid_t pid_; + std::atomic pid_; #endif };