.. _program_listing_file_stream-client_connector_connector.hpp: Program Listing for File connector.hpp ====================================== |exhale_lsh| :ref:`Return to documentation for file ` (``stream-client/connector/connector.hpp``) .. |exhale_lsh| unicode:: U+021B0 .. UPWARDS ARROW WITH TIP LEFTWARDS .. code-block:: cpp #pragma once #include "stream-client/resolver/resolver.hpp" #include "stream-client/stream/http_socket.hpp" #include #include #include namespace stream_client { namespace connector { template class base_connector { public: using stream_type = typename std::remove_reference::type; using protocol_type = typename stream_type::protocol_type; using endpoint_type = typename stream_type::endpoint_type; using endpoint_container_type = std::vector; using resolver_type = ::stream_client::resolver::base_resolver; using resolve_flags_type = typename resolver_type::resolve_flags_type; using resolver_endpoint_iterator_type = typename resolver_type::iterator_type; using clock_type = typename stream_type::clock_type; using time_duration_type = typename stream_type::time_duration_type; using time_point_type = typename stream_type::time_point_type; base_connector(const std::string& host, const std::string& port, time_duration_type resolve_timeout, time_duration_type connect_timeout, time_duration_type operation_timeout, ::stream_client::resolver::ip_family ip_family = resolver_type::kDefaultIPFamily, resolve_flags_type resolve_flags = resolver_type::kDefaultFlags); base_connector(const base_connector& other) = delete; base_connector& operator=(const base_connector& other) = delete; base_connector(base_connector&& other) = delete; base_connector& operator=(base_connector&& other) = delete; virtual ~base_connector(); std::unique_ptr new_session(boost::system::error_code& ec, const time_point_type& deadline); inline std::unique_ptr new_session(boost::system::error_code& ec, const time_duration_type& timeout) { return new_session(ec, clock_type::now() + timeout); } inline std::unique_ptr new_session(boost::system::error_code& ec) { return new_session(ec, get_connect_timeout()); } inline std::unique_ptr new_session() { boost::system::error_code ec; auto session = new_session(ec); if (ec) { throw boost::system::system_error{ec, "Failed to establish new session to " + get_target()}; } if (!session) { throw boost::system::system_error{boost::asio::error::operation_aborted, "Failed to establish new session to " + get_target()}; } return session; } inline std::string get_host() const { return host_; } inline std::string get_port() const { return port_; } inline std::string get_target() const { return get_host() + ":" + get_port(); } inline const time_duration_type& get_resolve_timeout() const { return resolve_timeout_; } inline const time_duration_type& get_connect_timeout() const { return connect_timeout_; } inline const time_duration_type& get_operation_timeout() const { return operation_timeout_; } protected: virtual std::unique_ptr connect_until(const endpoint_type& peer_endpoint, const time_point_type& until_time) const; void resolve_routine(); inline void update_endpoints(resolver_endpoint_iterator_type&& resolved_endpoints) { endpoint_container_type new_endpoints; while (resolved_endpoints != resolver_endpoint_iterator_type()) { new_endpoints.emplace_back(std::move(*resolved_endpoints)); ++resolved_endpoints; } std::unique_lock lk(endpoints_mutex_); endpoints_ = std::move(new_endpoints); } inline endpoint_container_type get_endpoints() { std::unique_lock lk(endpoints_mutex_); return endpoints_; } inline void set_resolve_error(const boost::system::error_code& err) { std::unique_lock lk(resolve_error_mutex_); resolve_error_ = err; } inline boost::system::error_code get_resolve_error() { std::unique_lock lk(resolve_error_mutex_); return resolve_error_; } inline void notify_resolve_needed() { std::unique_lock resolve_needed_lk(resolve_needed_mutex_); resolve_needed_ = true; resolve_needed_lk.unlock(); resolve_needed_cv_.notify_all(); } inline void notify_resolve_done() { std::unique_lock resolve_done_lk(resolve_done_mutex_); resolve_done_ = true; resolve_done_lk.unlock(); resolve_done_cv_.notify_all(); } private: const std::string host_; const std::string port_; const time_duration_type resolve_timeout_; const time_duration_type connect_timeout_; const time_duration_type operation_timeout_; resolver_type resolver_; endpoint_container_type endpoints_; mutable std::mutex endpoints_mutex_; std::atomic_bool resolving_thread_running_{true}; std::thread resolving_thread_; boost::system::error_code resolve_error_; mutable std::mutex resolve_error_mutex_; /* used to implement waits on update/done events without polling */ bool resolve_needed_{true}; std::timed_mutex resolve_needed_mutex_; std::condition_variable_any resolve_needed_cv_; bool resolve_done_{false}; std::timed_mutex resolve_done_mutex_; std::condition_variable_any resolve_done_cv_; }; using tcp_connector = base_connector<::stream_client::tcp_client>; using udp_connector = base_connector<::stream_client::udp_client>; using ssl_connector = base_connector<::stream_client::ssl::ssl_client>; using http_connector = base_connector<::stream_client::http::http_client>; using https_connector = base_connector<::stream_client::http::https_client>; } // namespace connector } // namespace stream_client #include "impl/connector.ipp"