Program Listing for File connector.hpp¶
↰ Return to documentation for file (stream-client/connector/connector.hpp
)
#pragma once
#include "stream-client/resolver/resolver.hpp"
#include "stream-client/stream/http_socket.hpp"
#include <atomic>
#include <condition_variable>
#include <mutex>
namespace stream_client {
namespace connector {
template <typename Stream>
class base_connector
{
public:
using stream_type = typename std::remove_reference<Stream>::type;
using protocol_type = typename stream_type::protocol_type;
using endpoint_type = typename stream_type::endpoint_type;
using endpoint_container_type = std::vector<endpoint_type>;
using resolver_type = ::stream_client::resolver::base_resolver<protocol_type>;
using resolve_flags_type = typename resolver_type::resolve_flags_type;
using resolver_endpoint_iterator_type = typename resolver_type::iterator_type;
using clock_type = typename stream_type::clock_type;
using time_duration_type = typename stream_type::time_duration_type;
using time_point_type = typename stream_type::time_point_type;
base_connector(const std::string& host, const std::string& port, time_duration_type resolve_timeout,
time_duration_type connect_timeout, time_duration_type operation_timeout,
::stream_client::resolver::ip_family ip_family = resolver_type::kDefaultIPFamily,
resolve_flags_type resolve_flags = resolver_type::kDefaultFlags);
base_connector(const base_connector<Stream>& other) = delete;
base_connector<Stream>& operator=(const base_connector<Stream>& other) = delete;
base_connector(base_connector<Stream>&& other) = delete;
base_connector<Stream>& operator=(base_connector<Stream>&& other) = delete;
virtual ~base_connector();
std::unique_ptr<stream_type> new_session(boost::system::error_code& ec, const time_point_type& deadline);
inline std::unique_ptr<stream_type> new_session(boost::system::error_code& ec, const time_duration_type& timeout)
{
return new_session(ec, clock_type::now() + timeout);
}
inline std::unique_ptr<stream_type> new_session(boost::system::error_code& ec)
{
return new_session(ec, get_connect_timeout());
}
inline std::unique_ptr<stream_type> new_session()
{
boost::system::error_code ec;
auto session = new_session(ec);
if (ec) {
throw boost::system::system_error{ec, "Failed to establish new session to " + get_target()};
}
if (!session) {
throw boost::system::system_error{boost::asio::error::operation_aborted,
"Failed to establish new session to " + get_target()};
}
return session;
}
inline std::string get_host() const
{
return host_;
}
inline std::string get_port() const
{
return port_;
}
inline std::string get_target() const
{
return get_host() + ":" + get_port();
}
inline const time_duration_type& get_resolve_timeout() const
{
return resolve_timeout_;
}
inline const time_duration_type& get_connect_timeout() const
{
return connect_timeout_;
}
inline const time_duration_type& get_operation_timeout() const
{
return operation_timeout_;
}
protected:
virtual std::unique_ptr<stream_type> connect_until(const endpoint_type& peer_endpoint,
const time_point_type& until_time) const;
void resolve_routine();
inline void update_endpoints(resolver_endpoint_iterator_type&& resolved_endpoints)
{
endpoint_container_type new_endpoints;
while (resolved_endpoints != resolver_endpoint_iterator_type()) {
new_endpoints.emplace_back(std::move(*resolved_endpoints));
++resolved_endpoints;
}
std::unique_lock<std::mutex> lk(endpoints_mutex_);
endpoints_ = std::move(new_endpoints);
}
inline endpoint_container_type get_endpoints()
{
std::unique_lock<std::mutex> lk(endpoints_mutex_);
return endpoints_;
}
inline void set_resolve_error(const boost::system::error_code& err)
{
std::unique_lock<std::mutex> lk(resolve_error_mutex_);
resolve_error_ = err;
}
inline boost::system::error_code get_resolve_error()
{
std::unique_lock<std::mutex> lk(resolve_error_mutex_);
return resolve_error_;
}
inline void notify_resolve_needed()
{
std::unique_lock<std::timed_mutex> resolve_needed_lk(resolve_needed_mutex_);
resolve_needed_ = true;
resolve_needed_lk.unlock();
resolve_needed_cv_.notify_all();
}
inline void notify_resolve_done()
{
std::unique_lock<std::timed_mutex> resolve_done_lk(resolve_done_mutex_);
resolve_done_ = true;
resolve_done_lk.unlock();
resolve_done_cv_.notify_all();
}
private:
const std::string host_;
const std::string port_;
const time_duration_type resolve_timeout_;
const time_duration_type connect_timeout_;
const time_duration_type operation_timeout_;
resolver_type resolver_;
endpoint_container_type endpoints_;
mutable std::mutex endpoints_mutex_;
std::atomic_bool resolving_thread_running_{true};
std::thread resolving_thread_;
boost::system::error_code resolve_error_;
mutable std::mutex resolve_error_mutex_;
/* used to implement waits on update/done events without polling */
bool resolve_needed_{true};
std::timed_mutex resolve_needed_mutex_;
std::condition_variable_any resolve_needed_cv_;
bool resolve_done_{false};
std::timed_mutex resolve_done_mutex_;
std::condition_variable_any resolve_done_cv_;
};
using tcp_connector = base_connector<::stream_client::tcp_client>;
using udp_connector = base_connector<::stream_client::udp_client>;
using ssl_connector = base_connector<::stream_client::ssl::ssl_client>;
using http_connector = base_connector<::stream_client::http::http_client>;
using https_connector = base_connector<::stream_client::http::https_client>;
} // namespace connector
} // namespace stream_client
#include "impl/connector.ipp"