Skip to content

Commit e6c903a

Browse files
vmtuan12ebyhr
authored andcommitted
Support max.request.size and batch.size config in Kafka Event Listener
1 parent 5b81408 commit e6c903a

File tree

4 files changed

+50
-1
lines changed

4 files changed

+50
-1
lines changed

docs/src/main/sphinx/admin/event-listeners-kafka.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,13 @@ Use the following properties for further configuration.
9696
distinction in Kafka, if multiple Trino clusters send events to the same
9797
Kafka system.
9898
-
99+
* - `kafka-event-listener.max-request-size`
100+
- [Size value](prop-type-data-size) that specifies the maximum request size the Kafka producer can send;
101+
messages exceeding this size will fail.
102+
- `5MB`
103+
* - `kafka-event-listener.batch-size`
104+
- [Size value](prop-type-data-size) that specifies the size to batch before sending records to Kafka.
105+
- `16KB`
99106
* - `kafka-event-listener.publish-created-event`
100107
- [Boolean](prop-type-boolean) switch to control publishing of query creation
101108
events.

plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/KafkaEventListenerConfig.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import io.airlift.configuration.ConfigDescription;
2020
import io.airlift.configuration.DefunctConfig;
2121
import io.airlift.configuration.validation.FileExists;
22+
import io.airlift.units.DataSize;
2223
import io.airlift.units.Duration;
2324
import io.airlift.units.MinDuration;
2425
import jakarta.validation.constraints.AssertTrue;
@@ -33,6 +34,8 @@
3334

3435
import static com.google.common.collect.ImmutableList.toImmutableList;
3536
import static com.google.common.collect.ImmutableSet.toImmutableSet;
37+
import static io.airlift.units.DataSize.Unit.KILOBYTE;
38+
import static io.airlift.units.DataSize.Unit.MEGABYTE;
3639
import static java.util.Objects.requireNonNull;
3740
import static java.util.concurrent.TimeUnit.SECONDS;
3841

@@ -48,6 +51,8 @@ public class KafkaEventListenerConfig
4851
private Optional<String> splitCompletedTopicName = Optional.empty();
4952
private String brokerEndpoints;
5053
private Optional<String> clientId = Optional.empty();
54+
private DataSize maxRequestSize = DataSize.of(5, MEGABYTE); // Greater than default value because the size of completed events are quite large
55+
private DataSize batchSize = DataSize.of(16, KILOBYTE); // Default value of batch.size
5156
private Set<String> excludedFields = Collections.emptySet();
5257
private Duration requestTimeout = new Duration(10, SECONDS);
5358
private boolean terminateOnInitializationFailure = true;
@@ -91,6 +96,32 @@ public KafkaEventListenerConfig setClientId(String clientId)
9196
return this;
9297
}
9398

99+
public DataSize getMaxRequestSize()
100+
{
101+
return maxRequestSize;
102+
}
103+
104+
@ConfigDescription("The maximum size of a request/message in bytes")
105+
@Config("kafka-event-listener.max-request-size")
106+
public KafkaEventListenerConfig setMaxRequestSize(DataSize maxRequestSize)
107+
{
108+
this.maxRequestSize = maxRequestSize;
109+
return this;
110+
}
111+
112+
public DataSize getBatchSize()
113+
{
114+
return batchSize;
115+
}
116+
117+
@ConfigDescription("Value that specifies the size to batch before sending records to Kafka")
118+
@Config("kafka-event-listener.batch-size")
119+
public KafkaEventListenerConfig setBatchSize(DataSize batchSize)
120+
{
121+
this.batchSize = batchSize;
122+
return this;
123+
}
124+
94125
public Optional<String> getCompletedTopicName()
95126
{
96127
return completedTopicName;

plugin/trino-kafka-event-listener/src/main/java/io/trino/plugin/eventlistener/kafka/producer/BaseKafkaProducerFactory.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@ protected Map<String, Object> baseConfig(KafkaEventListenerConfig config)
3939
kafkaClientConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
4040
kafkaClientConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
4141
kafkaClientConfig.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
42-
kafkaClientConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, "5242880");
42+
kafkaClientConfig.put(ProducerConfig.MAX_REQUEST_SIZE_CONFIG, Long.toString(config.getMaxRequestSize().toBytes()));
43+
kafkaClientConfig.put(ProducerConfig.BATCH_SIZE_CONFIG, Long.toString(config.getBatchSize().toBytes()));
4344
kafkaClientConfig.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, Long.toString(config.getRequestTimeout().toMillis()));
4445
return kafkaClientConfig;
4546
}

plugin/trino-kafka-event-listener/src/test/java/io/trino/plugin/eventlistener/kafka/TestKafkaEventListenerConfig.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
import com.google.common.collect.ImmutableList;
1818
import com.google.common.collect.ImmutableMap;
19+
import io.airlift.units.DataSize;
1920
import io.airlift.units.Duration;
2021
import org.junit.jupiter.api.Test;
2122

@@ -30,6 +31,9 @@
3031
import static io.airlift.configuration.testing.ConfigAssertions.assertFullMapping;
3132
import static io.airlift.configuration.testing.ConfigAssertions.assertRecordedDefaults;
3233
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
34+
import static io.airlift.units.DataSize.Unit.BYTE;
35+
import static io.airlift.units.DataSize.Unit.KILOBYTE;
36+
import static io.airlift.units.DataSize.Unit.MEGABYTE;
3337
import static org.assertj.core.api.Assertions.assertThat;
3438

3539
final class TestKafkaEventListenerConfig
@@ -46,6 +50,8 @@ void testDefaults()
4650
.setCreatedTopicName(null)
4751
.setSplitCompletedTopicName(null)
4852
.setBrokerEndpoints(null)
53+
.setMaxRequestSize(DataSize.of(5, MEGABYTE))
54+
.setBatchSize(DataSize.of(16, KILOBYTE))
4955
.setClientId(null)
5056
.setExcludedFields(Set.of())
5157
.setRequestTimeout(new Duration(10, TimeUnit.SECONDS))
@@ -66,6 +72,8 @@ void testExplicitPropertyMappings()
6672
.put("kafka-event-listener.publish-completed-event", "false")
6773
.put("kafka-event-listener.publish-split-completed-event", "true")
6874
.put("kafka-event-listener.broker-endpoints", "kafka-host-1:9093,kafka-host-2:9093")
75+
.put("kafka-event-listener.max-request-size", "1048576B")
76+
.put("kafka-event-listener.batch-size", "81920B")
6977
.put("kafka-event-listener.created-event.topic", "query_created")
7078
.put("kafka-event-listener.completed-event.topic", "query_completed")
7179
.put("kafka-event-listener.split-completed-event.topic", "split_completed")
@@ -84,6 +92,8 @@ void testExplicitPropertyMappings()
8492
.setPublishCompletedEvent(false)
8593
.setPublishSplitCompletedEvent(true)
8694
.setBrokerEndpoints("kafka-host-1:9093,kafka-host-2:9093")
95+
.setMaxRequestSize(DataSize.of(1048576, BYTE))
96+
.setBatchSize(DataSize.of(81920, BYTE))
8797
.setCreatedTopicName("query_created")
8898
.setCompletedTopicName("query_completed")
8999
.setSplitCompletedTopicName("split_completed")

0 commit comments

Comments
 (0)