Program Listing for File connector.hpp

Return to documentation for file (stream-client/connector/connector.hpp)

#pragma once

#include "stream-client/resolver/resolver.hpp"
#include "stream-client/stream/http_socket.hpp"

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <string>
#include <thread>
#include <type_traits>
#include <utility>
#include <vector>

namespace stream_client {
namespace connector {

template <typename Stream>
class base_connector
{
public:
    using stream_type = typename std::remove_reference<Stream>::type;
    using protocol_type = typename stream_type::protocol_type;
    using endpoint_type = typename stream_type::endpoint_type;
    using endpoint_container_type = std::vector<endpoint_type>;

    using resolver_type = ::stream_client::resolver::base_resolver<protocol_type>;
    using resolve_flags_type = typename resolver_type::resolve_flags_type;
    using resolver_endpoint_iterator_type = typename resolver_type::iterator_type;

    using clock_type = typename stream_type::clock_type;
    using time_duration_type = typename stream_type::time_duration_type;
    using time_point_type = typename stream_type::time_point_type;

    base_connector(const std::string& host, const std::string& port, time_duration_type resolve_timeout,
                   time_duration_type connect_timeout, time_duration_type operation_timeout,
                   ::stream_client::resolver::ip_family ip_family = resolver_type::kDefaultIPFamily,
                   resolve_flags_type resolve_flags = resolver_type::kDefaultFlags);

    base_connector(const base_connector<Stream>& other) = delete;
    base_connector<Stream>& operator=(const base_connector<Stream>& other) = delete;
    base_connector(base_connector<Stream>&& other) = delete;
    base_connector<Stream>& operator=(base_connector<Stream>&& other) = delete;

    virtual ~base_connector();

    std::unique_ptr<stream_type> new_session(boost::system::error_code& ec, const time_point_type& deadline);

    inline std::unique_ptr<stream_type> new_session(boost::system::error_code& ec, const time_duration_type& timeout)
    {
        return new_session(ec, clock_type::now() + timeout);
    }

    inline std::unique_ptr<stream_type> new_session(boost::system::error_code& ec)
    {
        return new_session(ec, get_connect_timeout());
    }

    inline std::unique_ptr<stream_type> new_session()
    {
        boost::system::error_code ec;
        auto session = new_session(ec);
        if (ec) {
            throw boost::system::system_error{ec, "Failed to establish new session to " + get_target()};
        }
        if (!session) {
            throw boost::system::system_error{boost::asio::error::operation_aborted,
                                              "Failed to establish new session to " + get_target()};
        }
        return session;
    }

    inline std::string get_host() const
    {
        return host_;
    }

    inline std::string get_port() const
    {
        return port_;
    }

    inline std::string get_target() const
    {
        return get_host() + ":" + get_port();
    }

    inline const time_duration_type& get_resolve_timeout() const
    {
        return resolve_timeout_;
    }

    inline const time_duration_type& get_connect_timeout() const
    {
        return connect_timeout_;
    }

    inline const time_duration_type& get_operation_timeout() const
    {
        return operation_timeout_;
    }

protected:
    virtual std::unique_ptr<stream_type> connect_until(const endpoint_type& peer_endpoint,
                                                       const time_point_type& until_time) const;

    void resolve_routine();

    inline void update_endpoints(resolver_endpoint_iterator_type&& resolved_endpoints)
    {
        endpoint_container_type new_endpoints;
        while (resolved_endpoints != resolver_endpoint_iterator_type()) {
            new_endpoints.emplace_back(std::move(*resolved_endpoints));
            ++resolved_endpoints;
        }

        std::unique_lock<std::mutex> lk(endpoints_mutex_);
        endpoints_ = std::move(new_endpoints);
    }

    inline endpoint_container_type get_endpoints()
    {
        std::unique_lock<std::mutex> lk(endpoints_mutex_);
        return endpoints_;
    }

    inline void set_resolve_error(const boost::system::error_code& err)
    {
        std::unique_lock<std::mutex> lk(resolve_error_mutex_);
        resolve_error_ = err;
    }

    inline boost::system::error_code get_resolve_error()
    {
        std::unique_lock<std::mutex> lk(resolve_error_mutex_);
        return resolve_error_;
    }

    inline void notify_resolve_needed()
    {
        std::unique_lock<std::timed_mutex> resolve_needed_lk(resolve_needed_mutex_);
        resolve_needed_ = true;
        resolve_needed_lk.unlock();
        resolve_needed_cv_.notify_all();
    }

    inline void notify_resolve_done()
    {
        std::unique_lock<std::timed_mutex> resolve_done_lk(resolve_done_mutex_);
        resolve_done_ = true;
        resolve_done_lk.unlock();
        resolve_done_cv_.notify_all();
    }

private:
    const std::string host_;
    const std::string port_;
    const time_duration_type resolve_timeout_;
    const time_duration_type connect_timeout_;
    const time_duration_type operation_timeout_;

    resolver_type resolver_;

    endpoint_container_type endpoints_;
    mutable std::mutex endpoints_mutex_;

    std::atomic_bool resolving_thread_running_{true};
    std::thread resolving_thread_;

    boost::system::error_code resolve_error_;
    mutable std::mutex resolve_error_mutex_;

    /* used to implement waits on update/done events without polling */
    bool resolve_needed_{true};
    std::timed_mutex resolve_needed_mutex_;
    std::condition_variable_any resolve_needed_cv_;
    bool resolve_done_{false};
    std::timed_mutex resolve_done_mutex_;
    std::condition_variable_any resolve_done_cv_;
};

/// base_connector instantiated for ::stream_client::tcp_client streams.
using tcp_connector = base_connector<::stream_client::tcp_client>;
/// base_connector instantiated for ::stream_client::udp_client streams.
using udp_connector = base_connector<::stream_client::udp_client>;

/// base_connector instantiated for ::stream_client::ssl::ssl_client streams.
using ssl_connector = base_connector<::stream_client::ssl::ssl_client>;

/// base_connector instantiated for ::stream_client::http::http_client streams.
using http_connector = base_connector<::stream_client::http::http_client>;
/// base_connector instantiated for ::stream_client::http::https_client streams.
using https_connector = base_connector<::stream_client::http::https_client>;

} // namespace connector
} // namespace stream_client

#include "impl/connector.ipp"