diff --git a/CHANGELOG.md b/CHANGELOG.md
index a915e02c..11a77ce2 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,27 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).
+## 2.3.0 (UNRELEASED)
+#### New Features
+- [MultiThreaded Consumer](https://github.com/SourceLabOrg/kafka-webview/pull/170) Add multi-threaded kafka consumer.
+
+Previously a single consumer instance was used when paging through messages from a topic. Each partition was consumed sequentially in order to provide consistent results on each page. For topics with a large number of partitions this could take considerable time.
+
+The underlying consumer implementation has been replaced with a multi-threaded version which will attempt to read each partition in parallel. The following configuration properties have been added to control this behavior:
+
+```yml
+app:
+ ## Enable multi-threaded consumer support
+ ## The previous single-threaded implementation is still available by setting this property to false.
+ ## The previous implementation along with this property will be removed in future release.
+ multiThreadedConsumer: true
+
+ ## Sets upper limit on the number of concurrent consumers (non-websocket) supported.
+ maxConcurrentWebConsumers: 32
+```
+
+If you run into issues, you can disable the new implementation and revert to the previous behavior by setting the `multiThreadedConsumer` property to `false`.
+
## 2.2.0 (03/20/2019)
#### Bug fixes
diff --git a/README.md b/README.md
index eb6c7206..6ad03efc 100644
--- a/README.md
+++ b/README.md
@@ -67,6 +67,14 @@ app:
## Defines a prefix prepended to the Id of all consumers.
consumerIdPrefix: "KafkaWebViewConsumer"
+
+ ## Enable multi-threaded consumer support
+ ## The previous single-threaded implementation is still available by setting this property to false.
+ ## The previous implementation along with this property will be removed in future release.
+ multiThreadedConsumer: true
+
+ ## Sets upper limit on the number of concurrent consumers (non-websocket) supported.
+ maxConcurrentWebConsumers: 32
## Sets upper limit on the number of concurrent web socket consumers supported.
maxConcurrentWebSocketConsumers: 64
@@ -299,8 +307,8 @@ implementations.
# Releasing
Steps for performing a release:
-1. Update release version: mvn versions:set -DnewVersion=X.Y.Z
-2. Validate and then commit version: mvn versions:commit
+1. Update release version: `mvn versions:set -DnewVersion=X.Y.Z`
+2. Validate and then commit version: `mvn versions:commit`
3. Update CHANGELOG and README files.
4. Merge to master.
5. Deploy to Maven Central: mvn clean deploy -P release-kafka-webview
diff --git a/dev-cluster/pom.xml b/dev-cluster/pom.xml
index fb763b3f..0e851e61 100644
--- a/dev-cluster/pom.xml
+++ b/dev-cluster/pom.xml
@@ -5,12 +5,12 @@
kafka-webview
org.sourcelab
- 2.2.0
+ 2.3.0
4.0.0
dev-cluster
- 2.2.0
+ 2.3.0
diff --git a/kafka-webview-plugin/pom.xml b/kafka-webview-plugin/pom.xml
index 82b4ad29..b10f0b6d 100644
--- a/kafka-webview-plugin/pom.xml
+++ b/kafka-webview-plugin/pom.xml
@@ -5,7 +5,7 @@
org.sourcelab
kafka-webview
- 2.2.0
+ 2.3.0
4.0.0
kafka-webview-plugin
diff --git a/kafka-webview-ui/pom.xml b/kafka-webview-ui/pom.xml
index b7bbc0bd..6a305b20 100644
--- a/kafka-webview-ui/pom.xml
+++ b/kafka-webview-ui/pom.xml
@@ -5,11 +5,11 @@
kafka-webview
org.sourcelab
- 2.2.0
+ 2.3.0
4.0.0
kafka-webview-ui
- 2.2.0
+ 2.3.0
Kafka WebView UI
diff --git a/kafka-webview-ui/src/assembly/distribution/config.yml b/kafka-webview-ui/src/assembly/distribution/config.yml
index 0ff2baa3..ce7cd4f5 100644
--- a/kafka-webview-ui/src/assembly/distribution/config.yml
+++ b/kafka-webview-ui/src/assembly/distribution/config.yml
@@ -16,6 +16,14 @@ app:
## Defines a prefix prepended to the Id of all consumers.
consumerIdPrefix: "KafkaWebViewConsumer"
+ ## Enable multi-threaded consumer support
+ ## The previous single-threaded implementation is still available by setting this property to false.
+ ## The previous implementation along with this property will be removed in future release.
+ multiThreadedConsumer: true
+
+ ## Sets upper limit on the number of concurrent consumers (non-websocket) supported.
+ maxConcurrentWebConsumers: 32
+
## Sets upper limit on the number of concurrent web socket consumers supported.
maxConcurrentWebSocketConsumers: 64
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/AppProperties.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/AppProperties.java
index 57c78244..0fb2b34a 100644
--- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/AppProperties.java
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/AppProperties.java
@@ -43,7 +43,13 @@ public class AppProperties {
@Value("${app.key}")
private String appKey;
- @Value("${app.maxConcurrentWebSocketConsumers}")
+ @Value("${app.multiThreadedConsumer:true}")
+ private boolean enableMultiThreadedConsumer;
+
+ @Value("${app.maxConcurrentWebConsumers:32}")
+ private Integer maxConcurrentWebConsumers = 32;
+
+ @Value("${app.maxConcurrentWebSocketConsumers:100}")
private Integer maxConcurrentWebSocketConsumers = 100;
@Value("${app.consumerIdPrefix}")
@@ -104,6 +110,14 @@ public boolean isAvroIncludeSchema() {
return avroIncludeSchema;
}
+ public boolean isEnableMultiThreadedConsumer() {
+ return enableMultiThreadedConsumer;
+ }
+
+ public Integer getMaxConcurrentWebConsumers() {
+ return maxConcurrentWebConsumers;
+ }
+
@Override
public String toString() {
return "AppProperties{"
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/PluginConfig.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/PluginConfig.java
index 6c014ae7..2c4cdf48 100644
--- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/PluginConfig.java
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/configuration/PluginConfig.java
@@ -25,8 +25,11 @@
package org.sourcelab.kafka.webview.ui.configuration;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.webview.ui.manager.encryption.SecretManager;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaAdminFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaClientConfigUtil;
@@ -42,11 +45,15 @@
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
/**
* Application Configuration for Plugin beans.
*/
@Component
public class PluginConfig {
+ private static final Logger logger = LoggerFactory.getLogger(PluginConfig.class);
/**
* Upload manager, for handling uploads of Plugins and Keystores.
@@ -97,11 +104,30 @@ public SecretManager getSecretManager(final AppProperties appProperties) {
*/
@Bean
public WebKafkaConsumerFactory getWebKafkaConsumerFactory(final AppProperties appProperties, final KafkaClientConfigUtil configUtil) {
+ final ExecutorService executorService;
+
+ // If we have multi-threaded consumer option enabled
+ if (appProperties.isEnableMultiThreadedConsumer()) {
+ logger.info("Enabled multi-threaded webconsumer with {} threads.", appProperties.getMaxConcurrentWebConsumers());
+
+ // Create fixed thread pool
+ executorService = Executors.newFixedThreadPool(
+ appProperties.getMaxConcurrentWebConsumers(),
+ new ThreadFactoryBuilder()
+ .setNameFormat("kafka-web-consumer-pool-%d")
+ .build()
+ );
+ } else {
+ // Null reference.
+ executorService = null;
+ }
+
return new WebKafkaConsumerFactory(
getDeserializerPluginFactory(appProperties),
getRecordFilterPluginFactory(appProperties),
getSecretManager(appProperties),
- getKafkaConsumerFactory(configUtil)
+ getKafkaConsumerFactory(configUtil),
+ executorService
);
}
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/DefaultWebKafkaConsumer.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/DefaultWebKafkaConsumer.java
new file mode 100644
index 00000000..62696c97
--- /dev/null
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/DefaultWebKafkaConsumer.java
@@ -0,0 +1,348 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.sourcelab.kafka.webview.ui.manager.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sourcelab.kafka.webview.ui.manager.kafka.config.ClientConfig;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerState;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResult;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResults;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.PartitionOffset;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Wrapper around KafkaConsumer. This instance is intended to be short lived and only live
+ * during the life-time of a single web request.
+ *
+ * @deprecated Soon to be replaced by {@link ParallelWebKafkaConsumer} for its better performance.
+ */
+@Deprecated
+public class DefaultWebKafkaConsumer implements WebKafkaConsumer {
+ private static final Logger logger = LoggerFactory.getLogger(DefaultWebKafkaConsumer.class);
+
+ private final KafkaConsumer kafkaConsumer;
+ private final ClientConfig clientConfig;
+ private List cachedTopicsAndPartitions = null;
+ private final Duration pollTimeoutDuration;
+
+ /**
+ * Constructor.
+ * @param kafkaConsumer The underlying/wrapped KafkaConsumer instance.
+ * @param clientConfig The client configuration.
+ */
+ public DefaultWebKafkaConsumer(final KafkaConsumer kafkaConsumer, final ClientConfig clientConfig) {
+ this.kafkaConsumer = kafkaConsumer;
+ this.clientConfig = clientConfig;
+ this.pollTimeoutDuration = Duration.ofMillis(clientConfig.getPollTimeoutMs());
+ }
+
+ private List consume() {
+ final List kafkaResultList = new ArrayList<>();
+ final ConsumerRecords consumerRecords = kafkaConsumer.poll(pollTimeoutDuration);
+
+ logger.info("Consumed {} records", consumerRecords.count());
+ final Iterator recordIterator = consumerRecords.iterator();
+ while (recordIterator.hasNext()) {
+ // Get next record
+ final ConsumerRecord consumerRecord = recordIterator.next();
+
+ // Convert to KafkaResult.
+ final KafkaResult kafkaResult = new KafkaResult(
+ consumerRecord.partition(),
+ consumerRecord.offset(),
+ consumerRecord.timestamp(),
+ consumerRecord.key(),
+ consumerRecord.value()
+ );
+
+ // Add to list.
+ kafkaResultList.add(kafkaResult);
+ }
+
+ // Commit offsets
+ commit();
+ return kafkaResultList;
+ }
+
+ /**
+ * Retrieves next batch of records per partition.
+ * @return KafkaResults object containing any found records.
+ */
+ @Override
+ public KafkaResults consumePerPartition() {
+ final Map> resultsByPartition = new TreeMap<>();
+ for (final TopicPartition topicPartition: getAllPartitions()) {
+ // Subscribe to just that topic partition
+ kafkaConsumer.assign(Collections.singleton(topicPartition));
+
+ // consume
+ final List kafkaResults = consume();
+
+ logger.info("Consumed Partition {} Records: {}", topicPartition.partition(), kafkaResults.size());
+ resultsByPartition.put(topicPartition.partition(), kafkaResults);
+ }
+
+ // Reassign all partitions
+ kafkaConsumer.assign(getAllPartitions());
+
+ // Loop over results
+ final List allResults = new ArrayList<>();
+ for (final List results: resultsByPartition.values()) {
+ allResults.addAll(results);
+ }
+
+ // Create return object
+ return new KafkaResults(
+ allResults,
+ getConsumerState().getOffsets(),
+ getHeadOffsets(),
+ getTailOffsets()
+ );
+ }
+
+ /**
+ * Seek to the specified offsets.
+ * @param partitionOffsetMap Map of PartitionId to Offset to seek to.
+ * @return ConsumerState representing the consumer's positions.
+ */
+ @Override
+ public ConsumerState seek(final Map partitionOffsetMap) {
+ for (final Map.Entry entry: partitionOffsetMap.entrySet()) {
+ kafkaConsumer.seek(
+ new TopicPartition(clientConfig.getTopicConfig().getTopicName(), entry.getKey()),
+ entry.getValue()
+ );
+ }
+ commit();
+ return getConsumerState();
+ }
+
+ /**
+ * Seek consumer to specific timestamp
+ * @param timestamp Unix timestamp in milliseconds to seek to.
+ */
+ @Override
+ public ConsumerState seek(final long timestamp) {
+ // Find offsets for timestamp
+ final Map timestampMap = new HashMap<>();
+ for (final TopicPartition topicPartition: getAllPartitions()) {
+ timestampMap.put(topicPartition, timestamp);
+ }
+ final Map offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);
+
+ // Build map of partition => offset
+ final Map partitionOffsetMap = new HashMap<>();
+ for (Map.Entry entry: offsetMap.entrySet()) {
+ partitionOffsetMap.put(entry.getKey().partition(), entry.getValue().offset());
+ }
+
+ // Now lets seek to those offsets
+ return seek(partitionOffsetMap);
+ }
+
+ private List getHeadOffsets() {
+ final Map results = kafkaConsumer.beginningOffsets(getAllPartitions());
+
+ final List offsets = new ArrayList<>();
+ for (final Map.Entry entry : results.entrySet()) {
+ offsets.add(new PartitionOffset(entry.getKey().partition(), entry.getValue()));
+ }
+ return offsets;
+ }
+
+ private List getTailOffsets() {
+ final Map results = kafkaConsumer.endOffsets(getAllPartitions());
+
+ final List offsets = new ArrayList<>();
+ for (final Map.Entry entry : results.entrySet()) {
+ offsets.add(new PartitionOffset(entry.getKey().partition(), entry.getValue()));
+ }
+ return offsets;
+ }
+
+ private ConsumerState getConsumerState() {
+ final List offsets = new ArrayList<>();
+
+ for (final TopicPartition topicPartition: getAllPartitions()) {
+ final long offset = kafkaConsumer.position(topicPartition);
+ offsets.add(new PartitionOffset(topicPartition.partition(), offset));
+ }
+
+ return new ConsumerState(clientConfig.getTopicConfig().getTopicName(), offsets);
+ }
+
+ private List getAllPartitions() {
+ // If we have not pulled this yet
+ if (cachedTopicsAndPartitions == null) {
+ // Determine which partitions to subscribe to, for now do all
+ final List partitionInfos = kafkaConsumer.partitionsFor(clientConfig.getTopicConfig().getTopicName());
+
+ // Pull out partitions, convert to topic partitions
+ cachedTopicsAndPartitions = new ArrayList<>();
+ for (final PartitionInfo partitionInfo : partitionInfos) {
+ // Skip filtered partitions
+ if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) {
+ cachedTopicsAndPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ }
+ }
+ }
+ return cachedTopicsAndPartitions;
+ }
+
+ private void commit() {
+ kafkaConsumer.commitSync();
+ }
+
+ /**
+ * Closes out the consumer.
+ */
+ @Override
+ public void close() {
+ kafkaConsumer.close();
+ }
+
+ /**
+ * Seek to the previous 'page' of records.
+ */
+ @Override
+ public void previous() {
+ // Get all available partitions
+ final List topicPartitions = getAllPartitions();
+
+ // Get head offsets for each partition
+ final Map headOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+
+ // Loop over each partition
+ for (final TopicPartition topicPartition: topicPartitions) {
+ // Calculate our previous offsets
+ final long headOffset = headOffsets.get(topicPartition);
+ final long currentOffset = kafkaConsumer.position(topicPartition);
+ long newOffset = currentOffset - (clientConfig.getMaxResultsPerPartition() * 2);
+
+ // Can't go before the head position!
+ if (newOffset < headOffset) {
+ newOffset = headOffset;
+ }
+
+ logger.info("Partition: {} Previous Offset: {} New Offset: {}", topicPartition.partition(), currentOffset, newOffset);
+
+ // Seek to earlier offset
+ kafkaConsumer.seek(topicPartition, newOffset);
+ }
+ commit();
+ }
+
+ /**
+ * Seek to the next 'page' of records.
+ */
+ @Override
+ public void next() {
+ // Get all available partitions
+ final List topicPartitions = getAllPartitions();
+
+ // Get head offsets for each partition
+ final Map tailOffsets = kafkaConsumer.endOffsets(topicPartitions);
+
+ // Loop over each partition
+ for (final TopicPartition topicPartition: topicPartitions) {
+ // Calculate our previous offsets
+ final long tailOffset = tailOffsets.get(topicPartition);
+ final long currentOffset = kafkaConsumer.position(topicPartition);
+ long newOffset = currentOffset + clientConfig.getMaxResultsPerPartition();
+
+ if (newOffset < tailOffset) {
+ newOffset = tailOffset;
+ }
+ logger.info("Partition: {} Previous Offset: {} New Offset: {}", topicPartition.partition(), currentOffset, newOffset);
+
+ // Seek to earlier offset
+ kafkaConsumer.seek(topicPartition, newOffset);
+ }
+ commit();
+ }
+
+ /**
+ * Seek to the HEAD of a topic.
+ */
+ @Override
+ public ConsumerState toHead() {
+ // Get all available partitions
+ final List topicPartitions = getAllPartitions();
+
+ // Get head offsets for each partition
+ final Map headOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+
+ // Loop over each partition
+ for (final TopicPartition topicPartition: topicPartitions) {
+ final long newOffset = headOffsets.get(topicPartition);
+ logger.info("Resetting Partition: {} To Head Offset: {}", topicPartition.partition(), newOffset);
+
+ // Seek to earlier offset
+ kafkaConsumer.seek(topicPartition, newOffset);
+ }
+ commit();
+
+ return getConsumerState();
+ }
+
+ /**
+ * Seek to the TAIL of a topic.
+ */
+ @Override
+ public ConsumerState toTail() {
+ // Get all available partitions
+ final List topicPartitions = getAllPartitions();
+
+ // Get head offsets for each partition
+ final Map tailOffsets = kafkaConsumer.endOffsets(topicPartitions);
+
+ // Loop over each partition
+ for (final TopicPartition topicPartition: topicPartitions) {
+ final long newOffset = tailOffsets.get(topicPartition);
+ logger.info("Resetting Partition: {} To Tail Offset: {}", topicPartition.partition(), newOffset);
+
+ // Seek to earlier offset
+ kafkaConsumer.seek(topicPartition, newOffset);
+ }
+ commit();
+
+ return getConsumerState();
+ }
+}
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/ParallelWebKafkaConsumer.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/ParallelWebKafkaConsumer.java
new file mode 100644
index 00000000..66eea7b0
--- /dev/null
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/ParallelWebKafkaConsumer.java
@@ -0,0 +1,372 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2017, 2018, 2019 SourceLab.org (https://github.com/SourceLabOrg/kafka-webview/)
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ */
+
+package org.sourcelab.kafka.webview.ui.manager.kafka;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.sourcelab.kafka.webview.ui.manager.kafka.config.ClientConfig;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerState;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResult;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResults;
+import org.sourcelab.kafka.webview.ui.manager.kafka.dto.PartitionOffset;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Wrapper around KafkaConsumer. This instance is intended to be short lived and only live
+ * during the life-time of a single web request.
+ *
+ * In order to provide a relatively "sane" ability to "page" through results in a consistent way, this
+ * consumes from each partition in parallel and merges the results.
+ *
+ * The parallelization factor is determined by the ExecutorService provided to the constructor.
+ */
+public class ParallelWebKafkaConsumer implements WebKafkaConsumer {
+
+ private static final Logger logger = LoggerFactory.getLogger(ParallelWebKafkaConsumer.class);
+
+ private final KafkaConsumerFactory kafkaConsumerFactory;
+ private final ClientConfig clientConfig;
+ private final Duration pollTimeoutDuration;
+ private final ExecutorService executorService;
+
+ private List cachedTopicsAndPartitions = null;
+
+ /**
+ * Constructor.
+ * @param kafkaConsumerFactory Factor for creating KafkaConsumer instances.
+ * @param clientConfig Client configuration.
+ * @param executorService ExecutorService to submit parallel consuming tasks to.
+ */
+ public ParallelWebKafkaConsumer(
+ final KafkaConsumerFactory kafkaConsumerFactory,
+ final ClientConfig clientConfig,
+ final ExecutorService executorService
+ ) {
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.clientConfig = clientConfig;
+ this.pollTimeoutDuration = Duration.ofMillis(clientConfig.getPollTimeoutMs());
+ this.executorService = executorService;
+ }
+
+ @Override
+ public KafkaResults consumePerPartition() {
+ try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
+ final List allTopicPartitions = getAllPartitions(kafkaConsumer);
+
+ // To preserve order
+ final Map>> completableFuturesByPartition = new TreeMap<>();
+
+ // Loop over each topic partition
+ for (final TopicPartition topicPartition : allTopicPartitions) {
+
+ // Create a new async task
+ final CompletableFuture> future = CompletableFuture.supplyAsync(() -> {
+ // Create a new consumer for each task
+ try (final KafkaConsumer perTopicConsumer = createNewConsumer()) {
+ // Subscribe to just the single topic partition
+ perTopicConsumer.assign(Collections.singleton(topicPartition));
+
+ // consume messages from that partition
+ return consume(perTopicConsumer);
+ }
+ }, executorService);
+
+ // Keep references to our ASync Tasks
+ completableFuturesByPartition.put(topicPartition.partition(), future);
+ }
+
+ // Merge results.
+ final List allResults = new ArrayList<>();
+ completableFuturesByPartition.forEach((partition, future) -> {
+ allResults.addAll(future.join());
+ });
+
+ // Create return object
+ return new KafkaResults(
+ allResults,
+ getConsumerState(kafkaConsumer).getOffsets(),
+ getHeadOffsets(kafkaConsumer),
+ getTailOffsets(kafkaConsumer)
+ );
+ }
+ }
+
+ @Override
+ public ConsumerState seek(final Map partitionOffsetMap) {
+ try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
+ for (final Map.Entry entry : partitionOffsetMap.entrySet()) {
+ kafkaConsumer.seek(
+ new TopicPartition(clientConfig.getTopicConfig().getTopicName(), entry.getKey()),
+ entry.getValue()
+ );
+ }
+ commit(kafkaConsumer);
+ return getConsumerState(kafkaConsumer);
+ }
+ }
+
+ @Override
+ public ConsumerState seek(final long timestamp) {
+ try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
+ // Find offsets for timestamp
+ final Map timestampMap = new HashMap<>();
+ for (final TopicPartition topicPartition : getAllPartitions(kafkaConsumer)) {
+ timestampMap.put(topicPartition, timestamp);
+ }
+ final Map offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);
+
+ // Build map of partition => offset
+ final Map partitionOffsetMap = new HashMap<>();
+ for (Map.Entry entry : offsetMap.entrySet()) {
+ partitionOffsetMap.put(entry.getKey().partition(), entry.getValue().offset());
+ }
+
+ // Now lets seek to those offsets
+ return seek(partitionOffsetMap);
+ }
+ }
+
+ @Override
+ public void close() {
+ // no-op.
+ // Since a single ExecutorService is shared between instances of this class,
+ // we should not close it out.
+ }
+
+ @Override
+ public void previous() {
+ try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
+ // Get all available partitions
+ final List topicPartitions = getAllPartitions(kafkaConsumer);
+
+ // Get head offsets for each partition
+ final Map headOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+
+ // Loop over each partition
+ for (final TopicPartition topicPartition : topicPartitions) {
+ // Calculate our previous offsets
+ final long headOffset = headOffsets.get(topicPartition);
+ final long currentOffset = kafkaConsumer.position(topicPartition);
+ long newOffset = currentOffset - (clientConfig.getMaxResultsPerPartition() * 2);
+
+ // Can't go before the head position!
+ if (newOffset < headOffset) {
+ newOffset = headOffset;
+ }
+
+ logger.info("Partition: {} Previous Offset: {} New Offset: {}", topicPartition.partition(), currentOffset, newOffset);
+
+ // Seek to earlier offset
+ kafkaConsumer.seek(topicPartition, newOffset);
+ }
+ commit(kafkaConsumer);
+ }
+ }
+
+ @Override
+ public void next() {
+ try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
+ // Get all available partitions
+ final List topicPartitions = getAllPartitions(kafkaConsumer);
+
+ // Get head offsets for each partition
+ final Map tailOffsets = kafkaConsumer.endOffsets(topicPartitions);
+
+ // Loop over each partition
+ for (final TopicPartition topicPartition : topicPartitions) {
+ // Calculate our previous offsets
+ final long tailOffset = tailOffsets.get(topicPartition);
+ final long currentOffset = kafkaConsumer.position(topicPartition);
+ long newOffset = currentOffset + clientConfig.getMaxResultsPerPartition();
+
+ if (newOffset < tailOffset) {
+ newOffset = tailOffset;
+ }
+ logger.info("Partition: {} Previous Offset: {} New Offset: {}", topicPartition.partition(), currentOffset, newOffset);
+
+ // Seek to earlier offset
+ kafkaConsumer.seek(topicPartition, newOffset);
+ }
+ commit(kafkaConsumer);
+ }
+ }
+
+ @Override
+ public ConsumerState toHead() {
+ try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
+ // Get all available partitions
+ final List topicPartitions = getAllPartitions(kafkaConsumer);
+
+ // Get head offsets for each partition
+ final Map headOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+
+ // Loop over each partition
+ for (final TopicPartition topicPartition : topicPartitions) {
+ final long newOffset = headOffsets.get(topicPartition);
+ logger.info("Resetting Partition: {} To Head Offset: {}", topicPartition.partition(), newOffset);
+
+ // Seek to earlier offset
+ kafkaConsumer.seek(topicPartition, newOffset);
+ }
+ commit(kafkaConsumer);
+ return getConsumerState(kafkaConsumer);
+ }
+ }
+
+ @Override
+ public ConsumerState toTail() {
+ try (final KafkaConsumer kafkaConsumer = createNewConsumer()) {
+ // Get all available partitions
+ final List topicPartitions = getAllPartitions(kafkaConsumer);
+
+ // Get head offsets for each partition
+ final Map tailOffsets = kafkaConsumer.endOffsets(topicPartitions);
+
+ // Loop over each partition
+ for (final TopicPartition topicPartition : topicPartitions) {
+ final long newOffset = tailOffsets.get(topicPartition);
+ logger.info("Resetting Partition: {} To Tail Offset: {}", topicPartition.partition(), newOffset);
+
+ // Seek to earlier offset
+ kafkaConsumer.seek(topicPartition, newOffset);
+ }
+ commit(kafkaConsumer);
+
+ return getConsumerState(kafkaConsumer);
+ }
+ }
+
+ private KafkaConsumer createNewConsumer() {
+ return kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig);
+ }
+
+ private ConsumerState getConsumerState(final KafkaConsumer kafkaConsumer) {
+ final List offsets = new ArrayList<>();
+
+ for (final TopicPartition topicPartition: getAllPartitions(kafkaConsumer)) {
+ final long offset = kafkaConsumer.position(topicPartition);
+ offsets.add(new PartitionOffset(topicPartition.partition(), offset));
+ }
+
+ return new ConsumerState(clientConfig.getTopicConfig().getTopicName(), offsets);
+ }
+
+ /**
+ * Mark synchronized to prevent multi-threaded weirdness.
+ */
+ private List getAllPartitions(final KafkaConsumer kafkaConsumer) {
+ // If we have not pulled this yet
+ if (cachedTopicsAndPartitions == null) {
+ // Attempt to prevent multi-threaded weirdness.
+ synchronized (this) {
+ if (cachedTopicsAndPartitions == null) {
+ // Determine which partitions to subscribe to, for now do all
+ final List partitionInfos = kafkaConsumer.partitionsFor(clientConfig.getTopicConfig().getTopicName());
+
+ // Pull out partitions, convert to topic partitions
+ final List tempHolder = new ArrayList<>();
+ for (final PartitionInfo partitionInfo : partitionInfos) {
+ // Skip filtered partitions
+ if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) {
+ tempHolder.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+ }
+ }
+ cachedTopicsAndPartitions = Collections.unmodifiableList(tempHolder);
+ }
+ }
+
+ }
+ return cachedTopicsAndPartitions;
+ }
+
+ private List getHeadOffsets(final KafkaConsumer kafkaConsumer) {
+ final Map results = kafkaConsumer.beginningOffsets(getAllPartitions(kafkaConsumer));
+
+ final List offsets = new ArrayList<>();
+ for (final Map.Entry entry : results.entrySet()) {
+ offsets.add(new PartitionOffset(entry.getKey().partition(), entry.getValue()));
+ }
+ return offsets;
+ }
+
+ private List getTailOffsets(final KafkaConsumer kafkaConsumer) {
+ final Map results = kafkaConsumer.endOffsets(getAllPartitions(kafkaConsumer));
+
+ final List offsets = new ArrayList<>();
+ for (final Map.Entry entry : results.entrySet()) {
+ offsets.add(new PartitionOffset(entry.getKey().partition(), entry.getValue()));
+ }
+ return offsets;
+ }
+
+
+ private void commit(final KafkaConsumer kafkaConsumer) {
+ kafkaConsumer.commitSync();
+ }
+
+ private List consume(final KafkaConsumer kafkaConsumer) {
+ final List kafkaResultList = new ArrayList<>();
+ final ConsumerRecords consumerRecords = kafkaConsumer.poll(pollTimeoutDuration);
+
+ logger.info("Consumed {} records", consumerRecords.count());
+ final Iterator recordIterator = consumerRecords.iterator();
+ while (recordIterator.hasNext()) {
+ // Get next record
+ final ConsumerRecord consumerRecord = recordIterator.next();
+
+ // Convert to KafkaResult.
+ final KafkaResult kafkaResult = new KafkaResult(
+ consumerRecord.partition(),
+ consumerRecord.offset(),
+ consumerRecord.timestamp(),
+ consumerRecord.key(),
+ consumerRecord.value()
+ );
+
+ // Add to list.
+ kafkaResultList.add(kafkaResult);
+ }
+
+ // Commit offsets
+ commit(kafkaConsumer);
+ return kafkaResultList;
+ }
+}
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumer.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumer.java
index 6fc6b960..fd170f90 100644
--- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumer.java
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumer.java
@@ -24,314 +24,57 @@
package org.sourcelab.kafka.webview.ui.manager.kafka;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.sourcelab.kafka.webview.ui.manager.kafka.config.ClientConfig;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.ConsumerState;
-import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResult;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResults;
-import org.sourcelab.kafka.webview.ui.manager.kafka.dto.PartitionOffset;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
/**
- * Wrapper around KafkaConsumer. This instance is intended to be short lived and only live
- * during the life-time of a single web request.
+ * Interface for a KafkaConsumer used for consuming from Kafka from on-line web requests.
*/
-public class WebKafkaConsumer implements AutoCloseable {
- private static final Logger logger = LoggerFactory.getLogger(WebKafkaConsumer.class);
-
- private final KafkaConsumer kafkaConsumer;
- private final ClientConfig clientConfig;
- private List cachedTopicsAndPartitions = null;
- private final Duration pollTimeoutDuration;
-
- /**
- * Constructor.
- * @param kafkaConsumer The underlying/wrapped KafkaConsumer instance.
- * @param clientConfig The client configuration.
- */
- public WebKafkaConsumer(final KafkaConsumer kafkaConsumer, final ClientConfig clientConfig) {
- this.kafkaConsumer = kafkaConsumer;
- this.clientConfig = clientConfig;
- this.pollTimeoutDuration = Duration.ofMillis(clientConfig.getPollTimeoutMs());
- }
-
- private List consume() {
- final List kafkaResultList = new ArrayList<>();
- final ConsumerRecords consumerRecords = kafkaConsumer.poll(pollTimeoutDuration);
-
- logger.info("Consumed {} records", consumerRecords.count());
- final Iterator recordIterator = consumerRecords.iterator();
- while (recordIterator.hasNext()) {
- // Get next record
- final ConsumerRecord consumerRecord = recordIterator.next();
-
- // Convert to KafkaResult.
- final KafkaResult kafkaResult = new KafkaResult(
- consumerRecord.partition(),
- consumerRecord.offset(),
- consumerRecord.timestamp(),
- consumerRecord.key(),
- consumerRecord.value()
- );
-
- // Add to list.
- kafkaResultList.add(kafkaResult);
- }
-
- // Commit offsets
- commit();
- return kafkaResultList;
- }
+public interface WebKafkaConsumer extends AutoCloseable {
/**
* Retrieves next batch of records per partition.
* @return KafkaResults object containing any found records.
*/
- public KafkaResults consumePerPartition() {
- final Map> resultsByPartition = new TreeMap<>();
- for (final TopicPartition topicPartition: getAllPartitions()) {
- // Subscribe to just that topic partition
- kafkaConsumer.assign(Collections.singleton(topicPartition));
-
- // consume
- final List kafkaResults = consume();
-
- logger.info("Consumed Partition {} Records: {}", topicPartition.partition(), kafkaResults.size());
- resultsByPartition.put(topicPartition.partition(), kafkaResults);
- }
-
- // Reassign all partitions
- kafkaConsumer.assign(getAllPartitions());
-
- // Loop over results
- final List allResults = new ArrayList<>();
- for (final List results: resultsByPartition.values()) {
- allResults.addAll(results);
- }
-
- // Create return object
- return new KafkaResults(
- allResults,
- getConsumerState().getOffsets(),
- getHeadOffsets(),
- getTailOffsets()
- );
- }
+ KafkaResults consumePerPartition();
/**
* Seek to the specified offsets.
* @param partitionOffsetMap Map of PartitionId to Offset to seek to.
* @return ConsumerState representing the consumer's positions.
*/
- public ConsumerState seek(final Map partitionOffsetMap) {
- for (final Map.Entry entry: partitionOffsetMap.entrySet()) {
- kafkaConsumer.seek(
- new TopicPartition(clientConfig.getTopicConfig().getTopicName(), entry.getKey()),
- entry.getValue()
- );
- }
- commit();
- return getConsumerState();
- }
+ ConsumerState seek(final Map partitionOffsetMap);
/**
* Seek consumer to specific timestamp
* @param timestamp Unix timestamp in milliseconds to seek to.
*/
- public ConsumerState seek(final long timestamp) {
- // Find offsets for timestamp
- final Map timestampMap = new HashMap<>();
- for (final TopicPartition topicPartition: getAllPartitions()) {
- timestampMap.put(topicPartition, timestamp);
- }
- final Map offsetMap = kafkaConsumer.offsetsForTimes(timestampMap);
-
- // Build map of partition => offset
- final Map partitionOffsetMap = new HashMap<>();
- for (Map.Entry entry: offsetMap.entrySet()) {
- partitionOffsetMap.put(entry.getKey().partition(), entry.getValue().offset());
- }
-
- // Now lets seek to those offsets
- return seek(partitionOffsetMap);
- }
-
- private List getHeadOffsets() {
- final Map results = kafkaConsumer.beginningOffsets(getAllPartitions());
-
- final List offsets = new ArrayList<>();
- for (final Map.Entry entry : results.entrySet()) {
- offsets.add(new PartitionOffset(entry.getKey().partition(), entry.getValue()));
- }
- return offsets;
- }
-
- private List getTailOffsets() {
- final Map results = kafkaConsumer.endOffsets(getAllPartitions());
-
- final List offsets = new ArrayList<>();
- for (final Map.Entry entry : results.entrySet()) {
- offsets.add(new PartitionOffset(entry.getKey().partition(), entry.getValue()));
- }
- return offsets;
- }
-
- private ConsumerState getConsumerState() {
- final List offsets = new ArrayList<>();
-
- for (final TopicPartition topicPartition: getAllPartitions()) {
- final long offset = kafkaConsumer.position(topicPartition);
- offsets.add(new PartitionOffset(topicPartition.partition(), offset));
- }
-
- return new ConsumerState(clientConfig.getTopicConfig().getTopicName(), offsets);
- }
-
- private List getAllPartitions() {
- // If we have not pulled this yet
- if (cachedTopicsAndPartitions == null) {
- // Determine which partitions to subscribe to, for now do all
- final List partitionInfos = kafkaConsumer.partitionsFor(clientConfig.getTopicConfig().getTopicName());
-
- // Pull out partitions, convert to topic partitions
- cachedTopicsAndPartitions = new ArrayList<>();
- for (final PartitionInfo partitionInfo : partitionInfos) {
- // Skip filtered partitions
- if (!clientConfig.isPartitionFiltered(partitionInfo.partition())) {
- cachedTopicsAndPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
- }
- }
- }
- return cachedTopicsAndPartitions;
- }
-
- private void commit() {
- kafkaConsumer.commitSync();
- }
+ ConsumerState seek(final long timestamp);
/**
* Closes out the consumer.
*/
- public void close() {
- kafkaConsumer.close();
- }
+ void close();
/**
* Seek to the previous 'page' of records.
*/
- public void previous() {
- // Get all available partitions
- final List topicPartitions = getAllPartitions();
-
- // Get head offsets for each partition
- final Map headOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
-
- // Loop over each partition
- for (final TopicPartition topicPartition: topicPartitions) {
- // Calculate our previous offsets
- final long headOffset = headOffsets.get(topicPartition);
- final long currentOffset = kafkaConsumer.position(topicPartition);
- long newOffset = currentOffset - (clientConfig.getMaxResultsPerPartition() * 2);
-
- // Can't go before the head position!
- if (newOffset < headOffset) {
- newOffset = headOffset;
- }
-
- logger.info("Partition: {} Previous Offset: {} New Offset: {}", topicPartition.partition(), currentOffset, newOffset);
-
- // Seek to earlier offset
- kafkaConsumer.seek(topicPartition, newOffset);
- }
- commit();
- }
+ void previous();
/**
* Seek to the next 'page' of records.
*/
- public void next() {
- // Get all available partitions
- final List topicPartitions = getAllPartitions();
-
- // Get head offsets for each partition
- final Map tailOffsets = kafkaConsumer.endOffsets(topicPartitions);
-
- // Loop over each partition
- for (final TopicPartition topicPartition: topicPartitions) {
- // Calculate our previous offsets
- final long tailOffset = tailOffsets.get(topicPartition);
- final long currentOffset = kafkaConsumer.position(topicPartition);
- long newOffset = currentOffset + clientConfig.getMaxResultsPerPartition();
-
- if (newOffset < tailOffset) {
- newOffset = tailOffset;
- }
- logger.info("Partition: {} Previous Offset: {} New Offset: {}", topicPartition.partition(), currentOffset, newOffset);
-
- // Seek to earlier offset
- kafkaConsumer.seek(topicPartition, newOffset);
- }
- commit();
- }
+ void next();
/**
* Seek to the HEAD of a topic.
*/
- public ConsumerState toHead() {
- // Get all available partitions
- final List topicPartitions = getAllPartitions();
-
- // Get head offsets for each partition
- final Map headOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
-
- // Loop over each partition
- for (final TopicPartition topicPartition: topicPartitions) {
- final long newOffset = headOffsets.get(topicPartition);
- logger.info("Resetting Partition: {} To Head Offset: {}", topicPartition.partition(), newOffset);
-
- // Seek to earlier offset
- kafkaConsumer.seek(topicPartition, newOffset);
- }
- commit();
-
- return getConsumerState();
- }
+ ConsumerState toHead();
/**
* Seek to the TAIL of a topic.
*/
- public ConsumerState toTail() {
- // Get all available partitions
- final List topicPartitions = getAllPartitions();
-
- // Get head offsets for each partition
- final Map tailOffsets = kafkaConsumer.endOffsets(topicPartitions);
-
- // Loop over each partition
- for (final TopicPartition topicPartition: topicPartitions) {
- final long newOffset = tailOffsets.get(topicPartition);
- logger.info("Resetting Partition: {} To Tail Offset: {}", topicPartition.partition(), newOffset);
-
- // Seek to earlier offset
- kafkaConsumer.seek(topicPartition, newOffset);
- }
- commit();
-
- return getConsumerState();
- }
+ ConsumerState toTail();
}
diff --git a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactory.java b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactory.java
index c1a7977a..6d3c13e3 100644
--- a/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactory.java
+++ b/kafka-webview-ui/src/main/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactory.java
@@ -53,6 +53,7 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
/**
* Factory class for creating new Kafka Consumers to be used by WebRequests.
@@ -70,6 +71,9 @@ public class WebKafkaConsumerFactory {
private final SecretManager secretManager;
private final KafkaConsumerFactory kafkaConsumerFactory;
+ // Multi-threaded consumer
+ private final ExecutorService multiThreadedConsumerThreadPool;
+
// For parsing json options
private final ObjectMapper mapper = new ObjectMapper();
@@ -80,11 +84,13 @@ public WebKafkaConsumerFactory(
final PluginFactory deserializerPluginFactory,
final PluginFactory recordFilterPluginFactory,
final SecretManager secretManager,
- final KafkaConsumerFactory kafkaConsumerFactory) {
+ final KafkaConsumerFactory kafkaConsumerFactory,
+ final ExecutorService multiThreadedConsumerThreadPool) {
this.deserializerPluginFactory = deserializerPluginFactory;
this.recordFilterPluginFactory = recordFilterPluginFactory;
this.secretManager = secretManager;
this.kafkaConsumerFactory = kafkaConsumerFactory;
+ this.multiThreadedConsumerThreadPool = multiThreadedConsumerThreadPool;
}
/**
@@ -105,11 +111,15 @@ public WebKafkaConsumer createWebClient(
.withStartingPosition(StartingPosition.newResumeFromExistingState())
.build();
- // Create kafka consumer
- final KafkaConsumer kafkaConsumer = createKafkaConsumer(clientConfig);
-
- // Create consumer
- return new WebKafkaConsumer(kafkaConsumer, clientConfig);
+ // If we've been passed an executor service
+ if (multiThreadedConsumerThreadPool != null) {
+ // Assume we want to use multi-threaded consumer.
+ return new ParallelWebKafkaConsumer(kafkaConsumerFactory, clientConfig, multiThreadedConsumerThreadPool);
+ } else {
+ // Create single threaded kafka consumer
+ final KafkaConsumer kafkaConsumer = createKafkaConsumer(clientConfig);
+ return new DefaultWebKafkaConsumer(kafkaConsumer, clientConfig);
+ }
}
/**
diff --git a/kafka-webview-ui/src/main/resources/config/base.yml b/kafka-webview-ui/src/main/resources/config/base.yml
index f383b192..93d04745 100644
--- a/kafka-webview-ui/src/main/resources/config/base.yml
+++ b/kafka-webview-ui/src/main/resources/config/base.yml
@@ -48,6 +48,8 @@ app:
name: Kafka Web View
uploadPath: "./data/uploads"
key: "SuperSecretKey"
+ multiThreadedConsumer: true
+ maxConcurrentWebConsumers: 32
maxConcurrentWebSocketConsumers: 64
consumerIdPrefix: "KafkaWebViewConsumer"
requireSsl: true
diff --git a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactoryTest.java b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactoryTest.java
index a445e092..623a5961 100644
--- a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactoryTest.java
+++ b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerFactoryTest.java
@@ -25,18 +25,22 @@
package org.sourcelab.kafka.webview.ui.manager.kafka;
import com.salesforce.kafka.test.junit4.SharedKafkaTestResource;
+import junitparams.JUnitParamsRunner;
+import junitparams.Parameters;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
+import org.junit.AfterClass;
import org.junit.Before;
+import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
+import org.junit.runner.RunWith;
import org.sourcelab.kafka.webview.ui.manager.encryption.SecretManager;
import org.sourcelab.kafka.webview.ui.manager.kafka.config.FilterDefinition;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResult;
import org.sourcelab.kafka.webview.ui.manager.kafka.dto.KafkaResults;
import org.sourcelab.kafka.webview.ui.manager.plugin.PluginFactory;
import org.sourcelab.kafka.webview.ui.model.Cluster;
-import org.sourcelab.kafka.webview.ui.model.Filter;
import org.sourcelab.kafka.webview.ui.model.MessageFormat;
import org.sourcelab.kafka.webview.ui.model.View;
import org.sourcelab.kafka.webview.ui.plugin.filter.RecordFilter;
@@ -47,19 +51,43 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+@RunWith(JUnitParamsRunner.class)
public class WebKafkaConsumerFactoryTest {
@ClassRule
public static SharedKafkaTestResource sharedKafkaTestResource = new SharedKafkaTestResource();
+ /**
+ * Shared executor service for multi-threaded consuming.
+ */
+ private static ExecutorService executorService;
+
private String topic1;
private String topic2;
+ /**
+ * Create executor service.
+ */
+ @BeforeClass
+ public static void setup() {
+ executorService = Executors.newFixedThreadPool(10);
+ }
+
+ /**
+ * Cleanup executor service after finished.
+ */
+ @AfterClass
+ public static void tearDown() {
+ executorService.shutdownNow();
+ }
+
@Before
public void beforeTest() {
this.topic1 = "FirstTopic" + System.currentTimeMillis();
@@ -92,16 +120,21 @@ public void beforeTest() {
.produceRecords(10, topic2, 1);
}
+ public Object[] provideFactoryInstances() {
+ return new Object[]{
+ new Object[]{createDefaultFactory()},
+ new Object[]{createMultiThreadedFactory()},
+ };
+ }
+
/**
* Smoke test over webClient, using no record filters or partition filtering.
*/
@Test
- public void smokeTestWebClient_noFilters_allPartitions() {
+ @Parameters(method = "provideFactoryInstances")
+ public void smokeTestWebClient_noFilters_allPartitions(final WebKafkaConsumerFactory factory) {
final int resultsPerPartition = 5;
- // Create factory instance.
- final WebKafkaConsumerFactory factory = createDefaultFactory();
-
// Create default view.
final View view = createDefaultView(topic1);
@@ -130,12 +163,10 @@ public void smokeTestWebClient_noFilters_allPartitions() {
* Smoke test over webClient, using no record filters but only a single partition.
*/
@Test
- public void smokeTestWebClient_noFilters_artitionFilter() {
+ @Parameters(method = "provideFactoryInstances")
+ public void smokeTestWebClient_noFilters_partitionFilter(final WebKafkaConsumerFactory factory) {
final int resultsPerPartition = 5;
- // Create factory instance.
- final WebKafkaConsumerFactory factory = createDefaultFactory();
-
// Create default view.
final View view = createDefaultView(topic1);
@@ -164,56 +195,6 @@ public void smokeTestWebClient_noFilters_artitionFilter() {
}
}
- /**
- * Smoke test over webClient, using record filter to skip partition 1.
- *
- * TODO finish this one up.
- */
- //@Test
- public void smokeTestWebClient_withFilter_allPartitions() {
- final int resultsPerPartition = 5;
-
- // Create factory instance.
- final WebKafkaConsumerFactory factory = createDefaultFactory();
-
- // Create default view.
- final View view = createDefaultView(topic1);
-
- // Set results per partition to 5
- view.setResultsPerPartition(resultsPerPartition);
-
- final Filter filter = new Filter();
- filter.setName("My Partition Filter");
- filter.setJar("TODO");
- filter.setClasspath(TestPartitionFilter.class.getCanonicalName());
-
- final Map options = new HashMap<>();
- options.put("partition", "1");
-
- // Define filter
- final FilterDefinition filterDefinition = new FilterDefinition(filter, options);
- final List filterDefinitions = new ArrayList<>();
- filterDefinitions.add(filterDefinition);
-
- // Create SessionId
- final SessionIdentifier sessionId = SessionIdentifier.newWebIdentifier(12L, "MySession");
-
- // Ok lets see what happens
- try (final WebKafkaConsumer webKafkaConsumer = factory.createWebClient(view, filterDefinitions, sessionId)) {
- // Validate we got something back
- assertNotNull(webKafkaConsumer);
-
- // Consume everything
- final List results = consumeAllResults(webKafkaConsumer, resultsPerPartition);
- assertEquals("Should have 10 records", 10, results.size());
-
- // Validate is from partition 1 only
- for (final KafkaResult kafkaResult: results) {
- assertEquals("Should be from partition 1", 1, kafkaResult.getPartition());
- }
- }
- }
-
private List consumeAllResults(
final WebKafkaConsumer webKafkaConsumer,
final int resultsPerPartition) {
@@ -259,7 +240,25 @@ private WebKafkaConsumerFactory createDefaultFactory() {
deserializerPluginFactory,
filterPluginFactoryPluginFactory,
secretManager,
- kafkaConsumerFactory
+ kafkaConsumerFactory,
+ null
+ );
+ }
+
+ private WebKafkaConsumerFactory createMultiThreadedFactory() {
+ final PluginFactory deserializerPluginFactory = new PluginFactory<>("not/used", Deserializer.class);
+ final PluginFactory filterPluginFactoryPluginFactory = new PluginFactory<>("not/used", RecordFilter.class);
+ final SecretManager secretManager = new SecretManager("Passphrase");
+ final KafkaConsumerFactory kafkaConsumerFactory = new KafkaConsumerFactory(
+ new KafkaClientConfigUtil("not/used", "MyPrefix")
+ );
+
+ return new WebKafkaConsumerFactory(
+ deserializerPluginFactory,
+ filterPluginFactoryPluginFactory,
+ secretManager,
+ kafkaConsumerFactory,
+ executorService
);
}
diff --git a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerTest.java b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerTest.java
index 582d335c..fee22c63 100644
--- a/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerTest.java
+++ b/kafka-webview-ui/src/test/java/org/sourcelab/kafka/webview/ui/manager/kafka/WebKafkaConsumerTest.java
@@ -84,7 +84,7 @@ public void doTest() {
final KafkaConsumer kafkaConsumer = kafkaConsumerFactory.createConsumerAndSubscribe(clientConfig);
// Create consumer
- final WebKafkaConsumer webKafkaConsumer = new WebKafkaConsumer(kafkaConsumer, clientConfig);
+ final WebKafkaConsumer webKafkaConsumer = new DefaultWebKafkaConsumer(kafkaConsumer, clientConfig);
// Poll
final KafkaResults results = webKafkaConsumer.consumePerPartition();
diff --git a/pom.xml b/pom.xml
index fc19c365..e251cd83 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,7 +7,7 @@
org.sourcelab
kafka-webview
pom
- 2.2.0
+ 2.3.0