Program Listing for File connection_pool.hpp
Return to documentation for file (stream-client/connector/connection_pool.hpp)
#pragma once
#include "connector.hpp"
#include "pool_strategy.hpp"
#include <atomic>
#include <list>
namespace stream_client {
namespace connector {
/// Pool of sessions (streams) created through a Connector.
///
/// @tparam Connector Connector type; it must provide `stream_type`, `clock_type`,
///         `time_duration_type`, `time_point_type`, the `get_*_timeout()` accessors and
///         `get_target()` used below.
/// @tparam Strategy  Type of the `reconnection_` member; defaults to `greedy_strategy<Connector>`.
///         NOTE(review): presumably decides how the pool is refilled - confirm in pool_strategy.hpp.
///
/// The pool is neither copyable nor movable.
template <typename Connector, typename Strategy = greedy_strategy<Connector>>
class base_connection_pool
{
public:
    using connector_type = typename std::remove_reference<Connector>::type;
    using stream_type = typename connector_type::stream_type;
    using protocol_type = typename stream_type::protocol_type;

    using clock_type = typename connector_type::clock_type;
    using time_duration_type = typename connector_type::time_duration_type;
    using time_point_type = typename connector_type::time_point_type;

    /// Construct a pool with an explicit idle timeout.
    ///
    /// @param size         Pool size (presumably the maximum number of pooled sessions, see pool_max_size_).
    /// @param idle_timeout Idle timeout (presumably how long an unused session may stay pooled - confirm in the .ipp).
    /// @param argn         Remaining arguments (presumably forwarded to the connector - confirm in the .ipp).
    template <typename... ArgN>
    base_connection_pool(std::size_t size, time_duration_type idle_timeout, ArgN&&... argn);

    /// Construct a pool without an explicit idle timeout.
    ///
    /// Participates in overload resolution only when @p arg1 is not convertible to the
    /// connector's duration type, so it cannot be confused with the overload above.
    /// The template signature is kept exactly as-is because the out-of-class definition must match it.
    template <typename Arg1, typename... ArgN,
              typename std::enable_if<
                  !std::is_convertible<Arg1, typename Connector::time_duration_type>::value>::type* = nullptr>
    base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn);

    // Copy and move are disabled. The injected class name is used on purpose:
    // `base_connection_pool<Connector>` would name the pool with the *default* Strategy,
    // so for any other Strategy those declarations would not be this class's own
    // copy/move operations at all.
    base_connection_pool(const base_connection_pool& other) = delete;
    base_connection_pool& operator=(const base_connection_pool& other) = delete;
    base_connection_pool(base_connection_pool&& other) = delete;
    base_connection_pool& operator=(base_connection_pool&& other) = delete;

    virtual ~base_connection_pool();

    /// Obtain a session from the pool, giving up at @p deadline.
    ///
    /// @param[out] ec   Receives the error, if any.
    /// @param deadline  Absolute time point limiting the operation.
    /// @return The session; callers in this class treat a null pointer as a failure.
    std::unique_ptr<stream_type> get_session(boost::system::error_code& ec, const time_point_type& deadline);

    /// Same as above with a relative @p timeout, converted to a deadline counted from now.
    inline std::unique_ptr<stream_type> get_session(boost::system::error_code& ec, const time_duration_type& timeout)
    {
        return get_session(ec, clock_type::now() + timeout);
    }

    /// Same as above, using the connector's connect timeout as the timeout.
    inline std::unique_ptr<stream_type> get_session(boost::system::error_code& ec)
    {
        return get_session(ec, get_connect_timeout());
    }

    /// Throwing variant of get_session().
    ///
    /// @tparam Time Either a duration or a time point accepted by the non-throwing overloads.
    /// @throws boost::system::system_error with the reported error code, or with
    ///         `boost::asio::error::operation_aborted` when no error was reported but
    ///         no session was returned either.
    template <typename Time>
    inline std::unique_ptr<stream_type> get_session(const Time& timeout_or_deadline)
    {
        boost::system::error_code ec;
        auto session = get_session(ec, timeout_or_deadline);
        if (ec) {
            throw boost::system::system_error{ec, "Failed to get a session from the pool connected to " +
                                                      connector_.get_target()};
        }
        if (!session) {
            // No error code but no session either: report it rather than hand out a null pointer.
            throw boost::system::system_error{boost::asio::error::operation_aborted,
                                              "Failed to get a session from the pool connected to " +
                                                  connector_.get_target()};
        }
        return session;
    }

