From bc6a7b6cbe2a86d804f4448f15db5a0b75528c3c Mon Sep 17 00:00:00 2001 From: Jeffrey Date: Mon, 14 Feb 2022 00:10:53 -0600 Subject: [PATCH] Remove abstract_tcp_server_cp.* --- .../epee/include/net/abstract_tcp_server_cp.h | 236 ------- .../include/net/abstract_tcp_server_cp.inl | 607 ------------------ 2 files changed, 843 deletions(-) delete mode 100644 contrib/epee/include/net/abstract_tcp_server_cp.h delete mode 100644 contrib/epee/include/net/abstract_tcp_server_cp.inl diff --git a/contrib/epee/include/net/abstract_tcp_server_cp.h b/contrib/epee/include/net/abstract_tcp_server_cp.h deleted file mode 100644 index f10f4203f..000000000 --- a/contrib/epee/include/net/abstract_tcp_server_cp.h +++ /dev/null @@ -1,236 +0,0 @@ -// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// * Neither the name of the Andrey N. Sabelnikov nor the -// names of its contributors may be used to endorse or promote products -// derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY -// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - - -#ifndef _LEVIN_CP_SERVER_H_ -#define _LEVIN_CP_SERVER_H_ - -#include -#include -#include -#include -#include - -#include "misc_log_ex.h" -//#include "threads_helper.h" -#include "syncobj.h" -#define ENABLE_PROFILING -#include "profile_tools.h" -#include "net_utils_base.h" -#include "pragma_comp_defs.h" - -#undef MONERO_DEFAULT_LOG_CATEGORY -#define MONERO_DEFAULT_LOG_CATEGORY "net" - -#define LEVIN_DEFAULT_DATA_BUFF_SIZE 2000 - -namespace epee -{ -namespace net_utils -{ - - template - class cp_server_impl//: public abstract_handler - { - public: - cp_server_impl(/*abstract_handler* phandler = NULL*/); - virtual ~cp_server_impl(); - - bool init_server(int port_no); - bool deinit_server(); - bool run_server(int threads_count = 0); - bool send_stop_signal(); - bool is_stop_signal(); - virtual bool on_net_idle(){return true;} - size_t get_active_connections_num(); - typename TProtocol::config_type& get_config_object(){return m_config;} - private: - enum overlapped_operation_type - { - op_type_recv, - op_type_send, - op_type_stop - }; - - struct io_data_base - { - OVERLAPPED m_overlapped; - WSABUF DataBuf; - overlapped_operation_type m_op_type; - DWORD TotalBuffBytes; - volatile LONG m_is_in_use; - char Buffer[1]; - }; - -PRAGMA_WARNING_PUSH -PRAGMA_WARNING_DISABLE_VS(4355) - template - struct connection: public net_utils::i_service_endpoint - { - connection(typename TProtocol::config_type& ref_config):m_sock(INVALID_SOCKET), m_tprotocol_handler(this, ref_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0) - { - } - - //connection():m_sock(INVALID_SOCKET), m_tprotocol_handler(this, m_dummy_config, context), m_psend_data(NULL), m_precv_data(NULL), m_asked_to_shutdown(0), m_connection_shutwoned(0) - //{ - //} - - connection& operator=(const connection& obj) - { - return *this; - } - - bool init_buffers() - { - m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + LEVIN_DEFAULT_DATA_BUFF_SIZE-1]; - m_psend_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE; - m_precv_data = (io_data_base*)new char[sizeof(io_data_base) + LEVIN_DEFAULT_DATA_BUFF_SIZE-1]; - m_precv_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE; - return true; - } - - bool query_shutdown() - { - if(!::InterlockedCompareExchange(&m_asked_to_shutdown, 1, 0)) - { - m_psend_data->m_op_type = op_type_stop; - ::PostQueuedCompletionStatus(m_completion_port, 0, (ULONG_PTR)this, &m_psend_data->m_overlapped); - } - return true; - } - - //bool set_config(typename TProtocol::config_type& config) - //{ - // this->~connection(); - // new(this) connection(config); - // return true; - //} - ~connection() - { - if(m_psend_data) - delete m_psend_data; - - if(m_precv_data) - delete m_precv_data; - } - virtual bool handle_send(const void* ptr, size_t cb) - { - PROFILE_FUNC("[handle_send]"); - if(m_psend_data->TotalBuffBytes < cb) - resize_send_buff((DWORD)cb); - - ZeroMemory(&m_psend_data->m_overlapped, sizeof(OVERLAPPED)); - m_psend_data->DataBuf.len = (u_long)cb;//m_psend_data->TotalBuffBytes; - m_psend_data->DataBuf.buf = m_psend_data->Buffer; - memcpy(m_psend_data->DataBuf.buf, ptr, cb); - m_psend_data->m_op_type = op_type_send; - InterlockedExchange(&m_psend_data->m_is_in_use, 1); - DWORD bytes_sent = 0; - DWORD flags = 0; - int res = 0; - { - PROFILE_FUNC("[handle_send] ::WSASend"); - res = ::WSASend(m_sock, &(m_psend_data->DataBuf), 1, &bytes_sent, flags, &(m_psend_data->m_overlapped), NULL); - } - - if(res == SOCKET_ERROR ) - { - int err = ::WSAGetLastError(); - if(WSA_IO_PENDING == err ) - return true; - } - LOG_ERROR("BIG FAIL: WSASend error code not correct, res=" << res << " last_err=" << err); - ::InterlockedExchange(&m_psend_data->m_is_in_use, 0); - query_shutdown(); - //closesocket(m_psend_data); - return false; - }else if(0 == res) - { - ::InterlockedExchange(&m_psend_data->m_is_in_use, 0); - if(!bytes_sent || bytes_sent != cb) - { - int err = ::WSAGetLastError(); - LOG_ERROR("BIG FAIL: WSASend immediatly complete? but bad results, res=" << res << " last_err=" << err); - query_shutdown(); - return false; - }else - { - return true; - } - } - - return true; - } - bool resize_send_buff(DWORD new_size) - { - if(m_psend_data->TotalBuffBytes >= new_size) - return true; - - delete m_psend_data; - m_psend_data = (io_data_base*)new char[sizeof(io_data_base) + new_size-1]; - m_psend_data->TotalBuffBytes = new_size; - LOG_PRINT("Connection buffer resized up to " << new_size, LOG_LEVEL_3); - return true; - } - - - SOCKET m_sock; - net_utils::connection_context_base context; - TProtocol m_tprotocol_handler; - typename TProtocol::config_type m_dummy_config; - io_data_base* m_precv_data; - io_data_base* m_psend_data; - HANDLE m_completion_port; - volatile LONG m_asked_to_shutdown; - volatile LONG m_connection_shutwoned; - }; -PRAGMA_WARNING_POP - - bool worker_thread_member(); - static unsigned CALLBACK worker_thread(void* param); - - bool add_new_connection(SOCKET new_sock, long ip_from, int port_from); - bool shutdown_connection(connection* pconn); - - - typedef std::map > > connections_container; - SOCKET m_listen_socket; - HANDLE m_completion_port; - connections_container m_connections; - critical_section m_connections_lock; - int m_port; - volatile LONG m_stop; - //abstract_handler* m_phandler; - bool m_initialized; - volatile LONG m_worker_thread_counter; - typename TProtocol::config_type m_config; - }; -} -} -#include "abstract_tcp_server_cp.inl" - - -#endif //_LEVIN_SERVER_H_ diff --git a/contrib/epee/include/net/abstract_tcp_server_cp.inl b/contrib/epee/include/net/abstract_tcp_server_cp.inl deleted file mode 100644 index e0ef6470e..000000000 --- a/contrib/epee/include/net/abstract_tcp_server_cp.inl +++ /dev/null @@ -1,607 +0,0 @@ -// Copyright (c) 2006-2013, Andrey N. Sabelnikov, www.sabelnikov.net -// All rights reserved. -// -// Redistribution and use in source and binary forms, with or without -// modification, are permitted provided that the following conditions are met: -// * Redistributions of source code must retain the above copyright -// notice, this list of conditions and the following disclaimer. -// * Redistributions in binary form must reproduce the above copyright -// notice, this list of conditions and the following disclaimer in the -// documentation and/or other materials provided with the distribution. -// * Neither the name of the Andrey N. Sabelnikov nor the -// names of its contributors may be used to endorse or promote products -// derived from this software without specific prior written permission. -// -// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND -// ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED -// WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE -// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER BE LIABLE FOR ANY -// DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES -// (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; -// LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND -// ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT -// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS -// SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -// - - -#pragma comment(lib, "Ws2_32.lib") - -#undef MONERO_DEFAULT_LOG_CATEGORY -#define MONERO_DEFAULT_LOG_CATEGORY "net" - -namespace epee -{ -namespace net_utils -{ -template -cp_server_impl::cp_server_impl(): - m_port(0), m_stop(false), - m_worker_thread_counter(0), m_listen_socket(INVALID_SOCKET) -{ -} -//------------------------------------------------------------- -template -cp_server_impl::~cp_server_impl() -{ - deinit_server(); -} -//------------------------------------------------------------- -template -bool cp_server_impl::init_server(int port_no) -{ - m_port = port_no; - - WSADATA wsad = {0}; - int err = ::WSAStartup(MAKEWORD(2,2), &wsad); - if ( err != 0 || LOBYTE( wsad.wVersion ) != 2 || HIBYTE( wsad.wVersion ) != 2 ) - { - LOG_ERROR("Could not find a usable WinSock DLL, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\""); - return false; - } - - m_initialized = true; - - m_listen_socket = ::WSASocket(AF_INET, SOCK_STREAM, IPPROTO_TCP, NULL, 0, WSA_FLAG_OVERLAPPED); - if(INVALID_SOCKET == m_listen_socket) - { - err = ::WSAGetLastError(); - LOG_ERROR("Failed to create socket, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\""); - return false; - } - - - int opt = 1; - err = setsockopt (m_listen_socket, SOL_SOCKET,SO_REUSEADDR, reinterpret_cast(&opt), sizeof(int)); - if(SOCKET_ERROR == err ) - { - err = ::WSAGetLastError(); - LOG_PRINT("Failed to setsockopt(SO_REUSEADDR), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_1); - deinit_server(); - return false; - } - - - sockaddr_in adr = {0}; - adr.sin_family = AF_INET; - adr.sin_addr.s_addr = htonl(INADDR_ANY); - adr.sin_port = (u_short)htons(m_port); - - //binding - err = bind(m_listen_socket, (const sockaddr*)&adr, sizeof(adr )); - if(SOCKET_ERROR == err ) - { - err = ::WSAGetLastError(); - LOG_PRINT("Failed to Bind, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_1); - deinit_server(); - return false; - } - - - m_completion_port = ::CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0); - if(INVALID_HANDLE_VALUE == m_completion_port) - { - err = ::WSAGetLastError(); - LOG_PRINT("Failed to CreateIoCompletionPort, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_1); - deinit_server(); - return false; - } - - - return true; -} -//------------------------------------------------------------- - -//------------------------------------------------------------- -static int CALLBACK CPConditionFunc( - IN LPWSABUF lpCallerId, - IN LPWSABUF lpCallerData, - IN OUT LPQOS lpSQOS, - IN OUT LPQOS lpGQOS, - IN LPWSABUF lpCalleeId, - OUT LPWSABUF lpCalleeData, - OUT GROUP FAR *g, - IN DWORD_PTR dwCallbackData - ) -{ - - /*cp_server_impl* pthis = (cp_server_impl*)dwCallbackData; - if(!pthis) - return CF_REJECT;*/ - /*if(pthis->get_active_connections_num()>=FD_SETSIZE-1) - { - LOG_PRINT("Maximum connections count overfull.", LOG_LEVEL_2); - return CF_REJECT; - }*/ - - return CF_ACCEPT; -} -//------------------------------------------------------------- -template -size_t cp_server_impl::get_active_connections_num() -{ - return m_connections.size(); -} -//------------------------------------------------------------- -template -unsigned CALLBACK cp_server_impl::worker_thread(void* param) -{ - if(!param) - return 0; - - cp_server_impl* pthis = (cp_server_impl*)param; - pthis->worker_thread_member(); - return 1; -} -//------------------------------------------------------------- -template -bool cp_server_impl::worker_thread_member() -{ - LOG_PRINT("Worker thread STARTED", LOG_LEVEL_1); - bool stop_handling = false; - while(!stop_handling) - { - PROFILE_FUNC("[worker_thread]Worker Loop"); - DWORD bytes_transfered = 0; - connection* pconnection = 0; - io_data_base* pio_data = 0; - - { - PROFILE_FUNC("[worker_thread]GetQueuedCompletionStatus"); - BOOL res = ::GetQueuedCompletionStatus (m_completion_port, &bytes_transfered , (PULONG_PTR)&pconnection, (LPOVERLAPPED *)&pio_data, INFINITE); - if (res == 0) - { - // check return code for error - int err = GetLastError(); - LOG_PRINT("GetQueuedCompletionStatus failed with error " << err << " " << log_space::get_win32_err_descr(err), LOG_LEVEL_1); - - if(pio_data) - ::InterlockedExchange(&pio_data->m_is_in_use, 0); - - - continue; - } - } - - if(pio_data) - ::InterlockedExchange(&pio_data->m_is_in_use, 0); - - - - if(!bytes_transfered && !pconnection && !pio_data) - { - //signal to stop - break; - } - if(!pconnection || !pio_data) - { - LOG_PRINT("BIG FAIL: pconnection or pio_data is empty: pconnection=" << pconnection << " pio_data=" << pio_data, LOG_LEVEL_0); - break; - } - - - - if(::InterlockedCompareExchange(&pconnection->m_connection_shutwoned, 0, 0)) - { - LOG_ERROR("InterlockedCompareExchange(&pconnection->m_connection_shutwoned, 0, 0)"); - //DebugBreak(); - } - - if(pio_data->m_op_type == op_type_stop) - { - if(!pconnection) - { - LOG_ERROR("op_type=op_type_stop, but pconnection is empty!!!"); - continue; - } - shutdown_connection(pconnection); - continue;// - } - else if(pio_data->m_op_type == op_type_send) - { - continue; - //do nothing, just queuing request - }else if(pio_data->m_op_type == op_type_recv) - { - PROFILE_FUNC("[worker_thread]m_tprotocol_handler.handle_recv"); - if(bytes_transfered) - { - bool res = pconnection->m_tprotocol_handler.handle_recv(pio_data->Buffer, bytes_transfered); - if(!res) - pconnection->query_shutdown(); - } - else - { - pconnection->query_shutdown(); - continue; - } - - } - - //preparing new request, - - { - PROFILE_FUNC("[worker_thread]RECV Request small loop"); - int res = 0; - while(true) - { - LOG_PRINT("Prepearing data for WSARecv....", LOG_LEVEL_3); - ZeroMemory(&pio_data->m_overlapped, sizeof(OVERLAPPED)); - pio_data->DataBuf.len = pio_data->TotalBuffBytes; - pio_data->DataBuf.buf = pio_data->Buffer; - pio_data->m_op_type = op_type_recv; - //calling WSARecv() and go to completion waiting - DWORD bytes_recvd = 0; - DWORD flags = 0; - - LOG_PRINT("Calling WSARecv....", LOG_LEVEL_3); - ::InterlockedExchange(&pio_data->m_is_in_use, 1); - res = WSARecv(pconnection->m_sock, &(pio_data->DataBuf), 1, &bytes_recvd , &flags, &(pio_data->m_overlapped), NULL); - if(res == SOCKET_ERROR ) - { - int err = ::WSAGetLastError(); - if(WSA_IO_PENDING == err ) - {//go pending, ok - LOG_PRINT("WSARecv return WSA_IO_PENDING", LOG_LEVEL_3); - break; - } - LOG_ERROR("BIG FAIL: WSARecv error code not correct, res=" << res << " last_err=" << err); - ::InterlockedExchange(&pio_data->m_is_in_use, 0); - pconnection->query_shutdown(); - break; - } - break; - /*else if(0 == res) - { - if(!bytes_recvd) - { - ::InterlockedExchange(&pio_data->m_is_in_use, 0); - LOG_PRINT("WSARecv return 0, bytes_recvd=0, graceful close.", LOG_LEVEL_3); - int err = ::WSAGetLastError(); - //LOG_ERROR("BIG FAIL: WSARecv error code not correct, res=" << res << " last_err=" << err); - //pconnection->query_shutdown(); - break; - }else - { - LOG_PRINT("WSARecv return immediatily 0, bytes_recvd=" << bytes_recvd, LOG_LEVEL_3); - //pconnection->m_tprotocol_handler.handle_recv(pio_data->Buffer, bytes_recvd); - } - }*/ - } - } - } - - - LOG_PRINT("Worker thread STOPED", LOG_LEVEL_1); - ::InterlockedDecrement(&m_worker_thread_counter); - return true; -} -//------------------------------------------------------------- -template -bool cp_server_impl::shutdown_connection(connection* pconn) -{ - PROFILE_FUNC("[shutdown_connection]"); - - if(!pconn) - { - LOG_ERROR("Attempt to remove null pptr connection!"); - return false; - } - else - { - LOG_PRINT("Shutting down connection ("<< pconn << ")", LOG_LEVEL_3); - } - m_connections_lock.lock(); - connections_container::iterator it = m_connections.find(pconn->m_sock); - m_connections_lock.unlock(); - if(it == m_connections.end()) - { - LOG_ERROR("Failed to find closing socket=" << pconn->m_sock); - return false; - } - SOCKET sock = it->second->m_sock; - { - PROFILE_FUNC("[shutdown_connection] shutdown, close"); - ::shutdown(it->second->m_sock, SD_SEND ); - } - size_t close_sock_wait_count = 0; - { - LOG_PRINT("Entered to 'in_use wait zone'", LOG_LEVEL_3); - PROFILE_FUNC("[shutdown_connection] wait for in_use"); - while(::InterlockedCompareExchange(&it->second->m_precv_data->m_is_in_use, 1, 1)) - { - - Sleep(100); - close_sock_wait_count++; - } - LOG_PRINT("First step to 'in_use wait zone'", LOG_LEVEL_3); - - - while(::InterlockedCompareExchange(&it->second->m_psend_data->m_is_in_use, 1, 1)) - { - Sleep(100); - close_sock_wait_count++; - } - LOG_PRINT("Leaved 'in_use wait zone'", LOG_LEVEL_3); - } - - ::closesocket(it->second->m_sock); - - ::InterlockedExchange(&it->second->m_connection_shutwoned, 1); - m_connections_lock.lock(); - m_connections.erase(it); - m_connections_lock.unlock(); - LOG_PRINT("Socked " << sock << " closed, wait_count=" << close_sock_wait_count, LOG_LEVEL_2); - return true; -} -//------------------------------------------------------------- -template -bool cp_server_impl::run_server(int threads_count = 0) -{ - int err = listen(m_listen_socket, 100); - if(SOCKET_ERROR == err ) - { - err = ::WSAGetLastError(); - LOG_ERROR("Failed to listen, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\""); - return false; - } - - if(!threads_count) - { - SYSTEM_INFO si = {0}; - ::GetSystemInfo(&si); - threads_count = si.dwNumberOfProcessors + 2; - } - for(int i = 0; i != threads_count; i++) - { - boost::thread(boost::bind(&cp_server_impl::worker_thread_member, this)); - //HANDLE h_thread = threads_helper::create_thread(worker_thread, this); - InterlockedIncrement(&m_worker_thread_counter); - //::CloseHandle(h_thread); - } - - LOG_PRINT("Numbers of worker threads started: " << threads_count, LOG_LEVEL_1); - - m_stop = false; - while(!m_stop) - { - PROFILE_FUNC("[run_server] main_loop"); - TIMEVAL tv = {0}; - tv.tv_sec = 0; - tv.tv_usec = 100; - fd_set sock_set; - sock_set.fd_count = 1; - sock_set.fd_array[0] = m_listen_socket; - int select_res = 0; - { - PROFILE_FUNC("[run_server] select"); - select_res = select(0, &sock_set, &sock_set, NULL, &tv); - } - - if(SOCKET_ERROR == select_res) - { - err = ::WSAGetLastError(); - LOG_ERROR("Failed to select, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\""); - return false; - } - if(!select_res) - { - on_net_idle(); - continue; - } - else - { - sockaddr_in adr_from = {0}; - int adr_len = sizeof(adr_from); - SOCKET new_sock = INVALID_SOCKET; - { - PROFILE_FUNC("[run_server] WSAAccept"); - new_sock = ::WSAAccept(m_listen_socket, (sockaddr *)&adr_from, &adr_len, CPConditionFunc, (DWORD_PTR)this); - } - - if(INVALID_SOCKET == new_sock) - { - if(m_stop) - break; - int err = ::WSAGetLastError(); - LOG_PRINT("Failed to WSAAccept, err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_2); - continue; - } - LOG_PRINT("Accepted connection (new socket=" << new_sock << ")", LOG_LEVEL_2); - { - PROFILE_FUNC("[run_server] Add new connection"); - add_new_connection(new_sock, adr_from.sin_addr.s_addr, adr_from.sin_port); - } - - } - - } - LOG_PRINT("Closing connections("<< m_connections.size() << ") and waiting...", LOG_LEVEL_2); - m_connections_lock.lock(); - for(connections_container::iterator it = m_connections.begin(); it != m_connections.end(); it++) - { - ::shutdown(it->second->m_sock, SD_BOTH); - ::closesocket(it->second->m_sock); - } - m_connections_lock.unlock(); - size_t wait_count = 0; - while(m_connections.size() && wait_count < 100) - { - ::Sleep(100); - wait_count++; - } - LOG_PRINT("Connections closed OK (wait_count=" << wait_count << ")", LOG_LEVEL_2); - - - LOG_PRINT("Stopping worker threads("<< m_worker_thread_counter << ").", LOG_LEVEL_2); - for(int i = 0; i > ptr; - ptr.reset(new connection(m_config)); - - connection& conn = *ptr.get(); - m_connections[new_sock] = ptr; - LOG_PRINT("Add new connection zone: leaving lock", LOG_LEVEL_3); - m_connections_lock.unlock(); - conn.init_buffers(); - conn.m_sock = new_sock; - conn.context.m_remote_address = address_from; - conn.m_completion_port = m_completion_port; - { - PROFILE_FUNC("[add_new_connection] CreateIoCompletionPort"); - ::CreateIoCompletionPort((HANDLE)new_sock, m_completion_port, (ULONG_PTR)&conn, 0); - } - - //if(NULL == ::CreateIoCompletionPort((HANDLE)new_sock, m_completion_port, (ULONG_PTR)&conn, 0)) - //{ - // int err = ::GetLastError(); - // LOG_PRINT("Failed to CreateIoCompletionPort(associate socket and completion port), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\"", LOG_LEVEL_2); - // return false; - //} - - conn.m_tprotocol_handler.after_init_connection(); - { - PROFILE_FUNC("[add_new_connection] starting loop"); - int res = 0; - while(true)//res!=SOCKET_ERROR) - { - PROFILE_FUNC("[add_new_connection] in loop time"); - conn.m_precv_data->TotalBuffBytes = LEVIN_DEFAULT_DATA_BUFF_SIZE; - ZeroMemory(&conn.m_precv_data->m_overlapped, sizeof(OVERLAPPED)); - conn.m_precv_data->DataBuf.len = conn.m_precv_data->TotalBuffBytes; - conn.m_precv_data->DataBuf.buf = conn.m_precv_data->Buffer; - conn.m_precv_data->m_op_type = op_type_recv; - InterlockedExchange(&conn.m_precv_data->m_is_in_use, 1); - DWORD bytes_recvd = 0; - DWORD flags = 0; - - ::InterlockedExchange(&conn.m_precv_data->m_is_in_use, 1); - { - PROFILE_FUNC("[add_new_connection] ::WSARecv"); - res = ::WSARecv(conn.m_sock, &(conn.m_precv_data->DataBuf), 1, &bytes_recvd , &flags, &(conn.m_precv_data->m_overlapped), NULL); - } - if(res == SOCKET_ERROR ) - { - int err = ::WSAGetLastError(); - if(WSA_IO_PENDING == err ) - { - break; - } - LOG_ERROR("BIG FAIL: WSARecv error code not correct, res=" << res << " last_err=" << err << " " << log_space::get_win32_err_descr(err)); - ::InterlockedExchange(&conn.m_precv_data->m_is_in_use, 0); - conn.query_shutdown(); - //shutdown_connection(&conn); - break; - } - - - break; - /*else if(0 == res) - { - if(!bytes_recvd) - { - PROFILE_FUNC("[add_new_connection] shutdown_connection"); - ::InterlockedExchange(&conn.m_precv_data->m_is_in_use, 0); - conn.query_shutdown(); - //shutdown_connection(&conn); - break; - }else - { - PROFILE_FUNC("[add_new_connection] handle_recv"); - } - }*/ - } - } - - - - return true; -} -//------------------------------------------------------------- -template -bool cp_server_impl::deinit_server() -{ - if(!m_initialized) - return true; - - if(INVALID_SOCKET != m_listen_socket) - { - shutdown(m_listen_socket, SD_BOTH); - int res = closesocket(m_listen_socket); - if(SOCKET_ERROR == res) - { - int err = ::WSAGetLastError(); - LOG_ERROR("Failed to closesocket(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\""); - } - m_listen_socket = INVALID_SOCKET; - } - - int res = ::WSACleanup(); - if(SOCKET_ERROR == res) - { - int err = ::WSAGetLastError(); - LOG_ERROR("Failed to WSACleanup(), err = " << err << " \"" << socket_errors::get_socket_error_text(err) <<"\""); - } - m_initialized = false; - - return true; -} - -//------------------------------------------------------------- -template -bool cp_server_impl::send_stop_signal() -{ - ::InterlockedExchange(&m_stop, 1); - return true; -} -//------------------------------------------------------------- -template -bool cp_server_impl::is_stop_signal() -{ - return m_stop?true:false; -} -//------------------------------------------------------------- -} -}