Program Listing for File connection_pool.ipp¶
↰ Return to documentation for file (stream-client/connector/impl/connection_pool.ipp
)
#pragma once
#include "stream-client/logger.hpp"
namespace stream_client {
namespace connector {
template <typename Connector, typename Strategy>
template <typename... ArgN>
base_connection_pool<Connector, Strategy>::base_connection_pool(std::size_t size, time_duration_type idle_timeout,
ArgN&&... argn)
: connector_(std::forward<ArgN>(argn)...)
, pool_max_size_(size)
, idle_timeout_(idle_timeout)
, watch_pool_(true)
{
name_ = "connection_pool[" + connector_.get_target() + "](" + std::to_string(pool_max_size_) + ")";
pool_watcher_ = std::thread([this]() { this->watch_pool_routine(); });
STREAM_LOG_TRACE(name_ + " has been created");
}
template <typename Connector, typename Strategy>
template <typename Arg1, typename... ArgN,
typename std::enable_if<!std::is_convertible<Arg1, typename Connector::time_duration_type>::value>::type*>
base_connection_pool<Connector, Strategy>::base_connection_pool(std::size_t size, Arg1&& arg1, ArgN&&... argn)
: base_connection_pool(size, time_duration_type::max(), std::forward<Arg1>(arg1), std::forward<ArgN>(argn)...)
{
}
template <typename Connector, typename Strategy>
base_connection_pool<Connector, Strategy>::~base_connection_pool()
{
watch_pool_.store(false, std::memory_order_release);
if (pool_watcher_.joinable()) {
pool_watcher_.join();
}
}
template <typename Connector, typename Strategy>
std::unique_ptr<typename base_connection_pool<Connector, Strategy>::stream_type>
base_connection_pool<Connector, Strategy>::get_session(boost::system::error_code& ec, const time_point_type& deadline)
{
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_until(deadline)) {
// failed to lock pool_mutex_
ec = boost::asio::error::timed_out;
return nullptr;
}
if (sesson_pool_.empty() && !pool_cv_.wait_until(pool_lk, deadline, [this] { return !sesson_pool_.empty(); })) {
// session pool is still empty
ec = boost::asio::error::not_found;
return nullptr;
}
std::unique_ptr<stream_type> session = std::move(sesson_pool_.front().second);
sesson_pool_.pop_front();
return session;
}
template <typename Connector, typename Strategy>
std::unique_ptr<typename base_connection_pool<Connector, Strategy>::stream_type>
base_connection_pool<Connector, Strategy>::try_get_session(boost::system::error_code& ec,
const time_point_type& deadline)
{
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_until(deadline)) {
// failed to lock pool_mutex_
ec = boost::asio::error::timed_out;
return nullptr;
}
if (sesson_pool_.empty()) {
// session pool is empty
ec = boost::asio::error::not_found;
return nullptr;
}
std::unique_ptr<stream_type> session = std::move(sesson_pool_.front().second);
sesson_pool_.pop_front();
return session;
}
template <typename Connector, typename Strategy>
void base_connection_pool<Connector, Strategy>::return_session(std::unique_ptr<stream_type>&& session)
{
if (!session || !session->next_layer().is_open()) {
return;
}
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_for(std::chrono::milliseconds(1))) {
// if we failed to return session in 1ms it's easier to establish new one
STREAM_LOG_INFO(name_ + " has dropped alive session due lock contention");
return;
}
sesson_pool_.emplace_back(clock_type::now(), std::move(session));
pool_lk.unlock();
pool_cv_.notify_one();
}
template <typename Connector, typename Strategy>
bool base_connection_pool<Connector, Strategy>::is_connected(boost::system::error_code& ec,
const time_point_type& deadline) const
{
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_until(deadline)) {
// failed to lock pool_mutex_
ec = boost::asio::error::timed_out;
return false;
}
if (sesson_pool_.empty() && !pool_cv_.wait_until(pool_lk, deadline, [this] { return !sesson_pool_.empty(); })) {
// session pool is still empty
return false;
}
return true;
}
template <typename Connector, typename Strategy>
void base_connection_pool<Connector, Strategy>::watch_pool_routine()
{
static const auto lock_timeout = std::chrono::milliseconds(100);
while (watch_pool_.load(std::memory_order_acquire)) {
// try to lock pool mutex
std::unique_lock<std::timed_mutex> pool_lk(pool_mutex_, std::defer_lock);
if (!pool_lk.try_lock_for(lock_timeout)) {
continue;
}
// remove session which idling past idle_timeout_
std::size_t pool_current_size = 0;
for (auto pool_it = sesson_pool_.begin(); pool_it != sesson_pool_.end();) {
const auto idle_for = clock_type::now() - pool_it->first;
if (idle_for >= idle_timeout_) {
pool_it = sesson_pool_.erase(pool_it);
} else {
++pool_it;
++pool_current_size;
}
}
// release pool mutex after removing old sessions
pool_lk.unlock();
// pool_current_size may be bigger if someone returned previous session
std::size_t vacant_places = (pool_max_size_ > pool_current_size) ? pool_max_size_ - pool_current_size : 0;
if (vacant_places) {
auto append_func = [this](std::unique_ptr<stream_type>&& session) {
this->return_session(std::move(session));
};
const auto need_more = reconnection_.refill(connector_, vacant_places, append_func);
if (need_more) {
continue;
}
}
// stop cpu spooling if nothing has been added
std::this_thread::sleep_for(std::chrono::milliseconds(50));
}
}
} // namespace connector
} // namespace stream_client