diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientConfiguration.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientConfiguration.java index 1de128e63e..d30947a155 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientConfiguration.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientConfiguration.java @@ -48,6 +48,8 @@ public final class ClientConfiguration private static final long MAX_SAS_TOKEN_EXPIRY_TIME_SECONDS = 10 * 365 * 24 * 60 * 60; //10 years + private static final long DEFAULT_MESSAGE_EXPIRATION_CHECK_PERIOD = 10000; + private boolean useWebsocket; @Getter @@ -91,6 +93,9 @@ public final class ClientConfiguration @Getter private String threadNamePrefix = null; + @Getter + private long messageExpiredCheckPeriod; + private boolean useIdentifiableThreadNames = true; private boolean logRoutineDisconnectsAsErrors = true; @@ -229,6 +234,7 @@ private void setClientOptionValues(ClientOptions clientOptions) this.threadNameSuffix = clientOptions != null ? clientOptions.getThreadNameSuffix() : null; this.useIdentifiableThreadNames = clientOptions == null || clientOptions.isUsingIdentifiableThreadNames(); this.logRoutineDisconnectsAsErrors = clientOptions == null || clientOptions.isLoggingRoutineDisconnectsAsErrors(); + this.messageExpiredCheckPeriod = clientOptions != null ? clientOptions.getMessageExpirationCheckPeriod() : DEFAULT_MESSAGE_EXPIRATION_CHECK_PERIOD; if (proxySettings != null) { diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientOptions.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientOptions.java index 22b0fbe0a5..2af8c80e46 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientOptions.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/ClientOptions.java @@ -181,6 +181,28 @@ public final class ClientOptions @Builder.Default private final boolean logRoutineDisconnectsAsErrors = true; + /** + * The period (in milliseconds) for how often the client will check queued and in-flight messages for expiry. + * + * Higher values mean that messages will be checked for expiry less often but this client will use less CPU. Higher + * values also mean that messages may not execute their callback with MESSAGE_EXPIRED close to the expected + * expiry time. + * + * Lower values mean that messages will be checked for expiry more often but this client will use more CPU. Lower + * values also mean that messages will execute their callback with MESSAGE_EXPIRED closer to the expected + * expiry time. + * + * By default, this value is 10 seconds. + * + * If set to 0, message expiry will never be checked. + * + * If this client will be used in a multiplexed connection, this value is ignored in favor of the same setting in + * {@link MultiplexingClientOptions}. + */ + @Getter + @Builder.Default + private final long messageExpirationCheckPeriod = 10000; + public boolean isUsingIdentifiableThreadNames() { // Using a manually written method here to override the name that Lombok would have given it diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java index ee87d3d31e..a232559908 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/DeviceIO.java @@ -79,10 +79,23 @@ final class DeviceIO implements IotHubConnectionStatusChangeCallback int sendInterval, boolean useIdentifiableThreadNames, String threadNamePrefix, - String threadNameSuffix) + String threadNameSuffix, + long messageExpirationCheckPeriod) { this.state = IotHubConnectionStatus.DISCONNECTED; - this.transport = new IotHubTransport(hostName, protocol, sslContext, proxySettings, this, keepAliveInterval, sendInterval, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); + this.transport = new IotHubTransport( + hostName, + protocol, + sslContext, + proxySettings, + this, + keepAliveInterval, + sendInterval, + useIdentifiableThreadNames, + threadNamePrefix, + threadNameSuffix, + messageExpirationCheckPeriod); + this.sendTask = new IotHubSendTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); this.receiveTask = new IotHubReceiveTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); this.reconnectTask = new IotHubReconnectTask(this.transport, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java index 5f0b08d7d4..ddd9d9b90c 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClient.java @@ -34,6 +34,7 @@ public class MultiplexingClient static final int DEFAULT_MAX_MESSAGES_TO_SEND_PER_THREAD = 10; private static final long DEFAULT_REGISTRATION_TIMEOUT_MILLISECONDS = 60 * 1000; // 1 minute private static final long DEFAULT_UNREGISTRATION_TIMEOUT_MILLISECONDS = 60 * 1000; // 1 minute + private static final long DEFAULT_MESSAGE_EXPIRATION_CHECK_PERIOD = 10000; // keys are deviceIds. Helps to optimize look ups later on which device Ids are already registered. private final Map multiplexedDeviceClients; @@ -104,6 +105,7 @@ public MultiplexingClient(String hostName, IotHubClientProtocol protocol, Multip String threadNamePrefix = options != null ? options.getThreadNamePrefix() : null; String threadNameSuffix = options != null ? options.getThreadNameSuffix() : null; boolean useIdentifiableThreadNames = options == null || options.isUsingIdentifiableThreadNames(); + long messageExpiredCheckPeriod = options != null ? options.getMessageExpirationCheckPeriod() : DEFAULT_MESSAGE_EXPIRATION_CHECK_PERIOD; if (sendPeriod < 0) { @@ -130,7 +132,18 @@ else if (receivePeriod == 0) //default builder value for this option, signals th // Optional settings from MultiplexingClientOptions SSLContext sslContext = options != null ? options.getSslContext() : null; - this.deviceIO = new DeviceIO(hostName, protocol, sslContext, proxySettings, keepAliveInterval, sendInterval, useIdentifiableThreadNames, threadNamePrefix, threadNameSuffix); + this.deviceIO = new DeviceIO( + hostName, + protocol, + sslContext, + proxySettings, + keepAliveInterval, + sendInterval, + useIdentifiableThreadNames, + threadNamePrefix, + threadNameSuffix, + messageExpiredCheckPeriod); + this.deviceIO.setMaxNumberOfMessagesSentPerSendThread(sendMessagesPerThread); this.deviceIO.setSendPeriodInMilliseconds(sendPeriod); this.deviceIO.setReceivePeriodInMilliseconds(receivePeriod); diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClientOptions.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClientOptions.java index ff5f449722..9469272a89 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClientOptions.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/MultiplexingClientOptions.java @@ -98,6 +98,25 @@ public final class MultiplexingClientOptions @Builder.Default private final boolean useIdentifiableThreadNames = true; + /** + * The period (in milliseconds) for how often the client will check queued and in-flight messages for expiry. + * + * Higher values mean that messages will be checked for expiry less often but this client will use less CPU. Higher + * values also mean that messages may not execute their callback with MESSAGE_EXPIRED close to the expected + * expiry time. + * + * Lower values mean that messages will be checked for expiry more often but this client will use more CPU. Lower + * values also mean that messages will execute their callback with MESSAGE_EXPIRED closer to the expected + * expiry time. + * + * By default, this value is 10 seconds. + * + * If set to 0, message expiry will never be checked. + */ + @Getter + @Builder.Default + private final long messageExpirationCheckPeriod = 10000; + public boolean isUsingIdentifiableThreadNames() { // Using a manually written method here to override the name that Lombok would have given it diff --git a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java index ab99e0820c..5c6ca241fb 100644 --- a/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java +++ b/iothub/device/iot-device-client/src/main/java/com/microsoft/azure/sdk/iot/device/transport/IotHubTransport.java @@ -79,6 +79,9 @@ public class IotHubTransport implements IotHubListener // should stop spawning send/receive threads when this layer is disconnected or disconnected retrying private final IotHubConnectionStatusChangeCallback deviceIOConnectionStatusChangeCallback; + // Lock on reading and writing on the waitingPackets queue + final private Object waitingPacketsLock = new Object(); + // Lock on reading and writing on the inProgressPackets map final private Object inProgressMessagesLock = new Object(); @@ -130,6 +133,10 @@ public class IotHubTransport implements IotHubListener private final Thread correlationCallbackCleanupThread = new Thread(() -> checkForOldMessages()); private static final int CORRELATION_CALLBACK_CLEANUP_PERIOD_MILLISECONDS = 60 * 60 * 1000; + // A job that runs periodically to remove any expired messages from the in progress and waiting queue + private final Thread expiredMessagesCleanupThread = new Thread(() -> checkForExpiredOutgoingMessages()); + private final long messageExpirationCheckPeriod; + /** * Constructor for an IotHubTransport object with default values * @@ -160,6 +167,7 @@ public IotHubTransport(ClientConfiguration defaultConfig, IotHubConnectionStatus this.useIdentifiableThreadNames = defaultConfig.isUsingIdentifiableThreadNames(); this.threadNamePrefix = defaultConfig.getThreadNamePrefix(); this.threadNameSuffix = defaultConfig.getThreadNameSuffix(); + this.messageExpirationCheckPeriod = defaultConfig.getMessageExpiredCheckPeriod(); } public IotHubTransport( @@ -172,7 +180,8 @@ public IotHubTransport( int sendInterval, boolean useIdentifiableThreadNames, String threadNamePrefix, - String threadNameSuffix) throws IllegalArgumentException + String threadNameSuffix, + long messageExpirationCheckPeriod) throws IllegalArgumentException { this.protocol = protocol; this.hostName = hostName; @@ -187,6 +196,7 @@ public IotHubTransport( this.useIdentifiableThreadNames = useIdentifiableThreadNames; this.threadNamePrefix = threadNamePrefix; this.threadNameSuffix = threadNameSuffix; + this.messageExpirationCheckPeriod = messageExpirationCheckPeriod; } public Semaphore getSendThreadSemaphore() @@ -763,35 +773,38 @@ public void sendMessages() int timeSlice = maxNumberOfMessagesToSendPerThread; - while (this.connectionStatus == IotHubConnectionStatus.CONNECTED && timeSlice-- > 0) + synchronized (this.waitingPacketsLock) { - IotHubTransportPacket packet = waitingPacketsQueue.poll(); - - if (packet != null) + while (this.connectionStatus == IotHubConnectionStatus.CONNECTED && timeSlice-- > 0) { - Message message = packet.getMessage(); - log.trace("Dequeued a message from waiting queue to be sent ({})", message); + IotHubTransportPacket packet = waitingPacketsQueue.poll(); - if (message != null && this.isMessageValid(packet)) + if (packet != null) { - sendPacket(packet); + Message message = packet.getMessage(); + log.trace("Dequeued a message from waiting queue to be sent ({})", message); - try + if (message != null && this.isMessageValid(packet)) { - String correlationId = message.getCorrelationId(); + sendPacket(packet); - if (!correlationId.isEmpty()) + try { - CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId); - if (callbackContext != null && callbackContext.getCallback() != null) + String correlationId = message.getCorrelationId(); + + if (!correlationId.isEmpty()) { - callbackContext.getCallback().onRequestSent(message, callbackContext.getUserContext()); + CorrelationCallbackContext callbackContext = correlationCallbacks.get(correlationId); + if (callbackContext != null && callbackContext.getCallback() != null) + { + callbackContext.getCallback().onRequestSent(message, callbackContext.getUserContext()); + } } } - } - catch (Exception e) - { - log.warn("Exception thrown while calling the onRequestSent callback in sendMessages", e); + catch (Exception e) + { + log.warn("Exception thrown while calling the onRequestSent callback in sendMessages", e); + } } } } @@ -816,28 +829,31 @@ String getDeviceClientUniqueIdentifier() private void checkForExpiredMessages() { - //Check waiting packets, remove any that have expired. - IotHubTransportPacket packet = this.waitingPacketsQueue.poll(); - Queue packetsToAddBackIntoWaitingPacketsQueue = new LinkedBlockingQueue<>(); - while (packet != null) + synchronized (this.waitingPacketsQueue) { - if (packet.getMessage().isExpired()) + //Check waiting packets, remove any that have expired. + IotHubTransportPacket packet = this.waitingPacketsQueue.poll(); + Queue packetsToAddBackIntoWaitingPacketsQueue = new LinkedBlockingQueue<>(); + while (packet != null) { - packet.setStatus(IotHubStatusCode.MESSAGE_EXPIRED); - this.addToCallbackQueue(packet); - } - else - { - //message not expired, requeue it - packetsToAddBackIntoWaitingPacketsQueue.add(packet); + if (packet.getMessage().isExpired()) + { + packet.setStatus(IotHubStatusCode.MESSAGE_EXPIRED); + this.addToCallbackQueue(packet); + } + else + { + //message not expired, requeue it + packetsToAddBackIntoWaitingPacketsQueue.add(packet); + } + + packet = this.waitingPacketsQueue.poll(); } - packet = this.waitingPacketsQueue.poll(); + //Requeue all the non-expired messages. + this.waitingPacketsQueue.addAll(packetsToAddBackIntoWaitingPacketsQueue); } - //Requeue all the non-expired messages. - this.waitingPacketsQueue.addAll(packetsToAddBackIntoWaitingPacketsQueue); - //Check in progress messages synchronized (this.inProgressMessagesLock) { @@ -894,6 +910,26 @@ private void checkForOldMessages() } } + private void checkForExpiredOutgoingMessages() + { + try + { + Thread.currentThread().setName("azure-iot-sdk-IotHubMessageExpiryCheckerTask"); + while (true) + { + Thread.sleep(this.messageExpirationCheckPeriod); + checkForExpiredMessages(); + invokeCallbacks(); + } + + } + catch (InterruptedException e) + { + // The exception can be ignored since this thread is interrupted when the client is closing. + // Once interrupted, simply end this thread. + } + } + /** * Invokes the callbacks for all completed requests. */ @@ -1717,10 +1753,24 @@ else if (this.getDefaultConfig() != null { // Thread has already started. No need to report this exception } + + try + { + // 0 means that the user doesn't want to ever run this check + if (messageExpirationCheckPeriod != 0) + { + expiredMessagesCleanupThread.start(); + } + } + catch (IllegalThreadStateException e) + { + // Thread has already started. No need to report this exception + } } else if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED) { correlationCallbackCleanupThread.interrupt(); + expiredMessagesCleanupThread.interrupt(); } } } @@ -1766,20 +1816,8 @@ private void updateStatus(IotHubConnectionStatus newConnectionStatus, IotHubConn invokeConnectionStatusChangeCallback(newConnectionStatus, previousStatus, reason, throwable, deviceId); } - if (newConnectionStatus == IotHubConnectionStatus.CONNECTED) - { - try - { - correlationCallbackCleanupThread.start(); - } - catch (IllegalThreadStateException e) - { - // Thread has already started. No need to report this exception - } - } - else if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED) + if (newConnectionStatus == IotHubConnectionStatus.DISCONNECTED) { - correlationCallbackCleanupThread.interrupt(); finalizeMultiplexedDevicesMessages(deviceId); } } @@ -1913,28 +1951,6 @@ private boolean hasOperationTimedOut(long startTime) return (System.currentTimeMillis() - startTime) > this.getDefaultConfig().getOperationTimeout(); } - /** - * Returns if the provided packet has lasted longer than the device operation timeout - * - * @return true if the packet has been in the queues for longer than the device operation timeout and false otherwise - */ - private boolean hasOperationTimedOut(long startTime, String deviceId) - { - if (startTime == 0) - { - return false; - } - - ClientConfiguration config = this.getConfig(deviceId); - if (config == null) - { - log.debug("Operation has not timed out since the device it was associated with has been unregistered already."); - return false; - } - - return (System.currentTimeMillis() - startTime) > config.getOperationTimeout(); - } - /** * Adds the packet to the callback queue if the provided packet has a callback. The packet is ignored otherwise. *