Skip to content
64 changes: 54 additions & 10 deletions src/Jobs/MonitorRebalancesJob.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,21 @@ private async Task ReconcileAsync(Rebalance rebalance, CancellationToken ct)

if (payment == null)
{
// LND returned NotFound (or the call errored). Only act on the NotFound signal
// when our row is still in a non-terminal state: in that case LND never saw the
// payment, so the optimistic InFlight/Probing/Pending is wrong. For terminal rows
// we leave them alone — a transient track error shouldn't reopen a Failed row.
if (IsNonTerminal(rebalance.Status))
// LND returned NotFound (or the call errored). The right action depends on which
// non-terminal state the row is in:
//
// - Pending/Probing: the local ExecuteAsync persisted PaymentHashHex right after
// AddInvoice, but SendPaymentV2 doesn't fire until after the probe succeeds —
// so "no record in LND" is the EXPECTED state during this window. Flipping the
// row to Failed here would kill an in-progress probe. Leave it alone; the local
// execution is the source of truth until it dispatches SendPaymentV2.
//
// - InFlight: we believe SendPaymentV2 has been dispatched, so LND should know.
// If it doesn't, the payment never reached LND (process crashed mid-dispatch,
// or LND lost it). Flip to Failed so the row doesn't stay InFlight forever.
//
// - Terminal statuses: leave alone — a transient track error must not reopen them.
if (rebalance.Status == RebalanceStatus.InFlight)
{
var oldStatus = rebalance.Status;
rebalance.Status = RebalanceStatus.Failed;
Expand All @@ -147,6 +157,37 @@ await _auditService.LogSystemAsync(
NewStatus = rebalance.Status.ToString(),
});
}
else if (rebalance.Status is RebalanceStatus.Pending or RebalanceStatus.Probing)
{
var window = TimeSpan.FromHours(Constants.REBALANCE_RECONCILE_TERMINAL_WINDOW_HOURS);
if (rebalance.UpdateDatetime < DateTimeOffset.UtcNow - window)
{
_logger.LogWarning(
"Rebalance {RebalanceId} (status={Status}, hash={PaymentHashHex}) has no record in LND and is older than {WindowHours}h; flipping to Failed",
rebalance.Id, rebalance.Status, rebalance.PaymentHashHex, window.TotalHours);
var oldStatus = rebalance.Status;
rebalance.Status = RebalanceStatus.Failed;
_rebalanceRepository.Update(rebalance);
await _auditService.LogSystemAsync(
AuditActionType.RebalanceCompleted,
AuditEventType.Failure,
AuditObjectType.Rebalance,
rebalance.Id.ToString(),
new
{
Reason = "MonitorReconciliation",
Detail = "LND has no record of this payment hash and the row is older than the reconciliation window",
OldStatus = oldStatus.ToString(),
NewStatus = rebalance.Status.ToString(),
});
}
else
{
_logger.LogDebug(
"Rebalance {RebalanceId} not yet visible in LND (status={Status}, hash={PaymentHashHex}); local execution still owns it, leaving as-is",
rebalance.Id, rebalance.Status, rebalance.PaymentHashHex);
}
}
else
{
_logger.LogInformation(
Expand All @@ -157,6 +198,14 @@ await _auditService.LogSystemAsync(
return;
}

if (rebalance.Status is RebalanceStatus.Pending or RebalanceStatus.Probing)
{
_logger.LogInformation(
"Rebalance {RebalanceId} (status={Status}) has a payment record in LND (status={LndStatus}, hash={PaymentHashHex}); local execution still owns it, leaving as-is",
rebalance.Id, rebalance.Status, payment.Status, rebalance.PaymentHashHex);
return;
}

if (IsNonTerminalLndStatus(payment.Status))
{
// LND still in flight — emitted at info so an operator can spot rebalances stuck
Expand Down Expand Up @@ -206,11 +255,6 @@ await _auditService.LogSystemAsync(
});
}

private static bool IsNonTerminal(RebalanceStatus status) =>
status is RebalanceStatus.Pending
or RebalanceStatus.Probing
or RebalanceStatus.InFlight;

private static bool IsNonTerminalLndStatus(Payment.Types.PaymentStatus status) =>
status is Payment.Types.PaymentStatus.Initiated
or Payment.Types.PaymentStatus.InFlight;
Expand Down
37 changes: 37 additions & 0 deletions src/Services/RebalanceService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,43 @@ public async Task<Rebalance> ExecuteAsync(int rebalanceId, CancellationToken ct

try
{
// Avoid double paying a rebalance that has already succeeded.
if (!string.IsNullOrWhiteSpace(rebalance.PaymentHashHex))
{
byte[] paymentHash = Convert.FromHexString(rebalance.PaymentHashHex);
var oldPayment = await _lightningService.TrackPaymentV2Async(node, paymentHash, ct);
if (oldPayment != null)
{
if (oldPayment.Status is Payment.Types.PaymentStatus.Succeeded)
{
_logger.LogInformation(
"Rebalance {RebalanceId} already succeeded in LND (hash={PaymentHashHex}, feeSats={FeeSats}, ppm={Ppm})",
rebalance.Id, rebalance.PaymentHashHex, oldPayment.FeeMsat / 1_000L,
(long?)Math.Round((decimal)oldPayment.FeeMsat / rebalance.SatsAmount * 1_000_000m, MidpointRounding.AwayFromZero));
ApplyTerminalPayment(rebalance, oldPayment);
_rebalanceRepository.Update(rebalance);
await _auditService.LogAsync(AuditActionType.RebalanceCompleted, AuditEventType.Success,
AuditObjectType.Rebalance, rebalance.Id.ToString(),
new
{
rebalance.FeePaidSats,
rebalance.EffectivePpm,
ActualAmountSats = rebalance.SatsAmount,
rebalance.AttemptNumber,
});
return rebalance;
}
else if (oldPayment.Status is Payment.Types.PaymentStatus.InFlight or Payment.Types.PaymentStatus.Initiated)
{
_logger.LogInformation(
"Rebalance {RebalanceId} already in-flight in LND (hash={PaymentHashHex}, status={LndStatus})",
rebalance.Id, rebalance.PaymentHashHex, oldPayment.Status);
await ScheduleRetryIfEligibleAsync(rebalance);
return rebalance;
}
}
}

var memo = $"NG rebalance #{rebalance.Id} attempt {rebalance.AttemptNumber}";
var invoiceExpiry = ComputeInvoiceExpirySeconds(rebalance);
var invoice = await _lightningService.AddInvoiceAsync(node, rebalance.SatsAmount, memo, invoiceExpiry);
Expand Down
147 changes: 146 additions & 1 deletion test/NodeGuard.Tests/Jobs/MonitorRebalancesJobTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
using Microsoft.Extensions.Logging;
using NodeGuard.Data.Models;
using NodeGuard.Data.Repositories.Interfaces;
using NodeGuard.Helpers;
using NodeGuard.Services;
using Quartz;

Expand Down Expand Up @@ -50,7 +51,11 @@ public class MonitorRebalancesJobTests
PubKey = "030000000000000000000000000000000000000000000000000000000000000001",
};

private static Rebalance MakeRebalance(RebalanceStatus status, string? hashHex = "abcdef01", Node? node = null)
private static Rebalance MakeRebalance(
RebalanceStatus status,
string? hashHex = "abcdef01",
Node? node = null,
DateTimeOffset? updateDatetime = null)
=> new()
{
Id = 42,
Expand All @@ -61,6 +66,9 @@ private static Rebalance MakeRebalance(RebalanceStatus status, string? hashHex =
SatsAmount = 100_000,
RequestedAmountSats = 100_000,
MaxFeePct = 0.05,
// Default to "fresh" so tests don't accidentally trip the age-based stuck-row
// fallback in the Pending/Probing null branch.
UpdateDatetime = updateDatetime ?? DateTimeOffset.UtcNow,
};

[Fact]
Expand Down Expand Up @@ -149,6 +157,143 @@ public async Task Execute_InFlight_LndNotFound_RowMarkedFailed()
_rebalanceRepo.Verify(r => r.Update(reb), Times.Once);
}

[Fact]
public async Task Execute_Probing_LndNotFound_RowLeftAlone()
{
// ExecuteAsync persists PaymentHashHex right after AddInvoice but does not call
// SendPaymentV2 until the probe succeeds. While Status=Probing, "no record in LND" is
// the expected state — flipping to Failed here would kill an in-progress probe.
var reb = MakeRebalance(RebalanceStatus.Probing);
_rebalanceRepo.Setup(r => r.GetReconcilable(It.IsAny<TimeSpan>())).ReturnsAsync(new List<Rebalance> { reb });
_lightning.Setup(x => x.TrackPaymentV2Async(It.IsAny<Node>(), It.IsAny<byte[]>(), It.IsAny<CancellationToken>()))
.ReturnsAsync((Payment?)null);

await CreateJob().Execute(_ctx.Object);

reb.Status.Should().Be(RebalanceStatus.Probing);
_rebalanceRepo.Verify(r => r.Update(It.IsAny<Rebalance>()), Times.Never);
_audit.Verify(a => a.LogSystemAsync(
It.IsAny<AuditActionType>(), It.IsAny<AuditEventType>(),
It.IsAny<AuditObjectType>(), It.IsAny<string>(), It.IsAny<object>()),
Times.Never);
}

[Fact]
public async Task Execute_Pending_LndNotFound_RowLeftAlone()
{
// Same reasoning as Probing — the brief Pending+hash window inside ExecuteAsync
// between AddInvoice and the Status=Probing flip. SendPaymentV2 hasn't fired; LND
// can't possibly know about the hash yet.
var reb = MakeRebalance(RebalanceStatus.Pending);
_rebalanceRepo.Setup(r => r.GetReconcilable(It.IsAny<TimeSpan>())).ReturnsAsync(new List<Rebalance> { reb });
_lightning.Setup(x => x.TrackPaymentV2Async(It.IsAny<Node>(), It.IsAny<byte[]>(), It.IsAny<CancellationToken>()))
.ReturnsAsync((Payment?)null);

await CreateJob().Execute(_ctx.Object);

reb.Status.Should().Be(RebalanceStatus.Pending);
_rebalanceRepo.Verify(r => r.Update(It.IsAny<Rebalance>()), Times.Never);
_audit.Verify(a => a.LogSystemAsync(
It.IsAny<AuditActionType>(), It.IsAny<AuditEventType>(),
It.IsAny<AuditObjectType>(), It.IsAny<string>(), It.IsAny<object>()),
Times.Never);
}

[Fact]
public async Task Execute_Pending_LndNotFound_OlderThanWindow_FlipsToFailed()
{
// Safety net for a Pending row stuck past the reconciliation window — typically a
// crash mid-ExecuteAsync with no Quartz retry queued. The age-based fallback flips it
// to Failed so it doesn't sit non-terminal forever.
var stale = DateTimeOffset.UtcNow.AddHours(-(Constants.REBALANCE_RECONCILE_TERMINAL_WINDOW_HOURS + 1));
var reb = MakeRebalance(RebalanceStatus.Pending, updateDatetime: stale);
_rebalanceRepo.Setup(r => r.GetReconcilable(It.IsAny<TimeSpan>())).ReturnsAsync(new List<Rebalance> { reb });
_lightning.Setup(x => x.TrackPaymentV2Async(It.IsAny<Node>(), It.IsAny<byte[]>(), It.IsAny<CancellationToken>()))
.ReturnsAsync((Payment?)null);

await CreateJob().Execute(_ctx.Object);

reb.Status.Should().Be(RebalanceStatus.Failed);
_rebalanceRepo.Verify(r => r.Update(reb), Times.Once);
_audit.Verify(a => a.LogSystemAsync(
AuditActionType.RebalanceCompleted, AuditEventType.Failure,
AuditObjectType.Rebalance, reb.Id.ToString(), It.IsAny<object>()),
Times.Once);
}

[Fact]
public async Task Execute_Pending_LndFailedPayment_RowLeftAlone()
{
// The oscillation case: ScheduleRetryIfEligibleAsync left the row at Pending with the
// PRIOR attempt's hash. LND still has that prior payment record (Failed). The monitor
// must NOT write the prior failure onto a queued-for-retry row — local execution
// (ExecuteAsync's pre-flight) is the only thing allowed to touch it.
var reb = MakeRebalance(RebalanceStatus.Pending);
_rebalanceRepo.Setup(r => r.GetReconcilable(It.IsAny<TimeSpan>())).ReturnsAsync(new List<Rebalance> { reb });
_lightning.Setup(x => x.TrackPaymentV2Async(It.IsAny<Node>(), It.IsAny<byte[]>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new Payment
{
Status = Payment.Types.PaymentStatus.Failed,
FailureReason = PaymentFailureReason.FailureReasonNoRoute,
});

await CreateJob().Execute(_ctx.Object);

reb.Status.Should().Be(RebalanceStatus.Pending);
_rebalanceRepo.Verify(r => r.Update(It.IsAny<Rebalance>()), Times.Never);
_audit.Verify(a => a.LogSystemAsync(
It.IsAny<AuditActionType>(), It.IsAny<AuditEventType>(),
It.IsAny<AuditObjectType>(), It.IsAny<string>(), It.IsAny<object>()),
Times.Never);
}

[Fact]
public async Task Execute_Pending_LndSucceededPayment_RowLeftAlone()
{
// The lying-stream variant of the oscillation: prior attempt actually settled in LND
// but the local stream said Failed. The monitor still must NOT adopt that success
// here — the row is Pending (queued for retry), and ExecuteAsync's pre-flight in the
// retry path is the component responsible for adopting the prior success.
var reb = MakeRebalance(RebalanceStatus.Pending);
_rebalanceRepo.Setup(r => r.GetReconcilable(It.IsAny<TimeSpan>())).ReturnsAsync(new List<Rebalance> { reb });
_lightning.Setup(x => x.TrackPaymentV2Async(It.IsAny<Node>(), It.IsAny<byte[]>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new Payment
{
Status = Payment.Types.PaymentStatus.Succeeded,
FeeMsat = 1_000,
PaymentPreimage = "deadbeef",
});

await CreateJob().Execute(_ctx.Object);

reb.Status.Should().Be(RebalanceStatus.Pending);
_rebalanceRepo.Verify(r => r.Update(It.IsAny<Rebalance>()), Times.Never);
_audit.Verify(a => a.LogSystemAsync(
It.IsAny<AuditActionType>(), It.IsAny<AuditEventType>(),
It.IsAny<AuditObjectType>(), It.IsAny<string>(), It.IsAny<object>()),
Times.Never);
}

[Fact]
public async Task Execute_Probing_LndHasRecord_RowLeftAlone()
{
// While Status=Probing, the local ExecuteAsync owns the row — the monitor stays back
// regardless of what LND has on the hash. Covers the normal in-progress probe.
var reb = MakeRebalance(RebalanceStatus.Probing);
_rebalanceRepo.Setup(r => r.GetReconcilable(It.IsAny<TimeSpan>())).ReturnsAsync(new List<Rebalance> { reb });
_lightning.Setup(x => x.TrackPaymentV2Async(It.IsAny<Node>(), It.IsAny<byte[]>(), It.IsAny<CancellationToken>()))
.ReturnsAsync(new Payment { Status = Payment.Types.PaymentStatus.InFlight });

await CreateJob().Execute(_ctx.Object);

reb.Status.Should().Be(RebalanceStatus.Probing);
_rebalanceRepo.Verify(r => r.Update(It.IsAny<Rebalance>()), Times.Never);
_audit.Verify(a => a.LogSystemAsync(
It.IsAny<AuditActionType>(), It.IsAny<AuditEventType>(),
It.IsAny<AuditObjectType>(), It.IsAny<string>(), It.IsAny<object>()),
Times.Never);
}

[Fact]
public async Task Execute_RecentlyFailed_LndNotFound_RowLeftAlone()
{
Expand Down
Loading
Loading