// Program Listing for File connector.ipp

// Return to documentation for file (stream-client/connector/impl/connector.ipp)

#pragma once

#include <random>

namespace {

// Randomly permute the elements of `v` in place.
//
// The Mersenne Twister engine is seeded from std::random_device once per thread (on first use)
// and reused by later calls on that thread. Re-creating the random_device and re-seeding the
// engine on every call is comparatively expensive and gives no better shuffling.
//
// Empty and single-element vectors are left unchanged.
template <typename T>
void shuffle_vector(std::vector<T>& v)
{
    // thread_local: every thread owns its engine, so concurrent callers never share mutable state
    thread_local std::mt19937 r_generator{std::random_device{}()};
    std::shuffle(v.begin(), v.end(), r_generator);
}

} // anonymous namespace

namespace stream_client {
namespace connector {

// Build a connector for `host`:`port` and launch the background thread running resolve_routine().
//
// The timeouts are stored as given; `ip_family` and `resolve_flags` are handed over to the resolver.
template <typename Stream>
base_connector<Stream>::base_connector(const std::string& host, const std::string& port,
                                       time_duration_type resolve_timeout, time_duration_type connect_timeout,
                                       time_duration_type operation_timeout,
                                       ::stream_client::resolver::ip_family ip_family, resolve_flags_type resolve_flags)
    : host_(host)
    , port_(port)
    , resolve_timeout_(resolve_timeout)
    , connect_timeout_(connect_timeout)
    , operation_timeout_(operation_timeout)
    , resolver_(host_, port_, resolve_timeout_, std::move(ip_family), std::move(resolve_flags))
{
    // Initial state: a resolution is requested right away and none has completed yet.
    resolve_needed_ = true;
    resolve_done_ = false;

    // The run flag must be raised before the worker starts, otherwise its loop would exit at once.
    resolving_thread_running_.store(true, std::memory_order_release);
    resolving_thread_ = std::thread([this] { this->resolve_routine(); });
}

// Ask the resolving thread to stop and wait until it has finished.
template <typename Stream>
base_connector<Stream>::~base_connector()
{
    // resolve_routine() polls this flag at the top of its loop
    resolving_thread_running_.store(false, std::memory_order_release);

    if (!resolving_thread_.joinable()) {
        return;
    }
    resolving_thread_.join();
}

// Open a new session (connected stream) to one of the resolved endpoints.
//
// Waits until `deadline` for the endpoints to be resolved, then tries the endpoints in random
// order and returns the first stream that connects.
//
// ec       - receives the failure reason when nullptr is returned: the stored resolve error, a
//            timeout while waiting for resolution, or the error of the last failed connect attempt.
//            It is not modified when a session is returned. NOTE: it can stay unset if there were
//            no endpoints to try and no resolve error is recorded.
// deadline - point in time limiting both the wait for resolution and the connect attempts.
//
// Returns the connected stream, or nullptr on failure. Only boost::system::system_error thrown by
// connect_until() is translated into `ec`; any other exception propagates to the caller.
template <typename Stream>
std::unique_ptr<typename base_connector<Stream>::stream_type>
base_connector<Stream>::new_session(boost::system::error_code& ec, const time_point_type& deadline)
{
    // Report why we could not get resolved endpoints in time: prefer the resolver's own error,
    // fall back to a plain timeout.
    const auto report_resolve_failure = [this, &ec] {
        auto resolve_ec = get_resolve_error();
        ec = resolve_ec ? std::move(resolve_ec) : boost::asio::error::timed_out;
    };

    std::unique_lock<std::timed_mutex> resolve_done_lk(resolve_done_mutex_, std::defer_lock);
    if (!resolve_done_lk.try_lock_until(deadline)) {
        // failed to lock resolve_done_mutex_ within deadline
        report_resolve_failure();
        return nullptr;
    }
    // wait_until() evaluates the predicate before blocking, so it returns immediately when the
    // resolution is already done
    if (!resolve_done_cv_.wait_until(resolve_done_lk, deadline, [this] { return resolve_done_; })) {
        // failed to wait for endpoints resolution
        report_resolve_failure();
        return nullptr;
    }
    // unlock owned resolve_done_mutex_ to release other new_session() calls while we are connecting
    resolve_done_lk.unlock();

    auto endpoints = get_endpoints();
    shuffle_vector(endpoints);

    // Connect errors are collected locally so that a failed attempt does not leak into `ec`
    // when a later endpoint succeeds.
    boost::system::error_code connect_ec;
    for (const auto& peer : endpoints) {
        try {
            return connect_until(peer, deadline);
        } catch (const boost::system::system_error& err) {
            // remember the failure and fall through to the next endpoint instead of giving up
            connect_ec = err.code();
        }
    }

    if (connect_ec) {
        ec = connect_ec;
    } else if (!ec) {
        // endpoints may be empty because of resolve error
        ec = get_resolve_error();
    }
    // if failed to connect trigger resolving thread to update endpoints
    notify_resolve_needed();
    return nullptr;
}

// Body of the background resolving thread.
//
// Loops until resolving_thread_running_ is cleared. Each iteration waits (in bounded slices, so
// the stop flag is re-checked regularly) for resolve_needed_ to be set, resolves the endpoints and
// stores the resulting error code. On success it clears resolve_needed_, publishes the new
// endpoints and signals that resolution is done. On failure it backs off briefly and retries,
// leaving resolve_needed_ set.
template <typename Stream>
void base_connector<Stream>::resolve_routine()
{
    static const auto lock_timeout = std::chrono::milliseconds(100);
    static const auto retry_delay = std::chrono::milliseconds(50);

    while (resolving_thread_running_.load(std::memory_order_acquire)) {
        std::unique_lock<std::timed_mutex> resolve_needed_lk(resolve_needed_mutex_, std::defer_lock);
        if (!resolve_needed_lk.try_lock_for(lock_timeout) ||
            !resolve_needed_cv_.wait_for(resolve_needed_lk, lock_timeout, [this] { return resolve_needed_; })) {
            continue;
        }
        // at this point we own the locked resolve_needed_mutex_ and resolve_needed_ is set

        boost::system::error_code resolve_ec;
        resolver_endpoint_iterator_type new_endpoints = resolver_.resolve(resolve_ec);
        set_resolve_error(resolve_ec);
        if (resolve_ec) {
            // Do not sleep while holding the mutex: nothing it guards is touched during the
            // back-off. NOTE(review): this assumes threads requesting a re-resolve take
            // resolve_needed_mutex_ and would otherwise be blocked for the whole delay - confirm
            // against notify_resolve_needed().
            resolve_needed_lk.unlock();
            std::this_thread::sleep_for(retry_delay);
            continue;
        }

        resolve_needed_ = false;
        update_endpoints(std::move(new_endpoints));
        notify_resolve_done();
    }
}

// tcp_client flavour of connect_until(): builds a tcp_client for `peer_endpoint`, using the time
// left until `until_time` as the connect timeout and operation_timeout_ as the operation timeout.
template <>
inline std::unique_ptr<::stream_client::tcp_client>
base_connector<::stream_client::tcp_client>::connect_until(const endpoint_type& peer_endpoint,
                                                           const time_point_type& until_time) const
{
    using client_type = ::stream_client::tcp_client;

    const time_duration_type time_left{until_time - clock_type::now()};
    return std::make_unique<client_type>(peer_endpoint, time_left, operation_timeout_);
}

// udp_client flavour of connect_until(): builds a udp_client for `peer_endpoint`, using the time
// left until `until_time` as the connect timeout and operation_timeout_ as the operation timeout.
template <>
inline std::unique_ptr<::stream_client::udp_client>
base_connector<::stream_client::udp_client>::connect_until(const endpoint_type& peer_endpoint,
                                                           const time_point_type& until_time) const
{
    using client_type = ::stream_client::udp_client;

    const time_duration_type time_left{until_time - clock_type::now()};
    return std::make_unique<client_type>(peer_endpoint, time_left, operation_timeout_);
}

// ssl::ssl_client flavour of connect_until(): builds an ssl_client for `peer_endpoint`, using the
// time left until `until_time` as the connect timeout and operation_timeout_ as the operation
// timeout. Unlike the plain clients, host_ is passed to the client as well.
template <>
inline std::unique_ptr<::stream_client::ssl::ssl_client>
base_connector<::stream_client::ssl::ssl_client>::connect_until(const endpoint_type& peer_endpoint,
                                                                const time_point_type& until_time) const
{
    using client_type = ::stream_client::ssl::ssl_client;

    const time_duration_type time_left{until_time - clock_type::now()};
    return std::make_unique<client_type>(peer_endpoint, time_left, operation_timeout_, host_);
}

// http::http_client flavour of connect_until(): builds an http_client for `peer_endpoint`, using
// the time left until `until_time` as the connect timeout and operation_timeout_ as the operation
// timeout.
template <>
inline std::unique_ptr<::stream_client::http::http_client>
base_connector<::stream_client::http::http_client>::connect_until(const endpoint_type& peer_endpoint,
                                                                  const time_point_type& until_time) const
{
    using client_type = ::stream_client::http::http_client;

    const time_duration_type time_left{until_time - clock_type::now()};
    return std::make_unique<client_type>(peer_endpoint, time_left, operation_timeout_);
}

// http::https_client flavour of connect_until(): builds an https_client for `peer_endpoint`, using
// the time left until `until_time` as the connect timeout and operation_timeout_ as the operation
// timeout. Unlike the plain http client, host_ is passed to the client as well.
template <>
inline std::unique_ptr<::stream_client::http::https_client>
base_connector<::stream_client::http::https_client>::connect_until(const endpoint_type& peer_endpoint,
                                                                   const time_point_type& until_time) const
{
    using client_type = ::stream_client::http::https_client;

    const time_duration_type time_left{until_time - clock_type::now()};
    return std::make_unique<client_type>(peer_endpoint, time_left, operation_timeout_, host_);
}

} // namespace connector
} // namespace stream_client