Program Listing for File pool_strategy.ipp

Return to documentation for file (stream-client/connector/impl/pool_strategy.ipp)

#pragma once

#include "stream-client/logger.hpp"

#include <boost/system/system_error.hpp>

#include <atomic>
#include <chrono>
#include <list>
#include <random>
#include <thread>
#include <vector>

namespace stream_client {
namespace connector {

// Upper bound, in milliseconds (10 seconds), for the back-off delay computed after a failed refill.
template <typename Connector>
const unsigned long conservative_strategy<Connector>::kMaxBackoffMs = 10000UL;

// Default initial back-off delay, in milliseconds.
template <typename Connector>
const unsigned long conservative_strategy<Connector>::kDefaultDelayMs = 50UL;

// Default multiplier applied to the back-off delay.
template <typename Connector>
const unsigned long conservative_strategy<Connector>::kDefaultDelayMul = 3UL;

// Try to fill every vacant place of the pool at once.
//
// One thread is started per vacant place. Each thread asks `connector` for a new session and passes it to
// `append_func`; a boost::system::system_error raised on the way is logged and that place stays vacant.
// The call returns only after all started threads have finished.
//
// Returns true when at least one attempt was requested (vacant_places > 0), no matter how many succeeded.
template <typename Connector>
bool greedy_strategy<Connector>::refill(connector_type& connector, std::size_t vacant_places,
                                        append_func_type append_func)
{
    // creating new sessions may be slow and we want to add them simultaneously
    auto add_session = [&]() {
        try {
            auto new_session = connector.new_session();
            append_func(std::move(new_session));
        } catch (const boost::system::system_error& e) {
            STREAM_LOG_ERROR("failed to establish new session to " + connector.get_target() + ": " + e.what());
        }
    };

    // Joins every started thread when its scope is left, on the normal path and during stack unwinding.
    // Without it, a failure to start the N-th thread would destroy the already running (joinable) threads,
    // which calls std::terminate.
    struct join_guard {
        std::vector<std::thread>& threads;

        ~join_guard()
        {
            for (auto& t : threads) {
                if (t.joinable()) {
                    t.join();
                }
            }
        }
    };

    std::vector<std::thread> adders;
    {
        join_guard guard{adders};
        adders.reserve(vacant_places);
        for (std::size_t i = 0; i < vacant_places; ++i) {
            adders.emplace_back(add_session);
        }
    } // all started threads are joined here

    return vacant_places > 0;
}

// Set up the strategy with the delay to use after the first failure (first_delay_ms) and the factor applied
// to the delay on subsequent failures (delay_multiplier). The current delay starts at 0 and the random
// generator is constructed from a value obtained from r_device_.
//
// Throws std::runtime_error when delay_multiplier is less than 1.
template <typename Connector>
conservative_strategy<Connector>::conservative_strategy(unsigned long first_delay_ms, unsigned delay_multiplier)
    : initial_delay_ms_(first_delay_ms)
    , delay_multiplier_(delay_multiplier)
    , current_delay_ms_(0)
    , r_generator_(r_device_())
{
    const bool multiplier_is_valid = delay_multiplier_ >= 1;
    if (!multiplier_is_valid) {
        throw std::runtime_error("delay multiplier should be >= 1");
    }
}

// Refill the pool cautiously and back off after failures.
//
// Returns false without touching the connector when there are no vacant places or while the deadline set by a
// previous failed attempt (wait_until_) has not passed. Otherwise one session is attempted on the calling
// thread. Only when no back-off delay is in effect, additional attempts run in parallel threads so that in
// total ceil(vacant_places / 3) sessions are tried. A boost::system::system_error raised by an attempt is
// logged and counts as a failure.
//
// If at least one session was passed to `append_func` the delay is reset and true is returned. If none was,
// a new randomized delay (capped by kMaxBackoffMs) is computed, wait_until_ is moved forward and false is
// returned.
template <typename Connector>
bool conservative_strategy<Connector>::refill(connector_type& connector, std::size_t vacant_places,
                                              append_func_type append_func)
{
    // Nothing to fill; also guarantees vacant_places >= 1 for the thread-count computation below.
    if (vacant_places == 0) {
        return false;
    }

    if (clock_type::now() < wait_until_) {
        return false;
    }

    std::atomic_bool is_added{false};

    // creating new sessions may be slow and we want to add them simultaneously
    auto add_session = [&]() {
        try {
            auto new_session = connector.new_session();
            append_func(std::move(new_session));
            is_added = true;
        } catch (const boost::system::system_error& e) {
            STREAM_LOG_ERROR("failed to establish new session to " + connector.get_target() + ": " + e.what());
        }
    };

    // Joins every started thread when its scope is left, on the normal path and during stack unwinding.
    // Without it, an exception from starting a thread or from the add_session() call on this thread would
    // destroy running (joinable) threads, which calls std::terminate.
    struct join_guard {
        std::vector<std::thread>& threads;

        ~join_guard()
        {
            for (auto& t : threads) {
                if (t.joinable()) {
                    t.join();
                }
            }
        }
    };

    // Number of extra threads; together with the attempt on the calling thread this gives
    // ceil(vacant_places / 3) attempts. For vacant_places >= 1 it equals (vacant_places + 2) / 3 - 1,
    // written so that it cannot wrap around.
    const std::size_t parallel = (vacant_places - 1) / 3;

    std::vector<std::thread> adders;
    {
        join_guard guard{adders};
        // while backing off (non-zero delay) only the single attempt on this thread is made
        if (!current_delay_ms_ && parallel > 0) {
            adders.reserve(parallel);
            for (std::size_t i = 0; i < parallel; ++i) {
                adders.emplace_back(add_session);
            }
        }
        add_session();
    } // all started threads are joined here

    if (is_added) {
        current_delay_ms_ = 0;
        return true;
    }

    // first failure starts from the initial delay, each following one multiplies the previous delay
    if (!current_delay_ms_) {
        current_delay_ms_ = initial_delay_ms_;
    } else {
        current_delay_ms_ *= delay_multiplier_;
    }
    // NOTE(review): the random factor is stored back into current_delay_ms_, so it compounds across failures,
    // and a result truncated to 0 is indistinguishable from the "no back-off" state checked above (the next
    // call would start parallel attempts again) - confirm this is intended.
    const auto rand_val = double(r_generator_()) / r_generator_.max();
    current_delay_ms_ *= rand_val;
    current_delay_ms_ = std::min(kMaxBackoffMs, current_delay_ms_);
    STREAM_LOG_TRACE("wait for " + std::to_string(current_delay_ms_) + "ms" + " before connect to " +
                     connector.get_target());
    wait_until_ = clock_type::now() + std::chrono::milliseconds(current_delay_ms_);

    return false;
}

} // namespace connector
} // namespace stream_client