Получение сообщений Kafka и их данных#

Важно:

Получение сообщений Kafka#

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.GetRecordSupplier.consumerRecords;

public class MyTest {

    @Test
    public void myTest1() {
        List<ConsumerRecord<SomeKeyType, SomeValueType>> records
            = kafka().poll(consumerRecords(
                new SomeKeyTypeDeserializer(), //десериализатор для ключа
                new SomeValueTypeDeserializer()) //десериализатор для значения
                //Можно перечислить топики, из которых следует прочесть 
                //сообщения.
                //Если значение свойства DEFAULT_TOPICS_FOR_POLL непустое,
                //и нет необходимости указывать другие топики для чтения,
                //то метод можно не вызывать
                .fromTopics("testTopic", "testTopic2")
                //Можно указать, чтобы в результирующий лист не попадали
                //сообщения с пустыми значениями.
                //По умолчанию в результат попадают все сообщения.
                //Пустые значения могут быть из-за того, что они либо в самом
                //деле пустые, либо для каких-то сообщений указанный
                //десериализатор значения не подходит
                .excludeWithNullValues()
                //Можно указать, чтобы в результирующий лист не попадали
                //сообщения с пустыми ключами.
                //По умолчанию в результат попадают все сообщения.
                //Пустые ключи могут быть из-за того, что они либо в самом
                //деле пустые, либо для каких-то сообщений указанный
                //десериализатор ключа не подходит
                .excludeWithNullKeys()
                //Можно указать свойства и их значения,
                //которые могут дополнить/заменить свойства, 
                //получаемые при помощи KAFKA_CONSUMER_PROPERTIES.
                //ВНИМАНИЕ!!!!! Нельзя указывать свойство `auto.offset.reset`. 
                // Это приведет к IllegalArgumentException
                .setProperty(GROUP_ID_CONFIG, "someGroupId")
                //Можно указать действие, с момента начала выполнения которого
                //начнется чтение сообщений из топиков. Предполагается, что прочитанные сообщения - 
                //результат указанного действия. При этом свойство `auto.offset.reset` текущего консъюмера
                //меняется на `latest`. Если действие не указывать, будут прочитаны все сообщения из топиков,
                //и значение свойства `auto.offset.reset` текущего консъюмера равно `earliest`
                .pollLatestWith("Название или описание действия", () -> {
                    //тут можно выполнять шаги из других контекстов 
                    //или другой исполняемый код.
                })
                //прочие опции, уточняющие результат
        );
    }
}
Уточняющие параметры шага, который возвращает лист. Шаг предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//каждый элемент результирующего листа.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
    "каждый элемент, который попадет в результирующий лист", item -> {
    /*предикат, как работает критерий*/
})
//-------------------------------------------
// можно указать время,
// за которое нужные элементы должны быть получены
.timeOut(ofSeconds(5))
//-------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если непустой лист не удалось получить (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой лист просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//-------------------------------------------
//1. можно указать сколько объектов,
//которые соответствуют критериям,
//нужно вернуть
.returnListOfSize(3)
//-------------------------------------------
//2. можно указать, до элемента с каким индексом
//нужно собрать результирующие элементы,
//индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnBeforeIndex(7)
//.returnAfterIndex(8) либо после какого элемента
//----------------------------------------------
//3. Либо можно перечислить индексы элементов,
// которые следует вернуть.
//Индексы - индексы объектов в наборе элементов,
//которые соответствуют критериям
.returnItemsOfIndexes(0, 3, 5)
//-----------------------------------------------
//4. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireSize(isEqual(8))
//----------------------------------------------
//5. можно указать, при достижении каких условий,
//которым должен соответствовать лист ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий лист/суб-лист
.returnOnCondition("Описание условия", list -> {
    /*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь суб-лист подходящих по критериям элементов,
// который должен вернуться как результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием

Если не нужно использовать десериализаторы, то пример ниже

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.GetRecordSupplier.consumerRecords;

public class MyTest {

    @Test
    public void myTest1() {
        List<ConsumerRecord<String, String>> records
            = kafka().poll(consumerRecords()
              //набор дополнительных опций тот же самый,
              //см пример выше
        );
    }
}

Получение данных из выборки сообщений Kafka#

Данные сообщений Kafka, собранные в лист#

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.GetRecordSupplier.consumerRecords;

public class MyTest {

    @Test
    public void myTest1() {
        List<SomeData> someDataList
            = kafka().poll(consumerRecords(//параметры
            )
            //опции, уточняющие результат
            //
            .thenGetList(
                "Описание / название результирующего листа", //описание того ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                consumerRecord -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого сообщения
                    return someDataObject; //эти объекты
                    //будут собраны в результирующий лист
                })
                //опции, уточняющие результат
        );
    }
}
Уточняющие параметры шага, который возвращает лист. Шаг не предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//каждый элемент результирующего листа.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
    "каждый элемент, который попадет в результирующий лист", item -> {
    /*предикат, как работает критерий*/
})
//-------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если непустой лист не удалось получить (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой лист просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//-------------------------------------------
//1. можно указать сколько объектов,
//которые соответствуют критериям,
//нужно вернуть
.returnListOfSize(3)
//-------------------------------------------
//2. можно указать, до элемента с каким индексом
//нужно собрать результирующие элементы,
//индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnBeforeIndex(7)
//.returnAfterIndex(8) либо после какого элемента
//----------------------------------------------
//3. Либо можно перечислить индексы элементов,
// которые следует вернуть.
//Индексы - индексы объектов в наборе элементов,
//которые соответствуют критериям
.returnItemsOfIndexes(0, 3, 5)
//-------------------------------------------
//4. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireSize(isEqual(8))
//-------------------------------------------
//5. можно указать, при достижении каких условий,
//которым должен соответствовать лист ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий лист/суб-лист
.returnOnCondition("Описание условия", list -> {
    /*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь суб-лист подходящих по критериям элементов,
// который должен вернуться как результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием

Объект данных из сообщений Kafka#

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.GetRecordSupplier.consumerRecords;

public class MyTest {

    @Test
    public void myTest1() {
        SomeData someData
            = kafka().poll(consumerRecords(//параметры
            )
            //опции, уточняющие результат
            //
            .thenGetItem(
                "Описание / название результирующего объекта", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                consumerRecord -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого сообщения
                    return someDataObject; //один из этих 
                    //объектов возвращается как результат,
                    //первый попавшийся или с учетом уточнений
                })
                //опции, уточняющие результат
        );
    }
}
Уточняющие параметры шага, который возвращает элемент Iterable. Шаг не предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//результирующий элемент из набора.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
    "результирующий элемент из набора", item -> {
    /*предикат, как работает критерий*/
})
//------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если не был получен результирующий элемент (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой набор просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//------------------------------------------
//1. Можно указать индекс элемента, который следует вернуть.
//Индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnItemOfIndex(1)
//-----------------------------------------------
//2. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireSize(isEqual(8))
//------------------------------------------
//3. можно указать, при достижении каких условий,
//которым должен соответствовать набор ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий элемент
.returnOnCondition("Описание условия", iterable -> {
    /*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь набор подходящих по критериям элементов,
// из которого должен быть взят результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием