Получение сообщений Kafka и их данных#
Важно:
Получение сообщений Kafka#
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.GetRecordSupplier.consumerRecords;
public class MyTest {
@Test
public void myTest1() {
List<ConsumerRecord<SomeKeyType, SomeValueType>> records
= kafka().poll(consumerRecords(
new SomeKeyTypeDeserializer(), //десериализатор для ключа
new SomeValueTypeDeserializer()) //десериализатор для значения
//Можно перечислить топики, из которых следует прочесть
//сообщения.
//Если значение свойства DEFAULT_TOPICS_FOR_POLL непустое,
//и нет необходимости указывать другие топики для чтения,
//то метод можно не вызывать
.fromTopics("testTopic", "testTopic2")
//Можно указать, чтобы в результирующий лист не попадали
//сообщения с пустыми значениями.
//По умолчанию в результат попадают все сообщения.
//Пустые значения могут быть из-за того, что они либо в самом
//деле пустые, либо для каких-то сообщений указанный
//десериализатор значения не подходит
.excludeWithNullValues()
//Можно указать, чтобы в результирующий лист не попадали
//сообщения с пустыми ключами.
//По умолчанию в результат попадают все сообщения.
//Пустые ключи могут быть из-за того, что они либо в самом
//деле пустые, либо для каких-то сообщений указанный
//десериализатор ключа не подходит
.excludeWithNullKeys()
//Можно указать свойства и их значения,
//которые могут дополнить/заменить свойства,
//получаемые при помощи KAFKA_CONSUMER_PROPERTIES.
//ВНИМАНИЕ!!!!! Нельзя указывать свойство `auto.offset.reset`.
// Это приведет к IllegalArgumentException
.setProperty(GROUP_ID_CONFIG, "someGroupId")
//Можно указать действие, с момента начала выполнения которого
//начнется чтение сообщений из топиков. Предполагается, что прочитанные сообщения -
//результат указанного действия. При этом свойство `auto.offset.reset` текущего консъюмера
//меняется на `latest`. Если действие не указывать, будут прочитаны все сообщения из топиков,
//и значение свойства `auto.offset.reset` текущего консъюмера равно `earliest`
.pollLatestWith("Название или описание действия", () -> {
//тут можно выполнять шаги из других контекстов
//или другой исполняемый код.
})
//прочие опции, уточняющие результат
);
}
}
Уточняющие параметры шага, который возвращает лист. Шаг предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//каждый элемент результирующего листа.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
"каждый элемент, который попадет в результирующий лист", item -> {
/*предикат, как работает критерий*/
})
//-------------------------------------------
// можно указать время,
// за которое нужные элементы должны быть получены
.timeOut(ofSeconds(5))
//-------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если непустой лист не удалось получить (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой лист просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//-------------------------------------------
//1. можно указать сколько объектов,
//которые соответствуют критериям,
//нужно вернуть
.returnListOfSize(3)
//-------------------------------------------
//2. можно указать, до элемента с каким индексом
//нужно собрать результирующие элементы,
//индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnBeforeIndex(7)
//.returnAfterIndex(8) либо после какого элемента
//----------------------------------------------
//3. Либо можно перечислить индексы элементов,
// которые следует вернуть.
//Индексы - индексы объектов в наборе элементов,
//которые соответствуют критериям
.returnItemsOfIndexes(0, 3, 5)
//-----------------------------------------------
//4. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireSize(isEqual(8))
//----------------------------------------------
//5. можно указать, при достижении каких условий,
//которым должен соответствовать лист ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий лист/суб-лист
.returnOnCondition("Описание условия", list -> {
/*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь суб-лист подходящих по критериям элементов,
// который должен вернуться как результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием
Если не нужно использовать десериализаторы, то пример ниже
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.GetRecordSupplier.consumerRecords;
public class MyTest {
@Test
public void myTest1() {
List<ConsumerRecord<String, String>> records
= kafka().poll(consumerRecords()
//набор дополнительных опций тот же самый,
//см пример выше
);
}
}
Получение данных из выборки сообщений Kafka#
Данные сообщений Kafka, собранные в лист#
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.GetRecordSupplier.consumerRecords;
public class MyTest {
@Test
public void myTest1() {
List<SomeData> someDataList
= kafka().poll(consumerRecords(//параметры
)
//опции, уточняющие результат
//
.thenGetList(
"Описание / название результирующего листа", //описание того ЧТО следует получить,
//в свободной форме или бизнес
//терминологии
consumerRecord -> {
//Алгоритм функции, которая получает данные
//из каждого сообщения
return someDataObject; //эти объекты
//будут собраны в результирующий лист
})
//опции, уточняющие результат
);
}
}
Уточняющие параметры шага, который возвращает лист. Шаг не предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//каждый элемент результирующего листа.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
"каждый элемент, который попадет в результирующий лист", item -> {
/*предикат, как работает критерий*/
})
//-------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если непустой лист не удалось получить (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой лист просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//-------------------------------------------
//1. можно указать сколько объектов,
//которые соответствуют критериям,
//нужно вернуть
.returnListOfSize(3)
//-------------------------------------------
//2. можно указать, до элемента с каким индексом
//нужно собрать результирующие элементы,
//индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnBeforeIndex(7)
//.returnAfterIndex(8) либо после какого элемента
//----------------------------------------------
//3. Либо можно перечислить индексы элементов,
// которые следует вернуть.
//Индексы - индексы объектов в наборе элементов,
//которые соответствуют критериям
.returnItemsOfIndexes(0, 3, 5)
//-------------------------------------------
//4. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireSize(isEqual(8))
//-------------------------------------------
//5. можно указать, при достижении каких условий,
//которым должен соответствовать лист ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий лист/суб-лист
.returnOnCondition("Описание условия", list -> {
/*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь суб-лист подходящих по критериям элементов,
// который должен вернуться как результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием
Объект данных из сообщений Kafka#
import java.util.List;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.GetRecordSupplier.consumerRecords;
public class MyTest {
@Test
public void myTest1() {
SomeData someData
= kafka().poll(consumerRecords(//параметры
)
//опции, уточняющие результат
//
.thenGetItem(
"Описание / название результирующего объекта", //описание того
// ЧТО следует получить,
//в свободной форме или бизнес
//терминологии
consumerRecord -> {
//Алгоритм функции, которая получает данные
//из каждого сообщения
return someDataObject; //один из этих
//объектов возвращается как результат,
//первый попавшийся или с учетом уточнений
})
//опции, уточняющие результат
);
}
}
Уточняющие параметры шага, который возвращает элемент Iterable>. Шаг не предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//результирующий элемент из набора.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
"результирующий элемент из набора", item -> {
/*предикат, как работает критерий*/
})
//------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если не был получен результирующий элемент (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой набор просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//------------------------------------------
//1. Можно указать индекс элемента, который следует вернуть.
//Индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnItemOfIndex(1)
//-----------------------------------------------
//2. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireSize(isEqual(8))
//------------------------------------------
//3. можно указать, при достижении каких условий,
//которым должен соответствовать набор ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий элемент
.returnOnCondition("Описание условия", iterable -> {
/*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь набор подходящих по критериям элементов,
// из которого должен быть взят результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием