Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 26 additions & 4 deletions lib/ClientConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,16 @@ using namespace ASIO::ip;

namespace pulsar {

namespace {
static std::ostream& operator<<(std::ostream& os, const tcp::resolver::results_type& results) {
for (const auto& entry : results) {
const auto& ep = entry.endpoint();
os << ep.address().to_string() << ":" << ep.port() << " ";
}
return os;
}
} // anonymous namespace

using proto::BaseCommand;

static const uint32_t DefaultBufferSize = 64 * 1024;
Expand Down Expand Up @@ -486,7 +496,7 @@ void ClientConnection::handleTcpConnected(const ASIO_ERROR& err, const tcp::endp
handleHandshake(ASIO_SUCCESS);
}
} else {
LOG_ERROR(cnxString_ << "Failed to establish connection: " << err.message());
LOG_ERROR(cnxString_ << "Failed to establish connection to " << endpoint << ": " << err.message());
if (err == ASIO::error::operation_aborted) {
close();
} else {
Expand Down Expand Up @@ -603,16 +613,25 @@ void ClientConnection::handleResolve(ASIO_ERROR err, const tcp::resolver::result
return;
}

if (!results.empty()) {
LOG_DEBUG(cnxString_ << "Resolved " << results.size() << " endpoints");
for (const auto& entry : results) {
const auto& ep = entry.endpoint();
LOG_DEBUG(cnxString_ << " " << ep.address().to_string() << ":" << ep.port());
}
}

auto weakSelf = weak_from_this();
connectTimeoutTask_->setCallback([weakSelf](const PeriodicTask::ErrorCode& ec) {
connectTimeoutTask_->setCallback([weakSelf, results = tcp::resolver::results_type(results)](
const PeriodicTask::ErrorCode& ec) {
ClientConnectionPtr ptr = weakSelf.lock();
if (!ptr) {
// Connection was already destroyed
LOG_DEBUG("Connect timeout callback skipped: connection was already destroyed");
return;
}

if (ptr->state_ != Ready) {
LOG_ERROR(ptr->cnxString_ << "Connection was not established in "
LOG_ERROR(ptr->cnxString_ << "Connection to " << results << " was not established in "
<< ptr->connectTimeoutTask_->getPeriodMs() << " ms, close the socket");
PeriodicTask::ErrorCode err;
ptr->socket_->close(err);
Expand Down Expand Up @@ -1212,20 +1231,23 @@ Future<Result, ResponseData> ClientConnection::sendRequestWithId(const SharedBuf
void ClientConnection::handleRequestTimeout(const ASIO_ERROR& ec,
const PendingRequestData& pendingRequestData) {
if (!ec && !pendingRequestData.hasGotResponse->load()) {
LOG_WARN(cnxString_ << "Network request timeout to broker, remote: " << physicalAddress_);
pendingRequestData.promise.setFailed(ResultTimeout);
}
}

void ClientConnection::handleLookupTimeout(const ASIO_ERROR& ec,
const LookupRequestData& pendingRequestData) {
if (!ec) {
LOG_WARN(cnxString_ << "Lookup request timeout to broker, remote: " << physicalAddress_);
pendingRequestData.promise->setFailed(ResultTimeout);
}
}

void ClientConnection::handleGetLastMessageIdTimeout(const ASIO_ERROR& ec,
const ClientConnection::LastMessageIdRequestData& data) {
if (!ec) {
LOG_WARN(cnxString_ << "GetLastMessageId request timeout to broker, remote: " << physicalAddress_);
data.promise->setFailed(ResultTimeout);
}
}
Expand Down
8 changes: 8 additions & 0 deletions lib/ConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1104,6 +1104,14 @@ Result ConsumerImpl::receiveHelper(Message& msg, int timeout) {
if (state_ != Ready) {
return ResultAlreadyClosed;
}
auto cnx = getCnx().lock();
if (cnx) {
LOG_WARN(getName() << " Receive timeout after " << timeout << " ms, connection: "
<< cnx->cnxString() << ", queue size: " << incomingMessages_.size());
} else {
LOG_WARN(getName() << " Receive timeout after " << timeout
<< " ms, no connection, queue size: " << incomingMessages_.size());
}
return ResultTimeout;
}
}
Expand Down
9 changes: 9 additions & 0 deletions lib/MultiTopicsConsumerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <chrono>
#include <stdexcept>

#include "ClientConnection.h"
#include "ClientImpl.h"
#include "ConsumerImpl.h"
#include "ExecutorService.h"
Expand Down Expand Up @@ -600,6 +601,14 @@ Result MultiTopicsConsumerImpl::receive(Message& msg, int timeout) {
if (state_ != Ready) {
return ResultAlreadyClosed;
}
auto cnx = getCnx().lock();
if (cnx) {
LOG_WARN(getName() << " Receive timeout after " << timeout << " ms, connection: "
<< cnx->cnxString() << ", queue size: " << incomingMessages_.size());
} else {
LOG_WARN(getName() << " Receive timeout after " << timeout
<< " ms, no connection, queue size: " << incomingMessages_.size());
}
return ResultTimeout;
}
}
Expand Down
9 changes: 9 additions & 0 deletions lib/ProducerImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -868,6 +868,15 @@ void ProducerImpl::handleSendTimeout(const ASIO_ERROR& err) {
}

lock.unlock();
auto cnx = getCnx().lock();
if (cnx) {
LOG_WARN(getName() << "Send timeout due to queueing delay, connection: " << cnx->cnxString()
<< ", pending messages: " << pendingMessages.size()
<< ", queue size: " << pendingMessagesQueue_.size());
} else {
LOG_WARN(getName() << "Send timeout due to queueing delay, no connection, pending messages: "
<< pendingMessages.size() << ", queue size: " << pendingMessagesQueue_.size());
}
for (const auto& op : pendingMessages) {
op->complete(ResultTimeout, {});
}
Expand Down
14 changes: 11 additions & 3 deletions lib/UnAckedMessageTrackerEnabled.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <functional>

#include "ClientConnection.h"
#include "ClientImpl.h"
#include "ConsumerImplBase.h"
#include "ExecutorService.h"
Expand Down Expand Up @@ -57,9 +58,16 @@ void UnAckedMessageTrackerEnabled::timeoutHandlerHelper() {

std::set<MessageId> msgIdsToRedeliver;
if (!headPartition.empty()) {
LOG_INFO(consumerReference_.getName().c_str()
<< ": " << headPartition.size() << " Messages were not acked within "
<< timePartitions.size() * tickDurationInMs_ << " time");
auto cnx = consumerReference_.getCnx().lock();
if (cnx) {
LOG_WARN(consumerReference_.getName()
<< " Unacked messages timeout: " << headPartition.size() << " messages not acked within "
<< timeoutMs_ << " ms, connection: " << cnx->cnxString());
} else {
LOG_WARN(consumerReference_.getName()
<< " Unacked messages timeout: " << headPartition.size() << " messages not acked within "
<< timeoutMs_ << " ms, no connection");
}
for (auto it = headPartition.begin(); it != headPartition.end(); it++) {
msgIdsToRedeliver.insert(*it);
messageIdPartitionMap.erase(*it);
Expand Down
Loading