/** @file @author from CrypoNote (see copyright below; Andrey N. Sabelnikov) @monero rfree @brief the connection templated-class for one peer connection */ // Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net // All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions are met: // * Redistributions of source code must retain the above copyright // notice, this list of conditions and the following disclaimer. // * Redistributions in binary form must reproduce the above copyright // notice, this list of conditions and the following disclaimer in the // documentation and/or other materials provided with the distribution. // * Neither the name of the Andrey N. Sabelnikov nor the // names of its contributors may be used to endorse or promote products // derived from this software without specific prior written permission. // // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND // ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED // WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE // DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY // DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES // (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; // LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND // ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS // SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. // #ifndef _ABSTRACT_TCP_SERVER2_H_ #define _ABSTRACT_TCP_SERVER2_H_ #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include "net_utils_base.h" #include "syncobj.h" #include "../../../../src/p2p/connection_basic.hpp" #include "../../../../contrib/otshell_utils/utils.hpp" #include "../../../../src/p2p/network_throttle-detail.hpp" #define ABSTRACT_SERVER_SEND_QUE_MAX_COUNT 1000 namespace epee { namespace net_utils { struct i_connection_filter { virtual bool is_remote_ip_allowed(uint32_t adress)=0; protected: virtual ~i_connection_filter(){} }; enum t_server_role { // type of the server, e.g. so that we will know how to limit it NET = 0, // default (not used? used for misc connections maybe?) TODO RPC = 1, // the rpc commands P2P = 2 // to other p2p node }; /************************************************************************/ /* */ /************************************************************************/ /// Represents a single connection from a client. template class connection : public boost::enable_shared_from_this >, private boost::noncopyable, public i_service_endpoint, public connection_basic { public: typedef typename t_protocol_handler::connection_context t_connection_context; /// Construct a connection with the given io_service. explicit connection( boost::asio::io_service& io_service, typename t_protocol_handler::config_type& config, std::atomic &ref_sock_count, // the ++/-- counter std::atomic &sock_number, // the only increasing ++ number generator i_connection_filter * &pfilter); virtual ~connection(); /// Get the socket associated with the connection. boost::asio::ip::tcp::socket& socket(); /// Start the first asynchronous operation for the connection. bool start(bool is_income, bool is_multithreaded); void get_context(t_connection_context& context_){context_ = context;} void call_back_starter(); private: //----------------- i_service_endpoint --------------------- virtual bool do_send(const void* ptr, size_t cb); ///< (see do_send from i_service_endpoint) virtual bool do_send_chunk(const void* ptr, size_t cb); ///< will send (or queue) a part of data virtual bool close(); virtual bool call_run_once_service_io(); virtual bool request_callback(); virtual boost::asio::io_service& get_io_service(); virtual bool add_ref(); virtual bool release(); //------------------------------------------------------ boost::shared_ptr > safe_shared_from_this(); bool shutdown(); /// Handle completion of a read operation. void handle_read(const boost::system::error_code& e, std::size_t bytes_transferred); /// Handle completion of a write operation. void handle_write(const boost::system::error_code& e, size_t cb); /// Buffer for incoming data. boost::array buffer_; //boost::array buffer_; t_connection_context context; i_connection_filter* &m_pfilter; // TODO what do they mean about wait on destructor?? --rfree : //this should be the last one, because it could be wait on destructor, while other activities possible on other threads t_protocol_handler m_protocol_handler; //typename t_protocol_handler::config_type m_dummy_config; std::list > > m_self_refs; // add_ref/release support critical_section m_self_refs_lock; critical_section m_chunking_lock; // held while we add small chunks of the big do_send() to small do_send_chunk() t_server_role m_connection_type; // for calculate speed (last 60 sec) network_throttle m_throttle_speed_in; network_throttle m_throttle_speed_out; std::mutex m_throttle_speed_in_mutex; std::mutex m_throttle_speed_out_mutex; public: void setRPcStation(); }; /************************************************************************/ /* */ /************************************************************************/ template class boosted_tcp_server : private boost::noncopyable { public: typedef boost::shared_ptr > connection_ptr; typedef typename t_protocol_handler::connection_context t_connection_context; /// Construct the server to listen on the specified TCP address and port, and /// serve up files from the given directory. boosted_tcp_server(); explicit boosted_tcp_server(boost::asio::io_service& external_io_service, t_server_role s_type); ~boosted_tcp_server(); std::map server_type_map; void create_server_type_map(); bool init_server(uint32_t port, const std::string address = "0.0.0.0"); bool init_server(const std::string port, const std::string& address = "0.0.0.0"); /// Run the server's io_service loop. bool run_server(size_t threads_count, bool wait = true, const boost::thread::attributes& attrs = boost::thread::attributes()); /// wait for service workers stop bool timed_wait_server_stop(uint64_t wait_mseconds); /// Stop the server. void send_stop_signal(); bool is_stop_signal_sent(); void set_threads_prefix(const std::string& prefix_name); bool deinit_server(){return true;} size_t get_threads_count(){return m_threads_count;} void set_connection_filter(i_connection_filter* pfilter); bool connect(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_connection_context& cn, const std::string& bind_ip = "0.0.0.0"); template bool connect_async(const std::string& adr, const std::string& port, uint32_t conn_timeot, t_callback cb, const std::string& bind_ip = "0.0.0.0"); typename t_protocol_handler::config_type& get_config_object(){return m_config;} int get_binded_port(){return m_port;} boost::asio::io_service& get_io_service(){return io_service_;} struct idle_callback_conext_base { virtual ~idle_callback_conext_base(){} virtual bool call_handler(){return true;} idle_callback_conext_base(boost::asio::io_service& io_serice): m_timer(io_serice) {} boost::asio::deadline_timer m_timer; uint64_t m_period; }; template struct idle_callback_conext: public idle_callback_conext_base { idle_callback_conext(boost::asio::io_service& io_serice, t_handler& h, uint64_t period): idle_callback_conext_base(io_serice), m_handler(h) {this->m_period = period;} t_handler m_handler; virtual bool call_handler() { return m_handler(); } }; template bool add_idle_handler(t_handler t_callback, uint64_t timeout_ms) { boost::shared_ptr ptr(new idle_callback_conext(io_service_, t_callback, timeout_ms)); //needed call handler here ?... ptr->m_timer.expires_from_now(boost::posix_time::milliseconds(ptr->m_period)); ptr->m_timer.async_wait(boost::bind(&boosted_tcp_server::global_timer_handler, this, ptr)); return true; } bool global_timer_handler(/*const boost::system::error_code& err, */boost::shared_ptr ptr) { //if handler return false - he don't want to be called anymore if(!ptr->call_handler()) return true; ptr->m_timer.expires_from_now(boost::posix_time::milliseconds(ptr->m_period)); ptr->m_timer.async_wait(boost::bind(&boosted_tcp_server::global_timer_handler, this, ptr)); return true; } template bool async_call(t_handler t_callback) { io_service_.post(t_callback); return true; } protected: typename t_protocol_handler::config_type m_config; private: /// Run the server's io_service loop. bool worker_thread(); /// Handle completion of an asynchronous accept operation. void handle_accept(const boost::system::error_code& e); bool is_thread_worker(); /// The io_service used to perform asynchronous operations. std::unique_ptr m_io_service_local_instance; boost::asio::io_service& io_service_; /// Acceptor used to listen for incoming connections. boost::asio::ip::tcp::acceptor acceptor_; std::atomic m_stop_signal_sent; uint32_t m_port; std::atomic m_sock_count; std::atomic m_sock_number; std::string m_address; std::string m_thread_name_prefix; //TODO: change to enum server_type, now used size_t m_threads_count; i_connection_filter* m_pfilter; std::vector > m_threads; boost::thread::id m_main_thread_id; critical_section m_threads_lock; volatile uint32_t m_thread_index; // TODO change to std::atomic t_server_role type; void detach_threads(); /// The next connection to be accepted connection_ptr new_connection_; }; // class <>boosted_tcp_server } // namespace } // namespace #include "abstract_tcp_server2.inl" #endif