Получение данных из Kafka#

Отличие описанного ниже от того, что описано в Получение сообщений Kafka и их данных в том, что здесь нет промежуточного действия в виде получения сообщений, из которых извлекаются данные. Данные получаются как бы напрямую.

Важно:

Получение объекта данных Kafka#

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableItemSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        SomeData someDataItem = kafka().poll(consumedItem(
                "Описание / название результирующего объекта", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeKeyTypeDeserializer(), //десериализатор для ключа
                new SomeValueTypeDeserializer(), //десериализатор для значения
                consumerRecord -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого сообщения
                    return someDataObject; //один из этих 
                    //объектов возвращается как результат,
                    //первый попавшийся или с учетом уточнений
                })
                //Можно перечислить топики, из которых следует прочесть 
                //данные сообщений.
                //Если значение свойства DEFAULT_TOPICS_FOR_POLL непустое,
                //и нет необходимости указывать другие топики для чтения,
                //то метод можно не вызывать
                .fromTopics("testTopic", "testTopic2")
                //Можно указать свойства и их значения,
                //которые могут дополнить/заменить свойства, 
                //получаемые при помощи KAFKA_CONSUMER_PROPERTIES.
                //ВНИМАНИЕ!!!!! Нельзя указывать свойство `auto.offset.reset`. 
                // Это приведет к IllegalArgumentException
                .setProperty(GROUP_ID_CONFIG, "someGroupId")
                //Можно указать действие, с момента начала выполнения которого
                //начнется чтение сообщений из топиков. Предполагается, что прочитанные сообщения - 
                //результат указанного действия. При этом свойство `auto.offset.reset` текущего консъюмера
                //меняется на `latest`. Если действие не указывать, будут прочитаны все сообщения из топиков,
                //и значение свойства `auto.offset.reset` текущего консъюмера равно `earliest`
                .pollLatestWith("Название или описание действия", () -> {
                    //тут можно выполнять шаги из других контекстов 
                    //или другой исполняемый код.
                })
                //другие уточняющие параметры
        );
    }
}
Уточняющие параметры шага, который возвращает элемент Iterable. Шаг предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//результирующий элемент из набора.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
    "результирующий элемент из набора", item -> {
    /*предикат, как работает критерий*/
})
//------------------------------------------
// можно указать время,
// за которое нужные элемент должен быть получен
.timeOut(ofSeconds(5))
//------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если не был получен результирующий элемент (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой набор просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//------------------------------------------
//1. Можно указать индекс элемента, который следует вернуть.
//Индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnItemOfIndex(1)
//-----------------------------------------------
//2. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireSize(isEqual(8))
//------------------------------------------
//3. можно указать, при достижении каких условий,
//которым должен соответствовать набор ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий элемент
.returnOnCondition("Описание условия", iterable -> {
    /*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь набор подходящих по критериям элементов,
// из которого должен быть взят результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием

Ниже другие варианты получения объекта из очередей

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableItemSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        SomeData someDataItem = kafka().poll(consumedItemKeyData(
                "Описание / название результирующего объекта", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeKeyTypeDeserializer(), //десериализатор для ключа
                keyData -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого ключа сообщения
                    return someDataObject; //один из этих 
                    //объектов возвращается как результат,
                    //первый попавшийся или с учетом уточнений
                })
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableItemSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат одного из ключей сообщений
        SomeKeyType someKey = kafka().poll(consumedKey(
            new SomeKeyTypeDeserializer()) //десериализатор для ключа
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableItemSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат одного из ключей сообщений
        //в виде текста
        String someKey = kafka().poll(consumedKey()
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableItemSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        SomeData someDataItem = kafka().poll(consumedItemValueData(
                "Описание / название результирующего объекта", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeValueTypeDeserializer(), //десериализатор для значения
                valueData -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого значения сообщения
                    return someDataObject; //один из этих 
                    //объектов возвращается как результат,
                    //первый попавшийся или с учетом уточнений
                })
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableItemSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат одного из значений сообщений
        SomeValueType someDataItem = kafka().poll(consumedValue(
            new SomeValueTypeDeserializer()) //десериализатор для значения
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableItemSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат одного из значений сообщений
        //в виде текста
        SomeValueType someDataItem = kafka().poll(consumedValue()
            //уточняющие параметры
        );
    }
}

Получение объектов данных Kafka#

В виде листа#

import java.util.List;

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        List<SomeData> someDataList
            = kafka().poll(consumedList("Описание / название результирующего листа", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeKeyTypeDeserializer(), //десериализатор для ключа
                new SomeValueTypeDeserializer(), //десериализатор для значения
                cr -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого сообщения
                    return someDataObject; //эти объекты
                    //будут собраны в результирующий лист
                })
                //Можно перечислить топики, из которых следует прочесть 
                //данные сообщений.
                //Если значение свойства DEFAULT_TOPICS_FOR_POLL непустое,
                //и нет необходимости указывать другие топики для чтения,
                //то метод можно не вызывать
                .fromTopics("testTopic", "testTopic2")
                //Можно указать свойства и их значения,
                //которые могут дополнить/заменить свойства, 
                //получаемые при помощи KAFKA_CONSUMER_PROPERTIES.
                //ВНИМАНИЕ!!!!! Нельзя указывать свойство `auto.offset.reset`. 
                // Это приведет к IllegalArgumentException
                .setProperty(GROUP_ID_CONFIG, "someGroupId")
                //Можно указать действие, с момента начала выполнения которого
                //начнется чтение сообщений из топиков. Предполагается, что прочитанные сообщения - 
                //результат указанного действия. При этом свойство `auto.offset.reset` текущего консъюмера
                //меняется на `latest`. Если действие не указывать, будут прочитаны все сообщения из топиков,
                //и значение свойства `auto.offset.reset` текущего консъюмера равно `earliest`
                .pollLatestWith("Название или описание действия", () -> {
                    //тут можно выполнять шаги из других контекстов 
                    //или другой исполняемый код.
                })
                //другие уточняющие параметры
        );
    }
}
Уточняющие параметры шага, который возвращает лист. Шаг предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//каждый элемент результирующего листа.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
    "каждый элемент, который попадет в результирующий лист", item -> {
    /*предикат, как работает критерий*/
})
//-------------------------------------------
// можно указать время,
// за которое нужные элементы должны быть получены
.timeOut(ofSeconds(5))
//-------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если непустой лист не удалось получить (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой лист просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//-------------------------------------------
//1. можно указать сколько объектов,
//которые соответствуют критериям,
//нужно вернуть
.returnListOfSize(3)
//-------------------------------------------
//2. можно указать, до элемента с каким индексом
//нужно собрать результирующие элементы,
//индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnBeforeIndex(7)
//.returnAfterIndex(8) либо после какого элемента
//----------------------------------------------
//3. Либо можно перечислить индексы элементов,
// которые следует вернуть.
//Индексы - индексы объектов в наборе элементов,
//которые соответствуют критериям
.returnItemsOfIndexes(0, 3, 5)
//-----------------------------------------------
//4. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireSize(isEqual(8))
//----------------------------------------------
//5. можно указать, при достижении каких условий,
//которым должен соответствовать лист ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий лист/суб-лист
.returnOnCondition("Описание условия", list -> {
    /*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь суб-лист подходящих по критериям элементов,
// который должен вернуться как результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием

Ниже другие варианты получения объектов из очередей

import java.util.List;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        List<SomeData> someDataList = kafka().poll(consumedListKeyData(
                "Описание / название результирующего листа", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeKeyTypeDeserializer(), //десериализатор для ключа
                keyData -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого ключа сообщения
                    return someDataObject; //эти объекты
                    //будут собраны в результирующий лист
                })
            //уточняющие параметры
        );
    }
}
import java.util.List;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат листа ключей сообщений
        List<SomeKeyType> someKeyList = kafka().poll(consumedKeys(
            new SomeKeyTypeDeserializer()) //десериализатор для ключа
            //уточняющие параметры
        );
    }
}
import java.util.List;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат листа ключей сообщений
        //в виде текстов
        List<String> someKeyList = kafka().poll(consumedKeys()
            //уточняющие параметры
        );
    }
}
import java.util.List;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        List<SomeData> someDataList = kafka().poll(consumedListValueData(
                "Описание / название результирующего листа", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeValueTypeDeserializer(), //десериализатор для значения
                valueData -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого значения сообщения
                    return someDataObject; //эти объекты
                    //будут собраны в результирующий лист
                })
            //уточняющие параметры
        );
    }
}
import java.util.List;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат листа значений сообщений
        List<SomeValueType> someValueList = kafka().poll(consumedValues(
            new SomeValueTypeDeserializer()) //десериализатор для значения
            //уточняющие параметры
        );
    }
}
import java.util.List;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollIterableSupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат листа значений сообщений
        //в виде текстов
        List<String> someValueList = kafka().poll(consumedValues()
            //уточняющие параметры
        );
    }
}

В виде массива#

import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollArraySupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        SomeData[] someDataArray
            = kafka().poll(consumedArray("Описание / название результирующего массива", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeKeyTypeDeserializer(), //десериализатор для ключа
                new SomeValueTypeDeserializer(), //десериализатор для значения
                SomeData.class, //класс результирующего массива
                //так же можно использовать объект
                //com.fasterxml.jackson.core.type.TypeReference
                cr -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого сообщения
                    return someDataObject; //эти объекты
                    //будут собраны в результирующий массив
                })
                //Можно перечислить топики, из которых следует прочесть 
                //данные сообщений.
                //Если значение свойства DEFAULT_TOPICS_FOR_POLL непустое,
                //и нет необходимости указывать другие топики для чтения,
                //то метод можно не вызывать
                .fromTopics("testTopic", "testTopic2")
                //Можно указать свойства и их значения,
                //которые могут дополнить/заменить свойства, 
                //получаемые при помощи KAFKA_CONSUMER_PROPERTIES.
                //ВНИМАНИЕ!!!!! Нельзя указывать свойство `auto.offset.reset`. 
                // Это приведет к IllegalArgumentException
                .setProperty(GROUP_ID_CONFIG, "someGroupId")
                //Можно указать действие, с момента начала выполнения которого
                //начнется чтение сообщений из топиков. Предполагается, что прочитанные сообщения - 
                //результат указанного действия. При этом свойство `auto.offset.reset` текущего консъюмера
                //меняется на `latest`. Если действие не указывать, будут прочитаны все сообщения из топиков,
                //и значение свойства `auto.offset.reset` текущего консъюмера равно `earliest`
                .pollLatestWith("Название или описание действия", () -> {
                    //тут можно выполнять шаги из других контекстов 
                    //или другой исполняемый код.
                })
                //опции, уточняющие результат
        );
    }
}
Уточняющие параметры шага, который возвращает массив. Шаг предусматривает время на ожидание
//Можно указать один или несколько критериев,
//которым должен соответствовать
//каждый элемент результирующего массива.
// Так же доступны criteriaOr(criteria...),
// criteriaOnlyOne(criteria...)
// criteriaNot(criteria...)
.criteria("Описание критерия, которому должен соответствовать " +
    "каждый элемент, который попадет в результирующий массив", item -> {
    /*предикат, как работает критерий*/
})
//----------------------------------------------
// можно указать время,
// за которое нужные элементы должны быть получены
.timeOut(ofSeconds(5))
//----------------------------------------------
//Можно указать, что должно быть выброшено исключение,
// если непустой массив не удалось получить (не было ни одного элемента,
//который бы соответствовал перечисленным критериям,
// или непустой массив просто не был получен)
.throwOnNoResult()
//ТАКЖЕ ЕСТЬ СЛЕДУЮЩИЕ ОПЦИИ:
//----------------------------------------------
//1. можно указать сколько объектов,
//которые соответствуют критериям,
//нужно вернуть
.returnArrayOfLength(3)
//-------------------------------------------
//2. можно указать, до элемента с каким индексом
//нужно собрать результирующие элементы,
//индекс - индекс объекта в наборе элементов,
//которые соответствуют критериям
.returnBeforeIndex(7)
//.returnAfterIndex(8) либо после какого элемента
//----------------------------------------------
//3. Либо можно перечислить индексы элементов,
// которые следует вернуть.
//Индексы - индексы объектов в наборе элементов,
//которые соответствуют критериям
.returnItemsOfIndexes(0, 3, 5)
//-----------------------------------------------
//4. можно указать, при достижении какого количества
//ВСЕХ объектов, которые соответствуют критериям,
//должен быть возвращен результат
.returnIfEntireLength(isEqual(8))
//----------------------------------------------
//5. можно указать, при достижении каких условий,
//которым должен соответствовать массив ВСЕХ объектов,
//соответствующих критериям,
//можно возвращать результирующий массив/суб-массив
.returnOnCondition("Описание условия", array -> {
    /*предикат, как работает критерий*/
})
//так же доступны returnOnConditionOr(criteria...),
// returnOnConditionOnlyOne(criteria...)
// returnOnConditionNot(criteria...)
//------------------------------------------
//Если не нашлось столько подходящих объектов, чтобы вернуть результат,
//или весь суб-массив подходящих по критериям элементов,
// который должен вернуться как результат,
// не соответствует каким-то критериям -
// будет выброшено исключение с подробным описанием

