Program Listing for File connection_pool.hpp

Return to documentation for file (stream-client/connector/connection_pool.hpp)

#pragma once

#include "connector.hpp"
#include "pool_strategy.hpp"

#include <atomic>
#include <list>

namespace stream_client {
namespace connector {

/**
 * Pool of sessions (streams) produced by a connector.
 *
 * Only the interface and the small inline overloads are defined here; the constructors, the destructor and
 * the deadline-based primitives are defined out of line (see the implementation file included at the end of
 * this header).
 *
 * @tparam Connector Type used to create sessions. A reference type is accepted: it is stripped to form
 *     connector_type.
 * @tparam Strategy Type of the reconnection_ member; defaults to greedy_strategy<Connector>.
 */
template <typename Connector, typename Strategy = greedy_strategy<Connector>>
class base_connection_pool
{
public:
    /// Connector type with any reference qualifier removed.
    using connector_type = typename std::remove_reference<Connector>::type;
    /// Stream (session) type produced by the connector.
    using stream_type = typename connector_type::stream_type;
    /// Protocol type of the stream.
    using protocol_type = typename stream_type::protocol_type;

    /// Clock, duration and time point types, all taken from the connector.
    using clock_type = typename connector_type::clock_type;
    using time_duration_type = typename connector_type::time_duration_type;
    using time_point_type = typename connector_type::time_point_type;

    /**
     * Construct a pool with an explicit idle timeout.
     *
     * @param size Pool size (presumably the maximum number of pooled sessions, see pool_max_size_).
     * @param idle_timeout Idle timeout (presumably how long a pooled session may stay unused, see idle_timeout_).
     * @param argn Remaining arguments; presumably forwarded to the connector's constructor - confirm in the
     *     implementation file.
     */
    template <typename... ArgN>
    base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn);

    /**
     * Construct a pool without an explicit idle timeout.
     *
     * This overload only participates in overload resolution when @p arg1 is not convertible to the connector's
     * time_duration_type, so a call whose second argument is a duration selects the overload above.
     *
     * NOTE(review): the constraint spells the type through `Connector` rather than `connector_type`, which would
     * not compile for a reference `Connector`. It is left untouched because the out-of-line definition must
     * repeat the very same template header.
     *
     * @param size Pool size.
     * @param arg1 First of the remaining arguments.
     * @param argn Rest of the remaining arguments.
     */
    template <typename Arg1, typename... ArgN,
              typename std::enable_if<
                  !std::is_convertible<Arg1, typename Connector::time_duration_type>::value>::type* = nullptr>
    base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn);

    // The pool can be neither copied nor moved. The injected-class-name is used on purpose:
    // `base_connection_pool<Connector>` would name the specialization with the *default* Strategy, so for any
    // other Strategy these declarations would delete conversions from an unrelated type instead of the pool's
    // own copy/move operations.
    base_connection_pool(const base_connection_pool& other) = delete;
    base_connection_pool& operator=(const base_connection_pool& other) = delete;
    base_connection_pool(base_connection_pool&& other) = delete;
    base_connection_pool& operator=(base_connection_pool&& other) = delete;

    /// Destructor, defined out of line.
    virtual ~base_connection_pool();

    /**
     * Obtain a session from the pool, reporting failure through @p ec. Defined out of line.
     *
     * @param[out] ec Receives the error, if any.
     * @param deadline Time point bounding the operation.
     * @return Owning pointer to the session; callers in this class treat a null pointer as a failure.
     */
    std::unique_ptr<stream_type> get_session(boost::system::error_code& ec, const time_point_type& deadline);

    /**
     * Same as the deadline overload, with the deadline computed as `clock_type::now() + timeout`.
     *
     * @param[out] ec Receives the error, if any.
     * @param timeout Duration counted from the moment of the call.
     */
    inline std::unique_ptr<stream_type> get_session(boost::system::error_code& ec, const time_duration_type& timeout)
    {
        return get_session(ec, clock_type::now() + timeout);
    }

    /**
     * Same as the timeout overload, using the connector's connect timeout.
     *
     * @param[out] ec Receives the error, if any.
     */
    inline std::unique_ptr<stream_type> get_session(boost::system::error_code& ec)
    {
        return get_session(ec, get_connect_timeout());
    }

    /**
     * Throwing counterpart of the error_code overloads.
     *
     * @tparam Time Either a duration or a time point; it is passed on to the matching error_code overload.
     * @param timeout_or_deadline Timeout or deadline bounding the operation.
     * @return Non-null owning pointer to the session.
     * @throws boost::system::system_error with the reported error code, or with
     *     boost::asio::error::operation_aborted when no error was reported but no session was returned.
     */
    template <typename Time>
    inline std::unique_ptr<stream_type> get_session(const Time& timeout_or_deadline)
    {
        boost::system::error_code ec;
        auto session = get_session(ec, timeout_or_deadline);
        if (ec) {
            throw boost::system::system_error{ec, "Failed to get a session from the pool connected to " +
                                                      connector_.get_target()};
        }
        if (!session) {
            // No error code was set, yet there is nothing to hand out: report it as an aborted operation.
            throw boost::system::system_error{boost::asio::error::operation_aborted,
                                              "Failed to get a session from the pool connected to " +
                                                  connector_.get_target()};
        }
        return session;
    }

