Send-операции Kafka#
Важно:
import static org.apache.kafka.clients.producer.ProducerConfig.MAX_REQUEST_SIZE_CONFIG;
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.send.KafkaSendRecordsActionSupplier.producerRecord;
public class MyTest {
@Test
public void myTest1() {
kafka().send(producerRecord(
new SomeTypeSerializer(), //сериализатор для значения
//оправляемого сообщения
someTypeInstance) //Значение отправляемого сообщения
//Можно указать топик, в который следует отправить сообщение.
//Если значение свойства DEFAULT_TOPIC_FOR_SEND непустое,
//и нет необходимости указывать другой топик для отправки,
//то метод можно не вызывать
.topic("testTopic")
//Метод вызывается, когда нужно передать кол-бэк,
//и он отличается от того, который полуют при помощи
//свойства KAFKA_CALL_BACK
.callback(customCallBack)
//Можно указать свойства и их значения,
//которые могут дополнить/заменить свойства,
//получаемые при помощи KAFKA_PRODUCER_PROPERTIES
.setProperty(MAX_REQUEST_SIZE_CONFIG, requestSizeConf)
//по умолчанию считается, что ключ сообщения
//будет оправлен в виде текста, и ключ пустой
.setKey("Some String") //можно указать ключ в виде объекта,
.setKey(someKeyInstance, new SomeKeyTypeSerializer()) //либо
//в виде объекта вместе с сериализатором,
// если тип нового объекта-ключа отличается от предыдущего
//типа объектов-ключей.
//
//Опционально
.partition(1)
//опционально
.timestamp(10L)
//опционально можно передать заголовки
.header("Header key", "Value1") //в разных форматах
.header(new RecordHeader("Header key2", "Value2".getBytes()))
);
}
}
Ниже упрощенный пример, если надо оправить сообщение с текстовым значением
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.send.KafkaSendRecordsActionSupplier.producerRecord;
public class MyTest {
@Test
public void myTest1() {
kafka().send(producerRecord("I'm a String!")
//опциональные уточняющие
//параметры те же самые, что в примере выше
);
}
}