Ниже другие варианты получения массива объектов из очередей

import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollArraySupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        SomeData[] someDataArray = kafka().poll(consumedArrayKeyData(
                "Описание / название результирующего массива", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeKeyTypeDeserializer(), //десериализатор для ключа
                SomeData.class, //класс результирующего массива
                //так же можно использовать объект
                //com.fasterxml.jackson.core.type.TypeReference
                keyData -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого ключа сообщения
                    return someDataObject; //эти объекты
                    //будут собраны в результирующий массив
                })
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollArraySupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат массива ключей сообщений
        SomeKeyType[] someKeyArray = kafka().poll(consumedArrayKeys(
            SomeKeyType.class, //класс результирующего массива
            // так же можно использовать объект
            //com.fasterxml.jackson.core.type.TypeReference
            new SomeKeyTypeDeserializer()) //десериализатор для ключа
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollArraySupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат массива ключей сообщений
        //в виде текстов
        String[] someKeyArray = kafka().poll(consumedArrayKeys()
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollArraySupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        SomeData[] someDataArray = kafka().poll(consumedArrayValueData(
                "Описание / название результирующего массива", //описание того 
                // ЧТО следует получить,
                //в свободной форме или бизнес
                //терминологии
                new SomeValueTypeDeserializer(), //десериализатор для значения
                SomeData.class, //класс результирующего массива
                //так же можно использовать объект
                //com.fasterxml.jackson.core.type.TypeReference
                valueData -> {
                    //Алгоритм функции, которая получает данные
                    //из каждого значения сообщения
                    return someDataObject; //эти объекты
                    //будут собраны в результирующий массив
                })
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollArraySupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат массива значений сообщений
        SomeValueType[] someValueArray = kafka().poll(consumedArrayValues(
            SomeValueType.class, //класс результирующего массива
            // так же можно использовать объект
            //com.fasterxml.jackson.core.type.TypeReference
            new SomeValueTypeDeserializer()) //десериализатор для значения
            //уточняющие параметры
        );
    }
}
import static ru.tinkoff.qa.neptune.kafka.KafkaStepContext.kafka;
import static ru.tinkoff.qa.neptune.kafka.functions.poll.KafkaPollArraySupplier.*;

public class MyTest {

    @Test
    public void myTest1() {
        //возврат массива значений сообщений
        //в виде текстов
        String[] someValueArray = kafka().poll(consumedArrayValues()
            //уточняющие параметры
        );
    }
}