Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,27 @@
The format is based on [Keep a Changelog](http://keepachangelog.com/)
and this project adheres to [Semantic Versioning](http://semver.org/).

## 2.3.0 (UNRELEASED)
#### New Features
- [MultiThreaded Consumer](https://github.com/SourceLabOrg/kafka-webview/pull/170) Add multi-threaded kafka consumer.

Previously a single consumer instance was used when paging through messages from a topic. Each partition was consumed sequentially in order to provide consistent results on each page. For topics with a large number of partitions this could take considerable time.

The underlying consumer implementation has been replaced with a multi-threaded version which will attempt to read each partition in parallel. The following configuration properties have been added to control this behavior:

```yml
app:
## Enable multi-threaded consumer support
## The previous single-threaded implementation is still available by setting this property to false.
## The previous implementation along with this property will be removed in future release.
multiThreadedConsumer: true

## Sets upper limit on the number of concurrent consumers (non-websocket) supported.
maxConcurrentWebConsumers: 32
```

If you run into issues, you can disable the new implementation and revert to the previous behavior by setting the `multiThreadedConsumer` property to `false`.

## 2.2.0 (03/20/2019)

#### Bug fixes
Expand Down
12 changes: 10 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,14 @@ app:

## Defines a prefix prepended to the Id of all consumers.
consumerIdPrefix: "KafkaWebViewConsumer"

## Enable multi-threaded consumer support
## The previous single-threaded implementation is still available by setting this property to false.
## The previous implementation along with this property will be removed in future release.
multiThreadedConsumer: true

## Sets upper limit on the number of concurrent consumers (non-websocket) supported.
maxConcurrentWebConsumers: 32

## Sets upper limit on the number of concurrent web socket consumers supported.
maxConcurrentWebSocketConsumers: 64
Expand Down Expand Up @@ -299,8 +307,8 @@ implementations.
# Releasing
Steps for performing a release:

1. Update release version: mvn versions:set -DnewVersion=X.Y.Z
2. Validate and then commit version: mvn versions:commit
1. Update release version: `mvn versions:set -DnewVersion=X.Y.Z`
2. Validate and then commit version: `mvn versions:commit`
3. Update CHANGELOG and README files.
4. Merge to master.
5. Deploy to Maven Central: mvn clean deploy -P release-kafka-webview
Expand Down
4 changes: 2 additions & 2 deletions dev-cluster/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,12 @@
<parent>
<artifactId>kafka-webview</artifactId>
<groupId>org.sourcelab</groupId>
<version>2.2.0</version>
<version>2.3.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>dev-cluster</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>

<!-- Require Maven 3.3.9 -->
<prerequisites>
Expand Down
2 changes: 1 addition & 1 deletion kafka-webview-plugin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
<parent>
<groupId>org.sourcelab</groupId>
<artifactId>kafka-webview</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-webview-plugin</artifactId>
Expand Down
4 changes: 2 additions & 2 deletions kafka-webview-ui/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@
<parent>
<artifactId>kafka-webview</artifactId>
<groupId>org.sourcelab</groupId>
<version>2.2.0</version>
<version>2.3.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>kafka-webview-ui</artifactId>
<version>2.2.0</version>
<version>2.3.0</version>

<!-- Module Description and Ownership -->
<name>Kafka WebView UI</name>
Expand Down
8 changes: 8 additions & 0 deletions kafka-webview-ui/src/assembly/distribution/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,14 @@ app:
## Defines a prefix prepended to the Id of all consumers.
consumerIdPrefix: "KafkaWebViewConsumer"

## Enable multi-threaded consumer support
## The previous single-threaded implementation is still available by setting this property to false.
## The previous implementation along with this property will be removed in future release.
multiThreadedConsumer: true

## Sets upper limit on the number of concurrent consumers (non-websocket) supported.
maxConcurrentWebConsumers: 32

## Sets upper limit on the number of concurrent web socket consumers supported.
maxConcurrentWebSocketConsumers: 64

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,13 @@ public class AppProperties {
@Value("${app.key}")
private String appKey;

@Value("${app.maxConcurrentWebSocketConsumers}")
@Value("${app.multiThreadedConsumer:true}")
private boolean enableMultiThreadedConsumer;

@Value("${app.maxConcurrentWebConsumers:32}")
private Integer maxConcurrentWebConsumers = 32;

@Value("${app.maxConcurrentWebSocketConsumers:100}")
private Integer maxConcurrentWebSocketConsumers = 100;

@Value("${app.consumerIdPrefix}")
Expand Down Expand Up @@ -104,6 +110,14 @@ public boolean isAvroIncludeSchema() {
return avroIncludeSchema;
}

public boolean isEnableMultiThreadedConsumer() {
return enableMultiThreadedConsumer;
}

public Integer getMaxConcurrentWebConsumers() {
return maxConcurrentWebConsumers;
}

@Override
public String toString() {
return "AppProperties{"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,11 @@
package org.sourcelab.kafka.webview.ui.configuration;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.hubspot.jackson.datatype.protobuf.ProtobufModule;
import org.apache.kafka.common.serialization.Deserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sourcelab.kafka.webview.ui.manager.encryption.SecretManager;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaAdminFactory;
import org.sourcelab.kafka.webview.ui.manager.kafka.KafkaClientConfigUtil;
Expand All @@ -42,11 +45,15 @@
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
* Application Configuration for Plugin beans.
*/
@Component
public class PluginConfig {
private static final Logger logger = LoggerFactory.getLogger(PluginConfig.class);

/**
* Upload manager, for handling uploads of Plugins and Keystores.
Expand Down Expand Up @@ -97,11 +104,30 @@ public SecretManager getSecretManager(final AppProperties appProperties) {
*/
@Bean
public WebKafkaConsumerFactory getWebKafkaConsumerFactory(final AppProperties appProperties, final KafkaClientConfigUtil configUtil) {
final ExecutorService executorService;

// If we have multi-threaded consumer option enabled
if (appProperties.isEnableMultiThreadedConsumer()) {
logger.info("Enabled multi-threaded webconsumer with {} threads.", appProperties.getMaxConcurrentWebConsumers());

// Create fixed thread pool
executorService = Executors.newFixedThreadPool(
appProperties.getMaxConcurrentWebConsumers(),
new ThreadFactoryBuilder()
.setNameFormat("kafka-web-consumer-pool-%d")
.build()
);
} else {
// Null reference.
executorService = null;
}

return new WebKafkaConsumerFactory(
getDeserializerPluginFactory(appProperties),
getRecordFilterPluginFactory(appProperties),
getSecretManager(appProperties),
getKafkaConsumerFactory(configUtil)
getKafkaConsumerFactory(configUtil),
executorService
);
}

Expand Down
Loading