Skip to content
This repository was archived by the owner on Dec 31, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,9 @@ Advanced users may wish to process these publish acknowledgements manually to ac
AckHandler ackHandler = new AckHandler() { {
public void onAck(String ackedNuid, Exception err) {
if (err != null) {
log.error("Error publishing msg id %s: %s\n, ackedNuid, err.getMessage());
System.err.printf("Error publishing msg id %s: %s\n", ackedNuid, err.getMessage());
} else {
log.info("Received ack for msg id %s\n", ackedNuid);
System.out.printf("Received ack for msg id %s\n", ackedNuid);
}
}
}
Expand Down
15 changes: 0 additions & 15 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -104,21 +104,6 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.codehaus.groovy</groupId>
<artifactId>groovy-all</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
3 changes: 0 additions & 3 deletions src/main/java/io/nats/streaming/NatsStreaming.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,8 @@
package io.nats.streaming;

import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class NatsStreaming {
static final Logger logger = LoggerFactory.getLogger(NatsStreaming.class);
static final String DEFAULT_NATS_URL = io.nats.client.Nats.DEFAULT_URL;
static final int DEFAULT_CONNECT_WAIT = 2; // Seconds
static final String DEFAULT_DISCOVER_PREFIX = "_STAN.discover";
Expand Down
61 changes: 14 additions & 47 deletions src/main/java/io/nats/streaming/StreamingConnectionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -41,15 +41,11 @@
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class StreamingConnectionImpl implements StreamingConnection, io.nats.client.MessageHandler {

static final String ERR_MANUAL_ACK = NatsStreaming.PFX + "cannot manually ack in auto-ack mode";

private static final Logger logger = LoggerFactory.getLogger(StreamingConnectionImpl.class);

private final ReadWriteLock mu = new ReentrantReadWriteLock();

private String clientId;
Expand Down Expand Up @@ -123,7 +119,7 @@ StreamingConnectionImpl connect() throws IOException, InterruptedException {
String discoverSubject = String.format("%s.%s", opts.getDiscoverPrefix(), clusterId);
ConnectRequest req = ConnectRequest.newBuilder().setClientID(clientId)
.setHeartbeatInbox(hbInbox).build();
// logger.trace("Sending ConnectRequest:\n{}", req.toString().trim());

byte[] bytes = req.toByteArray();
Message reply;
reply = nc.request(discoverSubject, bytes, opts.getConnectTimeout().toMillis());
Expand All @@ -136,7 +132,6 @@ StreamingConnectionImpl connect() throws IOException, InterruptedException {
// (StreamingConnectionImpl.SERVER_ERR_INVALID_CLIENT)
throw new IOException(cr.getError());
}
logger.trace("Received ConnectResponse:\n{}", cr);

// Capture cluster configuration endpoints to publish and
// subscribe/unsubscribe.
Expand Down Expand Up @@ -194,14 +189,12 @@ io.nats.client.Connection createNatsConnection() throws IOException {

@Override
public void close() throws IOException, InterruptedException {
logger.trace("In STAN close()");
io.nats.client.Connection nc;
this.lock();
try {
// Capture for NATS calls below
if (getNatsConnection() == null) {
// We are already closed
logger.debug("stan: NATS connection already closed");
return;
}

Expand All @@ -218,29 +211,25 @@ public void close() throws IOException, InterruptedException {
try {
getAckSubscription().unsubscribe();
} catch (Exception e) {
logger.debug(
"stan: error unsubscribing from acks ('{}')", e.getMessage(), e);
// ignore
}
}

if (getHbSubscription() != null) {
try {
getHbSubscription().unsubscribe();
} catch (Exception e) {
logger.debug("stan: error unsubscribing from heartbeats ('{}')",
e.getMessage(), e);
// ignore
}
}

CloseRequest req = CloseRequest.newBuilder().setClientID(clientId).build();
logger.trace("CLOSE request: [{}]", req);
byte[] bytes = req.toByteArray();
Message reply;
reply = nc.request(closeRequests, bytes, opts.getConnectTimeout().toMillis());
if (reply == null) {
throw new IOException(NatsStreaming.ERR_CLOSE_REQ_TIMEOUT);
}
logger.trace("CLOSE response: [{}]", reply);
if (reply.getData() != null) {
CloseResponse cr = CloseResponse.parseFrom(reply.getData());

Expand All @@ -253,7 +242,7 @@ public void close() throws IOException, InterruptedException {
try {
nc.close();
} catch (Exception ignore) {
logger.debug("NATS connection was null in close()");
// ignore
}
}
} // first finally
Expand Down Expand Up @@ -282,10 +271,8 @@ void processHeartBeat(Message msg) {
if (nc != null) {
try {
nc.publish(msg.getReplyTo(), null);
logger.debug("Sent heartbeat response");
} catch (IOException e) {
logger.warn("stan: error publishing heartbeat response: {}", e.getMessage());
logger.debug("Full stack trace:", e);
// ignore exception; nothing we can do.
}
}
}
Expand All @@ -306,9 +293,7 @@ public void publish(String subject, byte[] data) throws IOException, Interrupted
throw new IOException(err);
}
} catch (InterruptedException e) {
logger.debug("stan: publish interrupted");
logger.debug("Full stack trace:", e);
// Thread.currentThread().interrupt();
// TODO: ignore for now, but re-evaluatate this

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a way of tracking these TODOs? GitHub issues?

}
}

Expand Down Expand Up @@ -364,9 +349,8 @@ private String publish(String subject, byte[] data, AckHandler ah, BlockingQueue
try {
pac.put(PubAck.getDefaultInstance());
} catch (InterruptedException e) {
logger.warn("Publish operation interrupted", e);
// TODO: Reevaluate this.
// Eat this because you can't really do anything with it
// Thread.currentThread().interrupt();
}

try {
Expand Down Expand Up @@ -437,7 +421,6 @@ public Subscription subscribe(String subject, String queue, io.nats.streaming.Me
SubscriptionRequest sr = createSubscriptionRequest(sub);

Message reply;
// logger.trace("Sending SubscriptionRequest:\n{}", sr);
reply = nc.request(subRequests, sr.toByteArray(), 2L, TimeUnit.SECONDS);
if (reply == null) {
sub.inboxSub.unsubscribe();
Expand All @@ -451,7 +434,6 @@ public Subscription subscribe(String subject, String queue, io.nats.streaming.Me
sub.inboxSub.unsubscribe();
throw e;
}
// logger.trace("Received SubscriptionResponse:\n{}", response);
if (!response.getError().isEmpty()) {
sub.inboxSub.unsubscribe();
throw new IOException(response.getError());
Expand Down Expand Up @@ -507,10 +489,10 @@ void processAck(Message msg) {
Exception ex = null;
try {
pa = PubAck.parseFrom(msg.getData());
// logger.trace("Received PubAck:\n{}", pa);
} catch (InvalidProtocolBufferException e) {
logger.error("stan: error unmarshaling PubAck");
logger.debug("Full stack trace: ", e);
// If we are speaking to a server we don't understand, let the
// user know.
System.err.println("Protocol error: " + e.getStackTrace());
return;
}

Expand All @@ -530,7 +512,7 @@ void processAck(Message msg) {
try {
ackClosure.ch.put(ackError);
} catch (InterruptedException e) {
logger.debug("stan: processAck interrupted");
// ignore
}
}
}
Expand All @@ -542,9 +524,7 @@ public void run() {
try {
processAckTimeout(guid);
} catch (Exception e) {
// catch exception to prevent the timer to be closed
logger.error("stan: error encountered during processAckTimeout, will cancel this timer task", e);
// cancel this task
// catch exception to prevent the timer to be closed, but cancel this task
cancel();
}
}
Expand All @@ -558,8 +538,6 @@ void processAckTimeout(String guid) {
}
if (ackClosure.ah != null) {
ackClosure.ah.onAck(guid, new TimeoutException(NatsStreaming.ERR_TIMEOUT));
} else if (ackClosure.ch != null && !ackClosure.ch.offer(NatsStreaming.ERR_TIMEOUT)) {
logger.warn("stan: processAckTimeout unable to write timeout error to ack channel");
}
}

Expand Down Expand Up @@ -590,9 +568,7 @@ AckClosure removeAck(String guid) {
// remove from queue to unblock publish
pac.take();
} catch (InterruptedException e) {
logger.warn("stan: interrupted during removeAck for {}", guid);
logger.debug("Full stack trace:", e);
// Thread.currentThread().interrupt();
// TODO: Ignore, but re-evaluate this
}
}

Expand All @@ -616,14 +592,10 @@ void processMsg(io.nats.client.Message raw) {
io.nats.client.Connection nc;

try {
// logger.trace("In processMsg, msg = {}", raw);
MsgProto msgp = MsgProto.parseFrom(raw.getData());
// logger.trace("processMsg received MsgProto:\n{}", msgp);
stanMsg = createStanMessage(msgp);
} catch (InvalidProtocolBufferException e) {
logger.error("stan: error unmarshaling msg");
logger.debug("msg: {}", raw);
logger.debug("full stack trace:", e);
// TODO: Ignore, but re-evaluate this
}

// Lookup the subscription
Expand Down Expand Up @@ -669,15 +641,10 @@ void processMsg(io.nats.client.Message raw) {
Ack ack = Ack.newBuilder().setSubject(stanMsg.getSubject())
.setSequence(stanMsg.getSequence()).build();
try {
// logger.trace("processMsg publishing Ack for sequence: {}",
// stanMsg.getSequence());
nc.publish(ackSubject, ack.toByteArray());
// logger.trace("processMsg published Ack:\n{}", ack);
} catch (IOException e) {
// FIXME(dlc) - Async error handler? Retry?
// This really won't happen since the publish is executing in the NATS thread.
logger.error("Exception while publishing auto-ack: {}", e.getMessage());
logger.debug("Stack trace: ", e);
}
}
}
Expand Down
7 changes: 1 addition & 6 deletions src/main/java/io/nats/streaming/SubscriptionImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@
import java.io.IOException;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class SubscriptionImpl implements Subscription {
private static final Logger logger = LoggerFactory.getLogger(SubscriptionImpl.class);

static final long DEFAULT_ACK_WAIT = 30 * 1000;
static final int DEFAULT_MAX_IN_FLIGHT = 1024;
Expand Down Expand Up @@ -125,7 +122,6 @@ public void close(boolean unsubscribe) throws IOException {
inboxSub.unsubscribe();
} catch (Exception e) {
// Silently ignore this, we can't do anything about it
logger.debug("stan: exception unsubscribing from inbox ('{}')", e.getMessage());
}
inboxSub = null;
}
Expand Down Expand Up @@ -165,7 +161,7 @@ public void close(boolean unsubscribe) throws IOException {
bytes = usr.toByteArray();

io.nats.client.Message reply;
// logger.trace("Sending UnsubscribeRequest:\n{}", usr);

try {
reply = nc.request(reqSubject, bytes, sc.opts.connectTimeout.toMillis());
if (reply == null) {
Expand All @@ -180,7 +176,6 @@ public void close(boolean unsubscribe) throws IOException {
}

SubscriptionResponse response = SubscriptionResponse.parseFrom(reply.getData());
// logger.trace("Received Unsubscribe SubscriptionResponse:\n{}", response);
if (!response.getError().isEmpty()) {
throw new IOException(PFX + response.getError());
}
Expand Down
4 changes: 0 additions & 4 deletions src/main/java/io/nats/streaming/SubscriptionOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,12 @@
import java.time.Instant;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* A SubscriptionOptions object defines the configurable parameters of a STAN Subscription object.
*/
public class SubscriptionOptions {

static final Logger logger = LoggerFactory.getLogger(SubscriptionOptions.class);

// DurableName, if set will survive client restarts.
private final String durableName;
// Controls the number of messages the cluster will have inflight without an ACK.
Expand Down
16 changes: 3 additions & 13 deletions src/test/java/io/nats/streaming/ITConnectionTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,9 @@
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.ExpectedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Category(IntegrationTest.class)
public class ITConnectionTest {
private static final Logger logger = (Logger) LoggerFactory.getLogger(ITConnectionTest.class);

static final LogVerifier verifier = new LogVerifier();

private final ExecutorService service = Executors.newCachedThreadPool();

Expand Down Expand Up @@ -524,7 +519,6 @@ public void testSubscriptionStartPositionLast() throws Exception {
received.incrementAndGet();
assertEquals("Wrong message sequence received", toSend, msg.getSequence());
savedMsgs.add(msg);
logger.debug("msg={}", msg);
latch.countDown();
};

Expand All @@ -540,7 +534,7 @@ public void testSubscriptionStartPositionLast() throws Exception {
// Make sure we got our message
assertTrue("Did not receive our message", latch.await(5, TimeUnit.SECONDS));
if (received.get() != 1) {
logger.error("Should have received 1 message with sequence {}, "
System.err.printf("Should have received 1 message with sequence {}, "
+ "but got these {} messages:\n", toSend, savedMsgs.size());
for (Message savedMsg : savedMsgs) {
System.err.println(savedMsg);
Expand Down Expand Up @@ -1477,12 +1471,10 @@ public void testPubMultiQueueSubWithSlowSubscriberAndFlapping()
// Track received for each receiver
if (msg.getSubscription().equals(subs[0])) {
s1Received.incrementAndGet();
// logger.error("Sub1[{}]: {}\n", s1Received.get(), msg);
} else if (msg.getSubscription().equals(subs[1])) {
// Slow down this subscriber
sleep(50, TimeUnit.MILLISECONDS);
s2Received.incrementAndGet();
// logger.error("Sub2[{}]: {}\n", s2Received.get(), msg);
} else {
fail("Received message on unknown subscription");
}
Expand Down Expand Up @@ -1551,16 +1543,14 @@ public void testPubMultiQueueSubWithSlowSubscriber()
// Track received for each receiver
if (msg.getSubscription().equals(subs[0])) {
s1Received.incrementAndGet();
// logger.error("Sub1[{}]: {}\n", s1Received.get(), msg);
} else if (msg.getSubscription().equals(subs[1])) {
// Block this subscriber
try {
s2BlockedLatch.await();
} catch (InterruptedException e) {
logger.warn("Interrupted", e);
System.err.println("Interrupted:" + e);
}
s2Received.incrementAndGet();
// logger.error("Sub2[{}]: {}\n", s2Received.get(), msg);
} else {
fail("Received message on unknown subscription");
}
Expand Down Expand Up @@ -1838,7 +1828,7 @@ public void testNoDuplicatesOnSubscriberStart() throws Exception {
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
logger.warn("publish interrupted");
System.err.println("publish interrupted");
Thread.currentThread().interrupt();
}
}
Expand Down
Loading