Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion include/DefaultMQPullConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
//<!end mqadmin;

//<!begin MQConsumer
virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName);
virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
virtual void doRebalance();
virtual void persistConsumerOffset();
Expand Down
2 changes: 1 addition & 1 deletion include/DefaultMQPushConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
//<!end mqadmin;

//<!begin MQConsumer
virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName);
virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
virtual void doRebalance();
virtual void persistConsumerOffset();
Expand Down
2 changes: 1 addition & 1 deletion include/MQConsumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class ConsumerRunningInfo;
class ROCKETMQCLIENT_API MQConsumer : public MQClient {
public:
virtual ~MQConsumer() {}
virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;
virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName) = 0;
virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) = 0;
virtual void doRebalance() = 0;
virtual void persistConsumerOffset() = 0;
Expand Down
5 changes: 3 additions & 2 deletions src/MQClientAPIImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -823,7 +823,8 @@ void MQClientAPIImpl::updateConsumerOffsetOneway(const string& addr,
m_pRemotingClient->invokeOneway(addr, request);
}

void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
void MQClientAPIImpl::consumerSendMessageBack(const string addr,
MQMessageExt& msg,
const string& consumerGroup,
int delayLevel,
int timeoutMillis,
Expand All @@ -833,7 +834,7 @@ void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
pRequestHeader->offset = msg.getCommitLogOffset();
pRequestHeader->delayLevel = delayLevel;

string addr = socketAddress2IPPort(msg.getStoreHost());
// string addr = socketAddress2IPPort(msg.getStoreHost());
RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
callSignatureBeforeRequest(addr, request, sessionCredentials);
request.Encode();
Expand Down
3 changes: 2 additions & 1 deletion src/MQClientAPIImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ class MQClientAPIImpl {
int timeoutMillis,
const SessionCredentials& sessionCredentials);

void consumerSendMessageBack(MQMessageExt& msg,
void consumerSendMessageBack(const string addr,
MQMessageExt& msg,
const string& consumerGroup,
int delayLevel,
int timeoutMillis,
Expand Down
3 changes: 2 additions & 1 deletion src/common/MQClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ MQClient::~MQClient() {}
string MQClient::getMQClientId() const {
string clientIP = UtilAll::getLocalAddress();
string processId = UtilAll::to_string(getpid());
return processId + "-" + clientIP + "@" + m_instanceName;
// return processId + "-" + clientIP + "@" + m_instanceName;
return clientIP + "@" + processId + "#" + m_instanceName;
}

//<!groupName;
Expand Down
20 changes: 12 additions & 8 deletions src/consumer/ConsumeMessageConcurrentlyService.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,18 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
case CLUSTERING: {
// send back msg to broker;
for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
msgs[i].getReconsumeTimes());
if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY && !m_pConsumer->sendMessageBack(msgs[i], 0)) {
LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
msgs[i].getReconsumeTimes());
localRetryMsgs.push_back(msgs[i]);
LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
msgs[i].getReconsumeTimes());
if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
string brokerName = request->m_messageQueue.getBrokerName();
if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
(request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
msgs[i].getReconsumeTimes());
msgs[i].setReconsumeTimes(msgs[i].getReconsumeTimes() + 1);
localRetryMsgs.push_back(msgs[i]);
}
}
}
break;
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/DefaultMQPullConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ void DefaultMQPullConsumer::shutdown() {
}
}

bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
return true;
}

Expand Down
9 changes: 7 additions & 2 deletions src/consumer/DefaultMQPushConsumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,9 +251,14 @@ DefaultMQPushConsumer::~DefaultMQPushConsumer() {
m_subTopics.clear();
}

bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
string brokerAddr;
if (!brokerName.empty())
brokerAddr = getFactory()->findBrokerAddressInPublish(brokerName);
else
brokerAddr = socketAddress2IPPort(msg.getStoreHost());
try {
getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(msg, getGroupName(), delayLevel, 3000,
getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000,
getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR(e.what());
Expand Down
2 changes: 1 addition & 1 deletion src/consumer/Rebalance.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ void Rebalance::doRebalance() {
std::stringstream ss;
ss << "Allocation result for [Consumer Group: " << m_pConsumer->getGroupName() << ", Topic: " << topic
<< ", Current Consumer ID: " << m_pConsumer->getMQClientId() << "] is changed.\n "
<< "Total Queue #: " << mqAll.size() << ", Total Consumer #: " << cidAll.size()
<< "Total Queue :#" << mqAll.size() << ", Total Consumer :#" << cidAll.size()
<< " Allocated Queues are: \n";

for (vector<MQMessageQueue>::size_type i = 0; i < allocateResult.size(); ++i) {
Expand Down