    /// Throwing variant of get_session() using the connector's connect timeout.
    inline std::unique_ptr<stream_type> get_session()
    {
        return get_session(get_connect_timeout());
    }

    /// Try to obtain a session from the pool, giving up at @p deadline.
    ///
    /// NOTE(review): how this differs from get_session() is defined in the .ipp - confirm there.
    std::unique_ptr<stream_type> try_get_session(boost::system::error_code& ec, const time_point_type& deadline);

    /// Same as above with a relative @p timeout, converted to a deadline counted from now.
    inline std::unique_ptr<stream_type> try_get_session(boost::system::error_code& ec,
                                                        const time_duration_type& timeout)
    {
        return try_get_session(ec, clock_type::now() + timeout);
    }

    /// Hand a session back to the pool; ownership of @p session is passed in by rvalue reference.
    void return_session(std::unique_ptr<stream_type>&& session);

    /// Report whether the pool is connected, giving up at @p deadline.
    ///
    /// @param[out] ec Receives the error, if any.
    bool is_connected(boost::system::error_code& ec, const time_point_type& deadline) const;

    /// Same as above with a relative @p timeout, converted to a deadline counted from now.
    inline bool is_connected(boost::system::error_code& ec, const time_duration_type& timeout) const
    {
        return is_connected(ec, clock_type::now() + timeout);
    }

    /// Same as above, with the deadline set to now plus the connector's connect timeout.
    inline bool is_connected(boost::system::error_code& ec) const
    {
        return is_connected(ec, clock_type::now() + get_connect_timeout());
    }

    /// Throwing variant of is_connected().
    ///
    /// @throws boost::system::system_error when the check reports an error.
    inline bool is_connected() const
    {
        boost::system::error_code ec;
        const bool connected = is_connected(ec);
        if (ec) {
            throw boost::system::system_error{ec, "Failed to lock the pool connected to " + connector_.get_target()};
        }
        return connected;
    }

    /// Resolve timeout of the underlying connector.
    inline const time_duration_type& get_resolve_timeout() const
    {
        return connector_.get_resolve_timeout();
    }

    /// Connect timeout of the underlying connector.
    inline const time_duration_type& get_connect_timeout() const
    {
        return connector_.get_connect_timeout();
    }

    /// Operation timeout of the underlying connector.
    inline const time_duration_type& get_operation_timeout() const
    {
        return connector_.get_operation_timeout();
    }

private:
    /// NOTE(review): presumably the body of the pool_watcher_ thread - confirm in the .ipp.
    void watch_pool_routine();

    std::string name_;
    Strategy reconnection_;
    connector_type connector_;

    std::size_t pool_max_size_;
    time_duration_type idle_timeout_;
    // Pooled sessions, each paired with a time point (presumably the moment it was stored - confirm).
    // The misspelled name is kept because code outside this block refers to it.
    std::list<std::pair<time_point_type, std::unique_ptr<stream_type>>>
        sesson_pool_;
    // Declared mutable so that const members such as is_connected() can use them.
    mutable std::timed_mutex pool_mutex_;
    mutable std::condition_variable_any pool_cv_;

    std::atomic_bool watch_pool_{false};
    std::thread pool_watcher_;
};
// Ready-made pool types: for every connector there is one pool using
// greedy_strategy (the default Strategy argument, spelled out here) and one
// using conservative_strategy.

// TCP
using tcp_pool = base_connection_pool<tcp_connector, greedy_strategy<tcp_connector>>;
using tcp_conservative_pool = base_connection_pool<tcp_connector, conservative_strategy<tcp_connector>>;

// UDP
using udp_pool = base_connection_pool<udp_connector, greedy_strategy<udp_connector>>;
using udp_conservative_pool = base_connection_pool<udp_connector, conservative_strategy<udp_connector>>;

// SSL
using ssl_pool = base_connection_pool<ssl_connector, greedy_strategy<ssl_connector>>;
using ssl_conservative_pool = base_connection_pool<ssl_connector, conservative_strategy<ssl_connector>>;

// HTTP
using http_pool = base_connection_pool<http_connector, greedy_strategy<http_connector>>;
using http_conservative_pool = base_connection_pool<http_connector, conservative_strategy<http_connector>>;

// HTTPS
using https_pool = base_connection_pool<https_connector, greedy_strategy<https_connector>>;
using https_conservative_pool = base_connection_pool<https_connector, conservative_strategy<https_connector>>;
} // namespace connector
} // namespace stream_client
#include "impl/connection_pool.ipp"