Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions src/message/BatchMessage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#include "BatchMessage.h"

#include "MQDecoder.h"
#include "StringIdMaker.h"

using namespace std;
namespace rocketmq {

std::string BatchMessage::encode(std::vector<MQMessage>& msgs) {
string encodedBody;
std::string encodedBody;
for (auto message : msgs) {
string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
std::string unique_id = StringIdMaker::getInstance().createUniqID();
message.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
encodedBody.append(encode(message));
}
Expand Down Expand Up @@ -59,4 +58,5 @@ std::string BatchMessage::encode(MQMessage& message) {
encodeMsg.append(properties.c_str(), propertiesLength);
return encodeMsg;
}
}

} // namespace rocketmq
2 changes: 1 addition & 1 deletion src/producer/DefaultMQProducer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -395,7 +395,7 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
bool isBatchMsg = std::type_index(typeid(msg)) == std::type_index(typeid(BatchMessage));
// msgId is produced by client, offsetMsgId produced by broker. (same with java sdk)
if (!isBatchMsg) {
string unique_id = StringIdMaker::get_mutable_instance().get_unique_id();
string unique_id = StringIdMaker::getInstance().createUniqID();
msg.setProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX, unique_id);
}

Expand Down
228 changes: 87 additions & 141 deletions src/producer/StringIdMaker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,175 +16,121 @@
*/
#include "StringIdMaker.h"

#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <ctime>

#include "ByteOrder.h"
#include "UtilAll.h"

