Skip to content
5 changes: 5 additions & 0 deletions src/chainparams.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ static Consensus::LLMQParams llmq_test = {
.signingActiveQuorumCount = 2, // just a few ones to allow easier testing

.keepOldConnections = 3,
.recoveryMembers = 3,
};

// this one is for devnets only
Expand All @@ -192,6 +193,7 @@ static Consensus::LLMQParams llmq_devnet = {
.signingActiveQuorumCount = 3, // just a few ones to allow easier testing

.keepOldConnections = 4,
.recoveryMembers = 6,
};

static Consensus::LLMQParams llmq50_60 = {
Expand All @@ -210,6 +212,7 @@ static Consensus::LLMQParams llmq50_60 = {
.signingActiveQuorumCount = 24, // a full day worth of LLMQs

.keepOldConnections = 25,
.recoveryMembers = 25,
};

static Consensus::LLMQParams llmq400_60 = {
Expand All @@ -228,6 +231,7 @@ static Consensus::LLMQParams llmq400_60 = {
.signingActiveQuorumCount = 4, // two days worth of LLMQs

.keepOldConnections = 5,
.recoveryMembers = 100,
};

// Used for deployment and min-proto-version signalling, so it needs a higher threshold
Expand All @@ -247,6 +251,7 @@ static Consensus::LLMQParams llmq400_85 = {
.signingActiveQuorumCount = 4, // four days worth of LLMQs

.keepOldConnections = 5,
.recoveryMembers = 100,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the number of .recoveryMembers chosen arbitrarily? I noticed the smaller quorums use a higher percent of the overall LLMQ size.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah...mostly arbitrary. I chose the numbers in an absolute way because there wouldn't be a good percentage that would work well on all LLMQ types.

};


Expand Down
3 changes: 3 additions & 0 deletions src/consensus/params.h
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@ struct LLMQParams {
// Used for inter-quorum communication. This is the number of quorums for which we should keep old connections. This
// should be at least one more then the active quorums set.
int keepOldConnections;

// How many members should we try to send all sigShares to before we give up.
int recoveryMembers;
};

/**
Expand Down
179 changes: 168 additions & 11 deletions src/llmq/quorums_signing_shares.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <init.h>
#include <net_processing.h>
#include <netmessagemaker.h>
#include <spork.h>
#include <validation.h>

#include <cxxtimer.hpp>
Expand Down Expand Up @@ -236,6 +237,23 @@ void CSigSharesManager::ProcessMessage(CNode* pfrom, const std::string& strComma
return;
}

if (sporkManager.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED)) {
if (strCommand == NetMsgType::QSIGSHARE) {
std::vector<CSigShare> sigShares;
vRecv >> sigShares;

if (sigShares.size() > MAX_MSGS_SIG_SHARES) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- too many sigs in QSIGSHARE message. cnt=%d, max=%d, node=%d\n", __func__, sigShares.size(), MAX_MSGS_SIG_SHARES, pfrom->GetId());
BanNode(pfrom->GetId());
return;
}

for (auto& sigShare : sigShares) {
ProcessMessageSigShare(pfrom->GetId(), sigShare, connman);
}
}
}

if (strCommand == NetMsgType::QSIGSESANN) {
std::vector<CSigSesAnn> msgs;
vRecv >> msgs;
Expand Down Expand Up @@ -465,6 +483,57 @@ bool CSigSharesManager::ProcessMessageBatchedSigShares(CNode* pfrom, const CBatc
return true;
}

void CSigSharesManager::ProcessMessageSigShare(NodeId fromId, const CSigShare& sigShare, CConnman& connman)
{
auto quorum = quorumManager->GetQuorum(sigShare.llmqType, sigShare.quorumHash);
if (!quorum) {
return;
}
if (!CLLMQUtils::IsQuorumActive(sigShare.llmqType, quorum->qc.quorumHash)) {
// quorum is too old
return;
}
if (!quorum->IsMember(activeMasternodeInfo.proTxHash)) {
// we're not a member so we can't verify it (we actually shouldn't have received it)
return;
}
if (quorum->quorumVvec == nullptr) {
// TODO we should allow to ask other nodes for the quorum vvec if we missed it in the DKG
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- we don't have the quorum vvec for %s, no verification possible. node=%d\n", __func__,
quorum->qc.quorumHash.ToString(), fromId);
return;
}

if (sigShare.quorumMember >= quorum->members.size()) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember out of bounds\n", __func__);
BanNode(fromId);
return;
}
if (!quorum->qc.validMembers[sigShare.quorumMember]) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- quorumMember not valid\n", __func__);
BanNode(fromId);
return;
}

{
LOCK(cs);

if (sigShares.Has(sigShare.GetKey())) {
return;
}

if (quorumSigningManager->HasRecoveredSigForId((Consensus::LLMQType)sigShare.llmqType, sigShare.id)) {
return;
}

auto& nodeState = nodeStates[fromId];
nodeState.pendingIncomingSigShares.Add(sigShare.GetKey(), sigShare);
}

LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signHash=%s, id=%s, msgHash=%s, member=%d, node=%d\n", __func__,
sigShare.GetSignHash().ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), sigShare.quorumMember, fromId);
}

bool CSigSharesManager::PreVerifyBatchedSigShares(NodeId nodeId, const CSigSharesNodeState::SessionInfo& session, const CBatchedSigShares& batchedSigShares, bool& retBan)
{
retBan = false;
Expand Down Expand Up @@ -668,8 +737,10 @@ void CSigSharesManager::ProcessSigShare(NodeId nodeId, const CSigShare& sigShare

// prepare node set for direct-push in case this is our sig share
std::set<NodeId> quorumNodes;
if (sigShare.quorumMember == quorum->GetMemberIndex(activeMasternodeInfo.proTxHash)) {
quorumNodes = connman.GetMasternodeQuorumNodes((Consensus::LLMQType) sigShare.llmqType, sigShare.quorumHash);
if (!sporkManager.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED)) {
if (sigShare.quorumMember == quorum->GetMemberIndex(activeMasternodeInfo.proTxHash)) {
quorumNodes = connman.GetMasternodeQuorumNodes((Consensus::LLMQType) sigShare.llmqType, sigShare.quorumHash);
}
}

if (quorumSigningManager->HasRecoveredSigForId(llmqType, sigShare.id)) {
Expand Down Expand Up @@ -780,6 +851,21 @@ void CSigSharesManager::TryRecoverSig(const CQuorumCPtr& quorum, const uint256&
quorumSigningManager->ProcessRecoveredSig(-1, rs, quorum, connman);
}

CDeterministicMNCPtr CSigSharesManager::SelectMemberForRecovery(const CQuorumCPtr& quorum, const uint256 &id, int attempt)
{
assert(attempt < quorum->members.size());

std::vector<std::pair<uint256, CDeterministicMNCPtr>> v;
v.reserve(quorum->members.size());
for (const auto& dmn : quorum->members) {
auto h = ::SerializeHash(std::make_pair(dmn->proTxHash, id));
v.emplace_back(h, dmn);
}
std::sort(v.begin(), v.end());

return v[attempt].second;
}

void CSigSharesManager::CollectSigSharesToRequest(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToRequest)
{
AssertLockHeld(cs);
Expand Down Expand Up @@ -928,6 +1014,43 @@ void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::u
}
}

void CSigSharesManager::CollectSigSharesToSend(std::unordered_map<NodeId, std::vector<CSigShare>>& sigSharesToSend, const std::vector<CNode*>& vNodes)
{
AssertLockHeld(cs);

std::unordered_map<uint256, CNode*> proTxToNode;
for (const auto& pnode : vNodes) {
if (pnode->verifiedProRegTxHash.IsNull()) {
continue;
}
proTxToNode.emplace(pnode->verifiedProRegTxHash, pnode);
}

auto curTime = GetTime();

for (auto& p : signedSessions) {
if (p.second.attempt > p.second.quorum->params.recoveryMembers) {
continue;
}

if (curTime >= p.second.nextAttemptTime) {
p.second.nextAttemptTime = curTime + SEND_FOR_RECOVERY_TIMEOUT;
auto dmn = SelectMemberForRecovery(p.second.quorum, p.second.sigShare.id, p.second.attempt);
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- sending to %s, signHash=%s\n", __func__,
dmn->proTxHash.ToString(), p.second.sigShare.GetSignHash().ToString());
p.second.attempt++;

auto it = proTxToNode.find(dmn->proTxHash);
if (it == proTxToNode.end()) {
continue;
}

auto& m = sigSharesToSend[it->second->GetId()];
m.emplace_back(p.second.sigShare);
}
}
}

void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>>& sigSharesToAnnounce)
{
AssertLockHeld(cs);
Expand Down Expand Up @@ -983,7 +1106,8 @@ void CSigSharesManager::CollectSigSharesToAnnounce(std::unordered_map<NodeId, st
bool CSigSharesManager::SendMessages()
{
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToRequest;
std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigSharesToSend;
std::unordered_map<NodeId, std::unordered_map<uint256, CBatchedSigShares, StaticSaltedHasher>> sigShareBatchesToSend;
std::unordered_map<NodeId, std::vector<CSigShare>> sigSharesToSend;
std::unordered_map<NodeId, std::unordered_map<uint256, CSigSharesInv, StaticSaltedHasher>> sigSharesToAnnounce;
std::unordered_map<NodeId, std::vector<CSigSesAnn>> sigSessionAnnouncements;

Expand All @@ -1006,18 +1130,24 @@ bool CSigSharesManager::SendMessages()
return session->sendSessionId;
};

std::vector<CNode*> vNodesCopy = g_connman->CopyNodeVector(CConnman::FullyConnectedOnly);

{
LOCK(cs);
CollectSigSharesToRequest(sigSharesToRequest);
CollectSigSharesToSend(sigSharesToSend);
CollectSigSharesToAnnounce(sigSharesToAnnounce);
if (!sporkManager.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED)) {
CollectSigSharesToRequest(sigSharesToRequest);
CollectSigSharesToSend(sigShareBatchesToSend);
CollectSigSharesToAnnounce(sigSharesToAnnounce);
} else {
CollectSigSharesToSend(sigSharesToSend, vNodesCopy);
}

for (auto& p : sigSharesToRequest) {
for (auto& p2 : p.second) {
p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first);
}
}
for (auto& p : sigSharesToSend) {
for (auto& p : sigShareBatchesToSend) {
for (auto& p2 : p.second) {
p2.second.sessionId = addSigSesAnnIfNeeded(p.first, p2.first);
}
Expand All @@ -1031,8 +1161,6 @@ bool CSigSharesManager::SendMessages()

bool didSend = false;

std::vector<CNode*> vNodesCopy = g_connman->CopyNodeVector(CConnman::FullyConnectedOnly);

for (auto& pnode : vNodesCopy) {
CNetMsgMaker msgMaker(pnode->GetSendVersion());

Expand Down Expand Up @@ -1076,8 +1204,8 @@ bool CSigSharesManager::SendMessages()
}
}

auto jt = sigSharesToSend.find(pnode->GetId());
if (jt != sigSharesToSend.end()) {
auto jt = sigShareBatchesToSend.find(pnode->GetId());
if (jt != sigShareBatchesToSend.end()) {
size_t totalSigsCount = 0;
std::vector<CBatchedSigShares> msgs;
for (auto& p : jt->second) {
Expand Down Expand Up @@ -1119,6 +1247,25 @@ bool CSigSharesManager::SendMessages()
didSend = true;
}
}

auto lt = sigSharesToSend.find(pnode->GetId());
if (lt != sigSharesToSend.end()) {
std::vector<CSigShare> msgs;
for (auto& sigShare : lt->second) {
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::SendMessages -- QSIGSHARE signHash=%s, node=%d\n",
sigShare.GetSignHash().ToString(), pnode->GetId());
msgs.emplace_back(std::move(sigShare));
if (msgs.size() == MAX_MSGS_SIG_SHARES) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false);
msgs.clear();
didSend = true;
}
}
if (!msgs.empty()) {
g_connman->PushMessage(pnode, msgMaker.Make(NetMsgType::QSIGSHARE, msgs), false);
didSend = true;
}
}
}

// looped through all nodes, release them
Expand Down Expand Up @@ -1285,6 +1432,7 @@ void CSigSharesManager::RemoveSigSharesForSession(const uint256& signHash)
sigSharesRequested.EraseAllForSignHash(signHash);
sigSharesToAnnounce.EraseAllForSignHash(signHash);
sigShares.EraseAllForSignHash(signHash);
signedSessions.erase(signHash);
timeSeenForSessions.erase(signHash);
}

Expand Down Expand Up @@ -1431,6 +1579,15 @@ void CSigSharesManager::Sign(const CQuorumCPtr& quorum, const uint256& id, const
LogPrint(BCLog::LLMQ_SIGS, "CSigSharesManager::%s -- signed sigShare. signHash=%s, id=%s, msgHash=%s, llmqType=%d, quorum=%s, time=%s\n", __func__,
signHash.ToString(), sigShare.id.ToString(), sigShare.msgHash.ToString(), quorum->params.type, quorum->qc.quorumHash.ToString(), t.count());
ProcessSigShare(-1, sigShare, *g_connman, quorum);

if (sporkManager.IsSporkActive(SPORK_21_QUORUM_ALL_CONNECTED)) {
LOCK(cs);
auto& session = signedSessions[sigShare.GetSignHash()];
session.sigShare = sigShare;
session.quorum = quorum;
session.nextAttemptTime = 0;
session.attempt = 0;
}
}

// causes all known sigShares to be re-announced
Expand Down
Loading