Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 27 additions & 8 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,11 +107,13 @@ class Executor
}
}

void spin_node_once(rclcpp::node::Node::SharedPtr & node, bool nonblocking = false)
template<typename T = std::milli>
void spin_node_once(rclcpp::node::Node::SharedPtr & node,
std::chrono::duration<int64_t, T> timeout = std::chrono::duration<int64_t, T>(-1))
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We might want some typedefs for these durations. And may be a default value for -1.

{
this->add_node(node, false);
// non-blocking = true
auto any_exec = get_next_executable(nonblocking);
auto any_exec = get_next_executable(timeout);
if (any_exec) {
execute_any_executable(any_exec);
}
Expand All @@ -122,7 +124,9 @@ class Executor
{
this->add_node(node, false);
// non-blocking = true
while (AnyExecutable::SharedPtr any_exec = get_next_executable(true)) {
while (AnyExecutable::SharedPtr any_exec =
get_next_executable(std::chrono::milliseconds::zero()))
{
execute_any_executable(any_exec);
}
this->remove_node(node, false);
Expand Down Expand Up @@ -240,8 +244,9 @@ class Executor

/*** Populate class storage from stored weak node pointers and wait. ***/

template<typename T = std::milli>
void
wait_for_work(bool nonblocking)
wait_for_work(std::chrono::duration<int64_t, T> timeout = std::chrono::duration<int64_t, T>(-1))
{
// Collect the subscriptions and timers to be waited on
bool has_invalid_weak_nodes = false;
Expand Down Expand Up @@ -371,14 +376,26 @@ class Executor
guard_cond_handle_index += 1;
}

rmw_time_t * wait_timeout = NULL;
rmw_time_t rmw_timeout;

if (timeout >= std::chrono::duration<int64_t, T>::zero()) {
// Convert timeout representation to rmw_time
auto timeout_sec = std::chrono::duration_cast<std::chrono::seconds>(timeout);
rmw_timeout.sec = timeout_sec.count();
rmw_timeout.nsec = std::chrono::duration_cast<std::chrono::nanoseconds>(timeout).count() %
(1000 * 1000 * 1000);
wait_timeout = &rmw_timeout;
}

// Now wait on the waitable subscriptions and timers
rmw_ret_t status = rmw_wait(
&subscriber_handles,
&guard_condition_handles,
&service_handles,
&client_handles,
nonblocking);
if (status != RMW_RET_OK) {
wait_timeout);
if (status != RMW_RET_OK && status != RMW_RET_TIMEOUT) {
throw std::runtime_error(rmw_get_error_string_safe());
}
// If ctrl-c guard condition, return directly
Expand Down Expand Up @@ -824,16 +841,18 @@ class Executor
return any_exec;
}

template<typename T = std::milli>
AnyExecutable::SharedPtr
get_next_executable(bool nonblocking = false)
get_next_executable(std::chrono::duration<int64_t, T> timeout = std::chrono::duration<int64_t,
T>(-1))
{
// Check to see if there are any subscriptions or timers needing service
// TODO(wjwwood): improve run to run efficiency of this function
auto any_exec = get_next_ready_executable();
// If there are none
if (!any_exec) {
// Wait for subscriptions or timers to work on
wait_for_work(nonblocking);
wait_for_work(timeout);
// Try again
any_exec = get_next_ready_executable();
}
Expand Down