    /**
     * Throwing overload using the connector's connect timeout.
     *
     * @throws boost::system::system_error see the templated overload.
     */
    inline std::unique_ptr<stream_type> get_session()
    {
        return get_session(get_connect_timeout());
    }

    /**
     * Variant of get_session() declared alongside it and defined out of line.
     *
     * NOTE(review): presumably returns without waiting when no session is immediately available - confirm
     * against the implementation file.
     *
     * @param[out] ec Receives the error, if any.
     * @param deadline Time point bounding the operation.
     */
    std::unique_ptr<stream_type> try_get_session(boost::system::error_code& ec, const time_point_type& deadline);

    /**
     * Same as the deadline overload, with the deadline computed as `clock_type::now() + timeout`.
     *
     * @param[out] ec Receives the error, if any.
     * @param timeout Duration counted from the moment of the call.
     */
    inline std::unique_ptr<stream_type> try_get_session(boost::system::error_code& ec,
                                                        const time_duration_type& timeout)
    {
        return try_get_session(ec, clock_type::now() + timeout);
    }

    /**
     * Hand a session back to the pool. Defined out of line.
     *
     * @param session Session to return; ownership is passed to the pool.
     */
    void return_session(std::unique_ptr<stream_type>&& session);

    /**
     * Query the connection state of the pool, reporting failure through @p ec. Defined out of line.
     *
     * @param[out] ec Receives the error, if any.
     * @param deadline Time point bounding the operation.
     */
    bool is_connected(boost::system::error_code& ec, const time_point_type& deadline) const;

    /**
     * Same as the deadline overload, with the deadline computed as `clock_type::now() + timeout`.
     *
     * @param[out] ec Receives the error, if any.
     * @param timeout Duration counted from the moment of the call.
     */
    inline bool is_connected(boost::system::error_code& ec, const time_duration_type& timeout) const
    {
        return is_connected(ec, clock_type::now() + timeout);
    }

    /**
     * Same as the deadline overload, with the deadline set to now plus the connector's connect timeout.
     *
     * @param[out] ec Receives the error, if any.
     */
    inline bool is_connected(boost::system::error_code& ec) const
    {
        return is_connected(ec, clock_type::now() + get_connect_timeout());
    }

    /**
     * Throwing overload using the connector's connect timeout.
     *
     * @throws boost::system::system_error if the error_code overload reports an error.
     */
    inline bool is_connected() const
    {
        boost::system::error_code ec;
        const bool connected = is_connected(ec);
        if (ec) {
            throw boost::system::system_error{ec, "Failed to lock the pool connected to " + connector_.get_target()};
        }
        return connected;
    }

    /// Resolve timeout, as reported by the connector.
    inline const time_duration_type& get_resolve_timeout() const
    {
        return connector_.get_resolve_timeout();
    }

    /// Connect timeout, as reported by the connector.
    inline const time_duration_type& get_connect_timeout() const
    {
        return connector_.get_connect_timeout();
    }

    /// Operation timeout, as reported by the connector.
    inline const time_duration_type& get_operation_timeout() const
    {
        return connector_.get_operation_timeout();
    }

private:
    /// Defined out of line; presumably the body of the pool_watcher_ thread - confirm in the implementation file.
    void watch_pool_routine();

    std::string name_;
    Strategy reconnection_; ///< Strategy instance selected by the second template parameter.
    connector_type connector_; ///< Connector the timeouts and the target name are read from.

    std::size_t pool_max_size_;
    time_duration_type idle_timeout_;
    // Each entry pairs a time point with an owned stream.
    // NOTE(review): "sesson" looks like a typo for "session"; the name is kept because the out-of-line
    // implementation presumably refers to this member.
    std::list<std::pair<time_point_type, std::unique_ptr<stream_type>>>
        sesson_pool_;
    // Declared mutable so that const members such as is_connected() can use them.
    mutable std::timed_mutex pool_mutex_;
    mutable std::condition_variable_any pool_cv_;

    std::atomic_bool watch_pool_{false};
    std::thread pool_watcher_;
};

// Pools relying on the default Strategy of base_connection_pool (greedy_strategy), one per connector type.
using tcp_pool = base_connection_pool<tcp_connector>;
using udp_pool = base_connection_pool<udp_connector>;
using ssl_pool = base_connection_pool<ssl_connector>;
using http_pool = base_connection_pool<http_connector>;
using https_pool = base_connection_pool<https_connector>;

// The same connector types paired with conservative_strategy instead of the default.
using tcp_conservative_pool = base_connection_pool<tcp_connector, conservative_strategy<tcp_connector>>;
using udp_conservative_pool = base_connection_pool<udp_connector, conservative_strategy<udp_connector>>;
using ssl_conservative_pool = base_connection_pool<ssl_connector, conservative_strategy<ssl_connector>>;
using http_conservative_pool = base_connection_pool<http_connector, conservative_strategy<http_connector>>;
using https_conservative_pool = base_connection_pool<https_connector, conservative_strategy<https_connector>>;

} // namespace connector
} // namespace stream_client

#include "impl/connection_pool.ipp"