Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 26 additions & 0 deletions cpp/src/arrow/util/mutex.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,31 @@ Mutex::Guard Mutex::Lock() {

Mutex::Mutex() : impl_(new Impl, [](Impl* impl) { delete impl; }) {}

#ifndef _WIN32
namespace {

// Holder for the mutex handed out by GlobalForkSafeMutex().
//
// The single `instance` below installs a pthread_atfork() child handler from
// its constructor; that handler gives each forked child its own fresh Mutex.
struct AfterForkState {
  // The one and only instance; constructing it registers the atfork handler.
  static AfterForkState instance;

  // Stays null until the child-side handler has run.  The Mutex is never
  // deleted on purpose: it may still be needed at shutdown, and leaking it
  // (in child processes only) is an acceptable cost for that robustness.
  Mutex* mutex = nullptr;

 private:
  // Child-side atfork callback: allocate a brand new Mutex for this process.
  static void InstallFreshMutex() { instance.mutex = new Mutex; }

  // Private so that only the static `instance` can ever be created.
  AfterForkState() {
    // Only the child hook is needed; no prepare or parent handlers.
    pthread_atfork(/*prepare=*/nullptr, /*parent=*/nullptr,
                   /*child=*/&InstallFreshMutex);
  }
};

AfterForkState AfterForkState::instance;
} // namespace

// Expose the mutex owned by the atfork state.  It is null unless the
// child-side atfork handler has run in this process.
Mutex* GlobalForkSafeMutex() {
  Mutex* const fork_safe_mutex = AfterForkState::instance.mutex;
  return fork_safe_mutex;
}
#endif // _WIN32

} // namespace util
} // namespace arrow
22 changes: 22 additions & 0 deletions cpp/src/arrow/util/mutex.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,5 +60,27 @@ class ARROW_EXPORT Mutex {
std::unique_ptr<Impl, void (*)(Impl*)> impl_;
};

#ifndef _WIN32
/// Return a pointer to a process-wide, process-specific Mutex that can be used
/// at any point in a child process. NULL is returned when called in the parent,
/// so the result must only be dereferenced once the caller has established
/// that it is running in a forked child.
///
/// The rule is to first check that getpid() corresponds to the parent process pid
/// and, if not, call this function to lock any after-fork reinitialization code.
/// Like this:
///
///   std::atomic<pid_t> pid{getpid()};
///   ...
///   if (pid.load() != getpid()) {
///     // In child process
///     auto lock = GlobalForkSafeMutex()->Lock();
///     if (pid.load() != getpid()) {
///       // Reinitialize internal structures after fork
///       ...
///       pid.store(getpid());
///     }
///   }
ARROW_EXPORT
Mutex* GlobalForkSafeMutex();
#endif


} // namespace util
} // namespace arrow
41 changes: 23 additions & 18 deletions cpp/src/arrow/util/thread_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

#include "arrow/util/io_util.h"
#include "arrow/util/logging.h"
#include "arrow/util/mutex.h"

namespace arrow {
namespace internal {
Expand Down Expand Up @@ -235,24 +236,28 @@ ThreadPool::~ThreadPool() {
void ThreadPool::ProtectAgainstFork() {
#ifndef _WIN32
pid_t current_pid = getpid();
if (pid_ != current_pid) {
// Reinitialize internal state in child process after fork()
// Ideally we would use pthread_at_fork(), but that doesn't allow
// storing an argument, hence we'd need to maintain a list of all
// existing ThreadPools.
int capacity = state_->desired_capacity_;

auto new_state = std::make_shared<ThreadPool::State>();
new_state->please_shutdown_ = state_->please_shutdown_;
new_state->quick_shutdown_ = state_->quick_shutdown_;

pid_ = current_pid;
sp_state_ = new_state;
state_ = sp_state_.get();

// Launch worker threads anew
if (!state_->please_shutdown_) {
ARROW_UNUSED(SetCapacity(capacity));
if (pid_.load() != current_pid) {
// Reinitialize internal state in child process after fork().
{
// Since after-fork reinitialization is triggered when one of the ThreadPool
// methods is called, it can very well be called from multiple threads
// at once. Therefore, it needs to be guarded with a lock.
auto lock = util::GlobalForkSafeMutex()->Lock();

if (pid_.load() != current_pid) {
int capacity = state_->desired_capacity_;

auto new_state = std::make_shared<ThreadPool::State>();
new_state->please_shutdown_ = state_->please_shutdown_;
new_state->quick_shutdown_ = state_->quick_shutdown_;

sp_state_ = new_state;
state_ = sp_state_.get();
pid_ = current_pid;

// Launch worker threads anew
ARROW_UNUSED(SetCapacity(capacity));
}
}
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/util/thread_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ class ARROW_EXPORT ThreadPool : public Executor {
State* state_;
bool shutdown_on_destroy_;
#ifndef _WIN32
pid_t pid_;
std::atomic<pid_t> pid_;
#endif
};

Expand Down