namespace rocketmq {

#ifdef WIN32
int gettimeofdayWin(struct timeval* tp, void* tzp) {
time_t clock;
struct tm tm;
SYSTEMTIME wtm;
GetLocalTime(&wtm);
tm.tm_year = wtm.wYear - 1900;
tm.tm_mon = wtm.wMonth - 1;
tm.tm_mday = wtm.wDay;
tm.tm_hour = wtm.wHour;
tm.tm_min = wtm.wMinute;
tm.tm_sec = wtm.wSecond;
tm.tm_isdst = -1;
clock = mktime(&tm);
tp->tv_sec = clock;
tp->tv_usec = wtm.wMilliseconds * 1000;
return (0);
}
#endif
const char StringIdMaker::sHexAlphabet[16] = {'0', '1', '2', '3', '4', '5', '6', '7',
'8', '9', 'A', 'B', 'C', 'D', 'E', 'F'};

StringIdMaker::StringIdMaker() {
memset(_buff, 0, sizeof(_buff));
memset(_0x_buff, 0, sizeof(_0x_buff));
srand((uint32_t)time(NULL));
init_prefix();
}
StringIdMaker::~StringIdMaker() {}
std::srand((uint32_t)std::time(NULL));

void StringIdMaker::init_prefix() {
uint32_t pid = getpid();
uint32_t ip = get_ip();
uint32_t random_num = (rand() % 0xFFFF);
uint32_t pid = ByteOrder::swapIfLittleEndian(static_cast<uint32_t>(getpid()));
uint32_t ip = ByteOrder::swapIfLittleEndian(getIP());
uint32_t random_num = ByteOrder::swapIfLittleEndian(static_cast<uint32_t>(std::rand()));

memcpy(_buff + 2, &pid, 4);
memcpy(_buff, &ip, 4);
memcpy(_buff + 6, &random_num, 4);
unsigned char bin_buf[10];
std::memcpy(bin_buf + 2, &pid, 4);
std::memcpy(bin_buf, &ip, 4);
std::memcpy(bin_buf + 6, &random_num, 4);

hexdump(_buff, _0x_buff, 10);
hexdump(bin_buf, kFixString, 10);
kFixString[20] = '\0';

set_start_and_next_tm();
setStartTime(UtilAll::currentTimeMillis());

mCounter = 0;
}

uint32_t StringIdMaker::get_ip() {
char name[1024];
boost::system::error_code ec;
if (boost::asio::detail::socket_ops::gethostname(name, sizeof(name), ec) != 0) {
return 0;
}
StringIdMaker::~StringIdMaker() {}

boost::asio::io_service io_service;
boost::asio::ip::tcp::resolver resolver(io_service);
boost::asio::ip::tcp::resolver::query query(name, "");
boost::system::error_code error;
boost::asio::ip::tcp::resolver::iterator iter = resolver.resolve(query, error);
if (error) {
uint32_t StringIdMaker::getIP() {
std::string ip = UtilAll::getLocalAddress();
if (ip.empty()) {
return 0;
}
boost::asio::ip::tcp::resolver::iterator end; // End marker.
boost::asio::ip::tcp::endpoint ep;
while (iter != end) {
ep = *iter++;
}
std::string s_localIpAddress = ep.address().to_string();

int a[4];
std::string IP = s_localIpAddress;
std::string strTemp;
size_t pos;
size_t i = 3;

do {
pos = IP.find(".");

if (pos != std::string::npos) {
strTemp = IP.substr(0, pos);
a[i] = atoi(strTemp.c_str());
i--;
IP.erase(0, pos + 1);
} else {
strTemp = IP;
a[i] = atoi(strTemp.c_str());
break;
}

} while (1);

uint32_t nResult = (a[3] << 24) + (a[2] << 16) + (a[1] << 8) + a[0];
return nResult;
}

uint64_t StringIdMaker::get_curr_ms() {
struct timeval time_now;
// windows and linux use the same function name, windows's defination as begining this file
#ifdef WIN32
gettimeofdayWin(&time_now, NULL); // WIN32
#else
gettimeofday(&time_now, NULL); // LINUX
#endif

uint64_t ms_time = time_now.tv_sec * 1000 + time_now.tv_usec / 1000;
return ms_time;
}
char* ip_str = new char[ip.length() + 1];
std::strncpy(ip_str, ip.c_str(), ip.length());
ip_str[ip.length()] = '\0';

void StringIdMaker::set_start_and_next_tm() {
time_t tmNow = time(NULL);
tm* ptmNow = localtime(&tmNow);
tm mon_begin;
mon_begin.tm_year = ptmNow->tm_year;
mon_begin.tm_mon = ptmNow->tm_mon;
mon_begin.tm_mday = 0;
mon_begin.tm_hour = 0;
mon_begin.tm_min = 0;
mon_begin.tm_sec = 0;

tm mon_next_begin;
if (ptmNow->tm_mon == 12) {
mon_next_begin.tm_year = ptmNow->tm_year + 1;
mon_next_begin.tm_mon = 1;
} else {
mon_next_begin.tm_year = ptmNow->tm_year;
mon_next_begin.tm_mon = ptmNow->tm_mon + 1;
int i = 3;
uint32_t nResult = 0;
for (char* token = std::strtok(ip_str, "."); token != nullptr && i >= 0; token = std::strtok(nullptr, ".")) {
uint32_t n = std::atoi(token);
nResult |= n << (8 * i--);
}
mon_next_begin.tm_mday = 0;
mon_next_begin.tm_hour = 0;
mon_next_begin.tm_min = 0;
mon_next_begin.tm_sec = 0;

time_t mon_begin_tm = mktime(&mon_begin);
time_t mon_end_tm = mktime(&mon_next_begin);
delete[] ip_str;

_start_tm = mon_begin_tm * 1000;
_next_start_tm = mon_end_tm * 1000;
return nResult;
}

int StringIdMaker::atomic_incr(int id) {
#ifdef WIN32
InterlockedIncrement((LONG*)&id);
#else
__sync_add_and_fetch(&id, 1);
#endif
return id;
void StringIdMaker::setStartTime(uint64_t millis) {
// std::time_t
// Although not defined, this is almost always an integral value holding the number of seconds
// (not counting leap seconds) since 00:00, Jan 1 1970 UTC, corresponding to POSIX time.
std::time_t tmNow = millis / 1000;
std::tm* ptmNow = std::localtime(&tmNow); // may not be thread-safe

std::tm curMonthBegin = {0};
curMonthBegin.tm_year = ptmNow->tm_year; // since 1900
curMonthBegin.tm_mon = ptmNow->tm_mon; // [0, 11]
curMonthBegin.tm_mday = 1; // [1, 31]
curMonthBegin.tm_hour = 0; // [0, 23]
curMonthBegin.tm_min = 0; // [0, 59]
curMonthBegin.tm_sec = 0; // [0, 60]

std::tm nextMonthBegin = {0};
if (ptmNow->tm_mon >= 11) {
nextMonthBegin.tm_year = ptmNow->tm_year + 1;
nextMonthBegin.tm_mon = 0;
} else {
nextMonthBegin.tm_year = ptmNow->tm_year;
nextMonthBegin.tm_mon = ptmNow->tm_mon + 1;
}
nextMonthBegin.tm_mday = 1;
nextMonthBegin.tm_hour = 0;
nextMonthBegin.tm_min = 0;
nextMonthBegin.tm_sec = 0;

mStartTime = std::mktime(&curMonthBegin) * 1000;
mNextStartTime = std::mktime(&nextMonthBegin) * 1000;
}
std::string StringIdMaker::get_unique_id() {
uint64_t now_time = get_curr_ms();

if (now_time > _next_start_tm) {
set_start_and_next_tm();
std::string StringIdMaker::createUniqID() {
uint64_t current = UtilAll::currentTimeMillis();
if (current >= mNextStartTime) {
setStartTime(current);
current = UtilAll::currentTimeMillis();
}
uint32_t tm_period = now_time - _start_tm;
seqid = atomic_incr(seqid) & 0xFF;

std::size_t prifix_len = 10; // 10 = prefix len
unsigned char* write_index = _buff + prifix_len;
uint32_t period = ByteOrder::swapIfLittleEndian(static_cast<uint32_t>(current - mStartTime));
uint16_t seqid = ByteOrder::swapIfLittleEndian(mCounter++);

memcpy(write_index, &tm_period, 4);
write_index += 4;
unsigned char bin_buf[6];
std::memcpy(bin_buf, &period, 4);
std::memcpy(bin_buf + 4, &seqid, 2);

memcpy(write_index, &seqid, 2);
char hex_buf[12];
hexdump(bin_buf, hex_buf, 6);

hexdump(_buff + prifix_len, (_0x_buff + (2 * prifix_len)), 6);
_0x_buff[32] = '\0';
return std::string(_0x_buff);
return std::string(kFixString, 20) + std::string(hex_buf, 12);
}

void StringIdMaker::hexdump(unsigned char* buffer, char* out_buff, unsigned long index) {
for (unsigned long i = 0; i < index; i++) {
sprintf(out_buff + 2 * i, "%02X ", buffer[i]);
void StringIdMaker::hexdump(unsigned char* in, char* out, std::size_t len) {
for (std::size_t i = 0; i < len; i++) {
unsigned char v = in[i];
out[i * 2] = sHexAlphabet[v >> 4];
out[i * 2 + 1] = sHexAlphabet[v & 0x0FU];
}
}
}

} // namespace rocketmq
76 changes: 33 additions & 43 deletions src/producer/StringIdMaker.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,61 +14,51 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

/*
ip: 4
pid: 4
随机数 :2
时间:4
自增数:2
*/
#ifndef __STRINGID_MAKER_H__
#define __STRINGID_MAKER_H__

#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <boost/serialization/singleton.hpp>
#include <atomic>
#include <cstdint>
#include <string>
#include <boost/asio.hpp>

#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/time.h>
#endif

#ifdef WIN32
#include <windows.h>
#else
#include <unistd.h>
#include <sys/time.h>
#endif

namespace rocketmq {
class StringIdMaker : public boost::serialization::singleton<StringIdMaker> {
public:

class StringIdMaker {
private:
StringIdMaker();
~StringIdMaker();
std::string get_unique_id();

public:
static StringIdMaker& getInstance() {
// After c++11, the initialization occurs exactly once
static StringIdMaker singleton_;
return singleton_;
}

/* ID format:
* ip: 4 bytes
* pid: 2 bytes
* random: 4 bytes
* time: 4 bytes
* auto num: 2 bytes
*/
std::string createUniqID();

private:
uint32_t get_ip();
void init_prefix();
uint64_t get_curr_ms();
int atomic_incr(int id);
void set_start_and_next_tm();
void setStartTime(uint64_t millis);

void hexdump(unsigned char* buffer, char* out_buff, unsigned long index);
static uint32_t getIP();
static void hexdump(unsigned char* buffer, char* out_buff, unsigned long index);

private:
uint64_t _start_tm;
uint64_t _next_start_tm;
unsigned char _buff[16];
char _0x_buff[33];
int16_t seqid;
uint64_t mStartTime;
uint64_t mNextStartTime;
std::atomic<uint16_t> mCounter;

char kFixString[21];

static const char sHexAlphabet[16];
};
}

} // namespace rocketmq
#endif
Loading