Kafka¶
Подключение¶
implementation 'ru.tinkoff.kora:kafka'
Контейнер для KafkaConsumer¶
Kora предоставляет небольшую обёртку над KafkaConsumer
, позволяющую легко запустить обработку входящих событий.
Конструктор контейнера выглядит следующим образом:
public KafkaSubscribeConsumerContainer(KafkaConsumerConfig config,
Deserializer<K> keyDeserializer,
Deserializer<V> valueDeserializer,
BaseKafkaRecordsHandler<K, V> handler) {
this.factory = new KafkaConsumerFactory<>(config);
this.handler = handler;
this.keyDeserializer = keyDeserializer;
this.valueDeserializer = valueDeserializer;
this.config = config;
}
BaseKafkaRecordsHandler<K,V>
это базовый функциональный интерфейс обработчика:
package ru.tinkoff.kora.kafka.common.consumer.containers.handlers;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
@FunctionalInterface
public interface BaseKafkaRecordsHandler<K, V> {
void handle(ConsumerRecords<K, V> records, KafkaConsumer<K, V> consumer);
}
KafkaConsumerConfig
- обёртка над используемым KafkaConsumer Properties
:
public record KafkaConsumerConfig(
Properties driverProperties,
@Nullable List<String> topics,
@Nullable Pattern topicsPattern,
Either<Duration, String> offset,
Duration pollTimeout,
int threads
) {
}
- driverProperties -
Properties
из официального клиента кафки, документацию по ним можно посмотреть по ссылке: https://kafka.apache.org/documentation/#consumerconfigs - topics - список топиков на которые нужно подписаться, через запятую
- topicsPattern - регулярка, по которой можно подписаться на топики.
- offset - стратегия, которую нужно применить при подключении через assign.
Допустимые значениеearliest
- перейти на самый ранний доступный offset,latest
- перейти на последний доступный offset, строка в форматеDuration
, например5m
- сдвиг на определённое время назад - pollTimeout - таймаут для poll(), значение по умолчанию - 5 секунд
- threads - количество потоков, выделенных на консюмеры. 1 поток = 1 консюмер группы, значение по умолчанию - 1
Пример конфигурации для подписки на топики:
kafka {
first {
pollTimeout: 3s
topics: "first,second,third"
driverProperties {
"bootstrap.servers": "localhost:9092"
"group.id": "some_consumer_group"
}
}
}
Пример конфигурации для подключения к топикам без группы. В этом примере консьюмер будет подключен ко всем партициям в топике и офсет сдвинут на 10 минут назад.
kafka {
first {
pollTimeout: 3s,
topics: "first",
offset: 10m
driverProperties {
"bootstrap.servers": "localhost:9092"
}
}
}
Генерация по аннотации¶
В большинстве случаев проще всего будет воспользоваться аннотацией @KafkaListener
, например, как в коде ниже:
@Component
final class Consumers {
@KafkaListener("kafka.first")
void processRecord(ConsumerRecord<String, String> record) {
//some handler code
}
}
На этапе компиляции будет сгенерирован модуль ConsumersModule
, отмеченный аннотацией @Module
(подробнее про это можно почитать здесь),
Пример сгенерированного модуля:
@Module
public interface ConsumersModule {
default KafkaConsumerContainer<String, String> processRecord(
Consumers _controller,
KafkaConsumerConfig _consumerConfig,
Deserializer<String> keyDeserializer, Deserializer<String> valueDeserializer) {
return new KafkaSubscribeConsumerContainer<>(
_consumerConfig,
keyDeserializer,
valueDeserializer,
HandlerWrapper.wrapHandler(_controller::processRecordWithConsumer)
);
}
}
HandlerWrapper
приводит контроллеры к базовому BaseKafkaRecordsHandler
, при необходимости добавляя автоматический коммит.
Так как KafkaConsumerContainer
является реализацией Lifecycle
, при запуске он будет инициализирован. В данном случае - подпишется на указанные топики и запустит poll loop с вызовом обработчика.
Подробнее про компоненты с жизненным циклом можно прочитать в соответствующем разделе документации.
Конфигурирование консюмеров¶
В случае, если нужно разное поведение для разных топиков, существует возможность создавать несколько подобных контейнеров, каждый со своим индивидуальным конфигом. Выглядит это примерно так:
@Component
final class Consumers {
@KafkaListener("kafka.first")
void processRecord(ConsumerRecord<String, String> record) {
//some handler code
}
@KafkaListener("kafka.other")
void processRecords(ConsumerRecords<String, String> records, Consumer<String,String> consumer) {
//some handler code
consumer.commitAsync();
}
}
Значение в аннотации указывает, из какой части файла конфигурации нужно брать настройки. В том, что касается получения конфигурации — работает аналогично @ConfigSource
Поддерживаемые сигнатуры:¶
@KafkaListener("kafka.first")
void processRecordsWithConsumer(ConsumerRecords<String, CustomEvent> records, Consumer<String, CustomEvent> consumer) {}
Принимает ConsumerRecords
и Consumer
, коммитить оффсет нужно вручную.
@KafkaListener("kafka.first")
void processRecordWithConsumer(ConsumerRecord<String, String> records, Consumer<String, String> consumer) {}
Принимает ConsumerRecord
и Consumer
. Как и в предыдущем случае, commit
нужно вызывать вручную. Вызывается для каждого ConsumerRecord
полученного при вызове poll()
@KafkaListener("kafka.first"
void processRecords(ConsumerRecords<String, String> records) {}
Принимает ConsumerRecords
, после вызова обработчика вызывается commitSync()
.
@KafkaListener("kafka.first")
void processRecord(ConsumerRecord<String, String> record) {}
Принимает ConsumerRecord
, после обработки всех ConsumerRecord
вызывается commitSync()
.
@KafkaListener("kafka.first")
void processValue(CustomEvent value) {}
Принимает ConsumerRecord.value
, после обработки всех событий вызывается commitSync()
.
@KafkaListener("kafka.first")
void processKeyValue(String key, CustomEvent value) {}
То же, что и предыдущий кейс, но добавляется key из ConsumerRecord
Исключения в обработчике¶
Если метод помеченный @KafkaListener
выбросит исключение, то Consumer будет перезапущен, потому что нет общего решения, как реагировать на это и разработчик должен сам решить как эту ситуацию
обрабатывать.
Обработка ошибок десериализации¶
Если вы используете сигнатуру с ConsumerRecord
или ConsumerRecords
, то вы получите исключение десериализации значения в момент вызова методов key
или value
.
В этот момент стоит его обработать нужным вам образом.
Выбрасываются следующие исключения:
ru.tinkoff.kora.kafka.common.exceptions.RecordKeyDeserializationException
ru.tinkoff.kora.kafka.common.exceptions.RecordValueDeserializationException
Из этих исключений можно получить сырой ConsumerRecord<byte[], byte[]>
Если вы используете сигнатуру с распакованными key
/value
, то можно добавить последним аргументом Exception
, Throwable
, RecordKeyDeserializationException
или RecordValueDeserializationException
.
@KafkaListener("kafka.first")
public void process(@Nullable String key, @Nullable String value, @Nullable Exception exception) {
if(exception!=null){
//handle exception
}else{
//handle key/value
}
}
Обратите внимание, что все аргументы становятся необязательными, то есть мы ожидаем что у нас либо будут ключ и значение, либо исключение
Настройка key/value deserializer¶
Для более точной настройки десериализаторов поддерживаются теги.
Теги можно установить на параметре-ключе, параметре-значении, а так же на параметрах типа ConsumerRecord
и ConsumerRecords
.
Эти теги будут установлены на зависимостях контейнера.
Примеры:
@KafkaListener("kafka.first")
void process1(@Tag(Sometag1.class) String key,@Tag(Sometag2.class) String value){}
@KafkaListener("kafka.first")
void process2(ConsumerRecord<@Tag(Sometag1.class) String, @Tag(Sometag2.class) String> record){}
@KafkaListener("kafka.first")
void process2(ConsumerRecords<@Tag(Sometag1.class) String, @Tag(Sometag2.class) String> record){}
Прочее¶
Для обработчиков, не использующих ключ, по умолчанию используется Deserializer<byte[]>
т.к. он просто возвращает не обработанные байты.
Контейнер для KafkaProducer¶
Для использования KafkaProducer
в коде их необходимо объявить.
Создается продюсер с помощью интерфейса:
@ru.tinkoff.kora.kafka.common.annotation.KafkaPublisher("kafka.producer.config")
public interface MyKafkaProducer extends org.apache.kafka.clients.producer.Producer<byte[], byte[]> {
}
По такому интерфейсу будет сгенерирован модуль, содержащий:
ru.tinkoff.kora.kafka.common.producer.PublisherConfig
, собранный из конфигурации по пути "kafka.producer.config"- реализацию
MyKafkaProducer
, делегирующую во внутреннийKafkaProducer
, с добавлением телеметрии KafkaProducer
с тегом@Tag(MyKafkaProducer.class)
, если нужен именно этот класс. При использовании будет отсутствовать телеметрия
Транзакции¶
Если необходимо использовать транзакции с kafka, следует наследовать интерфейс ru.tinkoff.kora.kafka.common.producer.TransactionalProducer
.
Он предоставляет один метод begin
, который следует использовать в блоке try-with-resources:
@ru.tinkoff.kora.kafka.common.annotation.KafkaPublisher("kafka.producer.config")
public interface MyKafkaProducer extends ru.tinkoff.kora.kafka.common.producer.TransactionalProducer<byte[], byte[]> {
}
void someTransactionalMethod() {
try (var producer = this.myKafkaProducer.begin()) {
producer.send(record);
if (something) {
producer.abortTransaction();
} else {
producer.commitTransaction();// will be called on try-with-resources close
}
}
}
Управление Serializer¶
Для уточнения какой Serializer
взять из контейнера есть возможность использовать теги.
Теги необходимо устанавливать на параметры TransactionalProducer
или Producer
:
@ru.tinkoff.kora.kafka.common.annotation.KafkaPublisher("kafka.producer.config")
public interface MyKafkaProducer extends org.apache.kafka.clients.producer.Producer<@Tag(MyTag1.class) byte[], @Tag(MyTag2.class) byte[]> {
}