[Feature] Support message trace feature#162
Conversation
…variable. declare correct string::size_type by auto. (apache#143)
* update network interface. - feature: use only one event loop for all TcpTransport. - update: network components. * remove boost mutex, timed_mutex and condition_variable in TcpRemotingClient, TcpTransport and ReponseFunture.
src/consumer/MQConsumer.cpp
Outdated
| #include "AsyncTraceDispatcher.h" | ||
| namespace rocketmq { | ||
|
|
||
| MQConsumer::MQConsumer(PullRequest* request) { |
There was a problem hiding this comment.
where the param request is used?
include/MQConsumer.h
Outdated
|
|
||
|
|
||
| private: | ||
| std::shared_ptr<TraceDispatcher> traceDispatcher; |
There was a problem hiding this comment.
rename to 'm_traceDispatcher'
include/MQConsumer.h
Outdated
| //DefaultMQProducerImpl defaultMQProducerImpl; | ||
|
|
||
|
|
||
| std::vector<std::shared_ptr<ConsumeMessageHook> > consumeMessageHookList; |
src/consumer/MQConsumer.cpp
Outdated
|
|
||
|
|
||
| MQConsumer::~MQConsumer() { | ||
| if (traceDispatcher.use_count()>0) { |
There was a problem hiding this comment.
don't use 'use_count() > 0' as condition, but 'traceDispatcher != nullptr'.
|
|
||
|
|
||
| #include "AsyncTraceDispatcher.h" | ||
| #include <boost/shared_ptr.hpp> |
There was a problem hiding this comment.
where use boost::shared_ptr or boost::weak_ptr ?
include/AsyncTraceDispatcher.h
Outdated
| @@ -0,0 +1,165 @@ | |||
| #ifndef __AsyncTraceDispatcher_H__ | |||
There was a problem hiding this comment.
add Apache License declaration.
include/AsyncTraceDispatcher.h
Outdated
| //DefaultMQProducer* traceProducer; | ||
| std::string TraceTopicName; | ||
| AsyncRunnable_run_context(bool stoppedv, int batchSizev, | ||
| //AsyncTraceDispatcher* atdv, |
There was a problem hiding this comment.
format all changed files with clang-format.
| public: | ||
| SendMessageTraceHookImpl(std::shared_ptr<TraceDispatcher>& localDispatcher); | ||
| virtual std::string hookName(); | ||
| //virtual void sendMessageBefore(SendMessageContext* context); |
There was a problem hiding this comment.
remove unnecessary functions.
|
|
||
| // Caculate the cost time for processing messages | ||
| int costTime = 0;//(int)((System.currentTimeMillis() - subBeforeContext.getTimeStamp()) / context.getMsgList().size()); | ||
| subAfterContext.setCostTime(costTime);// |
| */ | ||
|
|
||
| /* | ||
| package org.apache.rocketmq.client.trace.hook; |
There was a problem hiding this comment.
review recommends are read,checks will become,thanks.
remove unnecessary code.
include/AsyncTraceDispatcher.h
Outdated
| @@ -0,0 +1,118 @@ | |||
| #ifndef __AsyncTraceDispatcher_H__ | |||
There was a problem hiding this comment.
please use big alphabet for macro definition, the same for following files
AsyncTraceDispatcher_H change to ASYNCTRACEDISPATCHER_H
include/AsyncTraceDispatcher.h
Outdated
|
|
||
| class AsyncTraceDispatcher : public TraceDispatcher, public enable_shared_from_this<AsyncTraceDispatcher> { | ||
| private: | ||
| // static InternalLogger log; //= ClientLogger.getLog(); |
There was a problem hiding this comment.
delete the unnecessary code for new class
include/AsyncTraceDispatcher.h
Outdated
| std::condition_variable m_appenderQueuenotEmpty; | ||
|
|
||
| std::thread* m_shutDownHook; | ||
| bool m_stopped = false; |
There was a problem hiding this comment.
not assignment class variant here
include/AsyncTraceDispatcher.h
Outdated
| // std::thread* worker; | ||
| std::shared_ptr<std::thread> m_worker; | ||
|
|
||
| public: |
There was a problem hiding this comment.
why here is public class members ?
include/ConsumeMessageHook.h
Outdated
|
|
||
| class ConsumeMessageHook { | ||
| public: | ||
| virtual std::string hookName() { return ""; } |
There was a problem hiding this comment.
here format, please check if there is format problem for others
include/DefaultMQProducer.h
Outdated
| class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer { | ||
| public: | ||
| DefaultMQProducer(const std::string& groupname); | ||
| DefaultMQProducer(const std::string& groupname, bool Withouttrace = false, void* rpcHook = nullptr); |
There was a problem hiding this comment.
notice the name format for funtion variant, such as Withouttrace, please check others
include/TraceHelper.h
Outdated
| static std::string LOCAL_ADDRESS; //= "/*UtilAll.ipToIPv4Str(UtilAll.getIP())*/"; | ||
|
|
||
| private: | ||
| std::string m_topic = ""; |
src/message/MQMessage.cpp
Outdated
| const string MQMessage::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET = "TRAN_PREPARED_QUEUE_OFFSET"; | ||
| const string MQMessage::PROPERTY_TRANSACTION_CHECK_TIMES = "TRANSACTION_CHECK_TIMES"; | ||
| const string MQMessage::PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS = "CHECK_IMMUNITY_TIME_IN_SECONDS"; | ||
| const string MQMessage::CONSUME_CONTEXT_TYPE = "ConsumeContextType"; |
There was a problem hiding this comment.
keep the same style for parameter name
There was a problem hiding this comment.
keep the same style for parameter name
updated,-:)
jovany-wang
left a comment
There was a problem hiding this comment.
Basically, I have 2 suggestions:
- Add doc comments for the methods.
- Show some usages.
| #include "TraceDispatcher.h" | ||
| namespace rocketmq { | ||
|
|
||
| class AsyncTraceDispatcher : public TraceDispatcher, public enable_shared_from_this<AsyncTraceDispatcher> { |
There was a problem hiding this comment.
public enable_shared_from_this<AsyncTraceDispatcher> seems can be private here right?
| std::atomic<long> m_discardCount; | ||
| std::shared_ptr<std::thread> m_worker; | ||
|
|
||
| // public: |
| std::atomic<bool> m_delydelflag; | ||
|
|
||
| public: | ||
| bool get_stopped() { return m_stopped; } |
There was a problem hiding this comment.
| bool get_stopped() { return m_stopped; } | |
| bool get_stopped() const { return m_stopped; } |
| virtual void start(std::string nameSrvAddr, AccessChannel accessChannel = AccessChannel::LOCAL); | ||
|
|
||
| virtual void setdelydelflag(bool v) { m_delydelflag = v; } | ||
| bool getdelydelflag() { return m_delydelflag; } |
There was a problem hiding this comment.
| bool getdelydelflag() { return m_delydelflag; } | |
| bool getdelydelflag() const { return m_delydelflag; } |
| std::string& getTraceTopicName() { return m_traceTopicName; } | ||
|
|
||
| void setTraceTopicName(std::string traceTopicNamev) { m_traceTopicName = traceTopicNamev; } | ||
| bool getisStarted() { return m_isStarted.load(); }; |
There was a problem hiding this comment.
| bool getisStarted() { return m_isStarted.load(); }; | |
| bool getisStarted() const { return m_isStarted.load(); }; |
|
|
||
| } // namespace rocketmq | ||
|
|
||
| #endif No newline at end of file |
There was a problem hiding this comment.
Please add new blank line at the end of this file.
| std::string msgnamespace; | ||
|
|
||
| public: | ||
| std::string getConsumerGroup() { return consumerGroup; }; |
There was a problem hiding this comment.
| std::string getConsumerGroup() { return consumerGroup; }; | |
| std::string getConsumerGroup() const { return consumerGroup; }; |
|
|
||
| void setConsumerGroup(std::string consumerGroup) { consumerGroup = consumerGroup; }; | ||
|
|
||
| std::list<MQMessageExt> getMsgList() { return msgList; }; |
There was a problem hiding this comment.
| std::list<MQMessageExt> getMsgList() { return msgList; }; | |
| std::list<MQMessageExt> getMsgList() const { return msgList; }; |
|
|
||
| void setMsgList(std::list<MQMessageExt> msgList) { msgList = msgList; }; | ||
| void setMsgList(std::vector<MQMessageExt> pmsgList) { msgList.assign(pmsgList.begin(), pmsgList.end()); }; | ||
| MQMessageQueue getMq() { return mq; }; |
There was a problem hiding this comment.
| MQMessageQueue getMq() { return mq; }; | |
| MQMessageQueue getMq() const { return mq; }; |
|
|
||
| class ConsumeMessageHook { | ||
| public: | ||
| virtual std::string hookName() { return ""; } |
There was a problem hiding this comment.
hookName -> getHookname?
|
thanks for your PR, |
What is the purpose of the change
[Feature] Support message trace feature
Brief changelog
add message trace feature for cpp sdk,add hook,dispatcher
Verifying this change
can send message trace message.
TBD:web console show.
Follow this checklist to help us incorporate your contribution quickly and easily. Notice,
it would be helpful if you could finish the following 5 checklist(the last one is not necessary)before request the community to review your PR.[ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.