I am working on an application where the user will have flexibility to create and destroy a server instance at any port he/she wishes. User can create multi servers operating simultaneously - each dedicated for a particular client and independent of other servers.
I have been working on this for quite sometime now, and finally reached a state where the code is working satisfactorily.
I have referred below examples:
I had real trouble maintaining a list of active servers and destroying any one at will, was throwing exceptions weirdly.
Finally I got it working, Could some one please review it?
I have concerns mainly for:
How to use
io_service::run()
? The way I am using in my code, is it satisfactory?How to ensure clean destruction of server when calling
commManager::dropServer()
?Clean destruction of connection object with respect to ongoing operations and sockets in open state.
Here is my code: Compiled with Visual Studio 2013. Is this code OK to be operated in multithreaded environment?
buffer.hxx:
#pragma once
#include <string>
#include <iostream>
#include <iomanip>
//==========================================================================================================================!
namespace http{
//#define DEBUG ON
#ifdef DEBUG
#define DEBUG_MSG(str) do {std::cout << std::setw(75) << std::left << __FUNCTION__ \
<< std::setw(3) << std::left << ":" << std::setw(5) << std::left << __LINE__ \
<< std::setw(5) << std::left << ":"\
<< std::left << str \
<< std::endl;} while( false )
#else
#define DEBUG_MSG(str) do { } while ( false )
#endif
class CBuffer
{
public:
enum { buffsize = 32 };
CBuffer();
CBuffer(const std::string str);
bool empty() const;
const std::string getString() const;
const char* getReceived() const;
private:
char received_[buffsize];
};
}
buffer.cpp:
#include "buffer.hxx"
#include <string>
#include <mutex>
using namespace http;
CBuffer::CBuffer()
{
received_[0] = '\0';
}
CBuffer::CBuffer(const std::string str)
{
//Truncate if Overflow
auto len = str.size();
if (len >= buffsize)
{
len = buffsize - 1;
}
std::copy(str.begin(), str.begin() + len, received_);
received_[len] = '\0';
}
bool CBuffer::empty() const
{
auto i = 0;
//No need for critical sections as this is read only fn
//and no method to update shared member data except the construction
while (received_[i] != '\0')
i++;
if (i == 0)
return true;
else
return false;
}
const std::string CBuffer::getString() const
{
//No need for critical sections as this is read only fn
//and no method to update shared member data except the construction
return std::string(received_);
}
const char* CBuffer::getReceived() const
{
//No need for critical sections as this is read only fn
//and no method to update shared member data except the construction
return received_;
}
server.hpp:
#ifndef HTTP_SERVER_HPP
#define HTTP_SERVER_HPP
#include <asio.hpp>
#include <string>
#include <boost/noncopyable.hpp>
#include "connection.hpp"
#include "connection_manager.hpp"
namespace http {
class connection;
typedef boost::shared_ptr<connection> connection_ptr;
using weakPtrConnection = boost::weak_ptr < connection > ;
namespace server {
/// The top-level class of the HTTP server.
class server
: public boost::enable_shared_from_this<server>, private boost::noncopyable
{
public:
/// Construct the server to listen on the specified TCP address and port, and
/// serve up files from the given directory.
explicit server(/*asio::io_service& io,*/const std::string& address, const std::string& port);
~server();
/// Run the server's io_service loop.
void run();
void stop();
void Init();
private:
/// Initiate an asynchronous accept operation.
void start_accept();
/// Handle completion of an asynchronous accept operation.
void handle_accept(const asio::error_code& e);
/// Handle a request to stop the server.
void handle_stop();
/// The io_service used to perform asynchronous operations.
asio::io_service io_service_;
/// The signal_set is used to register for process termination notifications.
asio::signal_set signals_;
/// Acceptor used to listen for incoming connections.
asio::ip::tcp::acceptor acceptor_;
/// The connection manager which owns all live connections.
connection_manager connection_manager_;
/// The next connection to be accepted.
connection_ptr new_connection_;
std::string IP_;
std::string port_;
};
} // namespace server
} // namespace http
#endif // HTTP_SERVER_HPP
server.cpp:
#include "server.hpp"
#include <boost/bind.hpp>
#include <signal.h>
#include <boost/thread.hpp>
#include <boost/bind.hpp>
namespace http {
namespace server {
server::server(/*asio::io_service& io,*/const std::string& address, const std::string& port)
: io_service_(/*io*/),
signals_(io_service_),
acceptor_(io_service_),
connection_manager_(),
new_connection_(), IP_(address), port_(port)
{
// Register to handle the signals that indicate when the server should exit.
// It is safe to register for the same signal multiple times in a program,
// provided all registration for the specified signal is made through Asio.
DEBUG_MSG("Created");
signals_.add(SIGINT);
signals_.add(SIGTERM);
#if defined(SIGQUIT)
signals_.add(SIGQUIT);
#endif // defined(SIGQUIT)
//signals_.async_wait(boost::bind(&server::handle_stop, this));
}
void server::Init()
{
// Open the acceptor with the option to reuse the address (i.e. SO_REUSEADDR).
asio::ip::tcp::resolver resolver(io_service_);
asio::ip::tcp::resolver::query query(IP_, port_);
asio::ip::tcp::endpoint endpoint = *resolver.resolve(query);
acceptor_.open(endpoint.protocol());
acceptor_.set_option(asio::ip::tcp::acceptor::reuse_address(true));
acceptor_.bind(endpoint);
acceptor_.listen();
start_accept();
}
server::~server()
{
DEBUG_MSG("Destroyed");
io_service_.stop();
}
void server::run()
{
// The io_service::run() call will block until all asynchronous operations
// have finished. While the server is running, there is always at least one
// asynchronous operation outstanding: the asynchronous accept call waiting
// for new incoming connections.
//io_service_.run();
DEBUG_MSG("Run");
//io_service_.run();
//asio::thread t(boost::bind(&asio::io_service::run, &(io_service_)));
boost::thread t(boost::bind(&asio::io_service::run, &io_service_));
}
void server::start_accept()
{
DEBUG_MSG("Accepting connection");
new_connection_.reset(new connection(io_service_,
connection_manager_));
acceptor_.async_accept(new_connection_->socket(),
boost::bind(&server::handle_accept, shared_from_this(),
asio::placeholders::error));
}
void server::handle_accept(const asio::error_code& e)
{
// Check whether the server was stopped by a signal before this completion
// handler had a chance to run.
if (!acceptor_.is_open())
{
DEBUG_MSG("Acceptor Not Open");
return;
}
if (!e)
{
DEBUG_MSG("Accepted");
connection_manager_.start(new_connection_);
}
start_accept();
}
void server::handle_stop()
{
// The server is stopped by cancelling all outstanding asynchronous
// operations. Once all operations have finished the io_service::run() call
// will exit.
DEBUG_MSG("Stopping Acceptor");
connection_manager_.stop_all();
acceptor_.close();
}
void server::stop()
{
DEBUG_MSG("Request posted to stop server");
io_service_.post(boost::bind(&server::handle_stop, shared_from_this()));
}
} // namespace server
} // namespace http
connection_manager.hpp:
#ifndef HTTP_CONNECTION_MANAGER_HPP
#define HTTP_CONNECTION_MANAGER_HPP
#include <set>
#include <boost/noncopyable.hpp>
#include "connection.hpp"
namespace http {
namespace server {
/// Manages open connections so that they may be cleanly stopped when the server
/// needs to shut down.
class connection_manager
: private boost::noncopyable
{
public:
connection_manager();
~connection_manager();
/// Add the specified connection to the manager and start it.
void start(connection_ptr c);
/// Stop the specified connection.
void stop(connection_ptr c);
/// Stop all connections.
void stop_all();
private:
/// The managed connections.
std::set<connection_ptr> connections_;
};
} // namespace server
} // namespace http
#endif // HTTP_CONNECTION_MANAGER_HPP
connection_manager.cpp:
#include "connection_manager.hpp"
#include <algorithm>
#include <boost/bind.hpp>
namespace http {
namespace server {
connection_manager::connection_manager()
{
DEBUG_MSG("Created");
}
connection_manager::~connection_manager()
{
DEBUG_MSG("Destroyed");
}
void connection_manager::start(connection_ptr c)
{
DEBUG_MSG("Session Added");
connections_.insert(c);
c->start();
}
void connection_manager::stop(connection_ptr c)
{
DEBUG_MSG("Session Deleted");
connections_.erase(c);
c->stop();
}
void connection_manager::stop_all()
{
DEBUG_MSG("Request to stop all");
std::for_each(connections_.begin(), connections_.end(),
boost::bind(&connection::stop, _1));
connections_.clear();
}
} // namespace server
} // namespace http
connection.hpp:
#include "connection_manager.hpp"
#include <algorithm>
#include <boost/bind.hpp>
namespace http {
namespace server {
connection_manager::connection_manager()
{
DEBUG_MSG("Created");
}
connection_manager::~connection_manager()
{
DEBUG_MSG("Destroyed");
}
void connection_manager::start(connection_ptr c)
{
DEBUG_MSG("Session Added");
connections_.insert(c);
c->start();
}
void connection_manager::stop(connection_ptr c)
{
DEBUG_MSG("Session Deleted");
connections_.erase(c);
c->stop();
}
void connection_manager::stop_all()
{
DEBUG_MSG("Request to stop all");
std::for_each(connections_.begin(), connections_.end(),
boost::bind(&connection::stop, _1));
connections_.clear();
}
} // namespace server
} // namespace http
connection.cpp:
#include "connection.hpp"
#include <vector>
#include <boost/bind.hpp>
#include "connection_manager.hpp"
namespace http {
namespace server {
connection::connection(asio::io_service& io_service,
connection_manager& manager)
: socket_(io_service),
connection_manager_(manager)
{
DEBUG_MSG("Session Created");
}
connection::~connection()
{
DEBUG_MSG("Session Destroyed");
}
asio::ip::tcp::socket& connection::socket()
{
return socket_;
}
void connection::start()
{
DEBUG_MSG("Reading");
socket_.async_read_some(asio::buffer(const_cast<char *> (buffer_.getReceived()), buffer_.buffsize),
boost::bind(&connection::handle_read, shared_from_this(),
asio::placeholders::error,
asio::placeholders::bytes_transferred));
}
void connection::stop()
{
DEBUG_MSG("Closed");
socket_.close();
}
void connection::handle_read(const asio::error_code& e,
std::size_t bytes_transferred)
{
if (!e)
{
//DEBUG_MSG(buffer_.getString());
std::cout << buffer_.getString() << std::endl;
start();
}
else if (e != asio::error::operation_aborted)
{
DEBUG_MSG("Read Error");
connection_manager_.stop(shared_from_this());
}
}
void connection::handle_write(const asio::error_code& e)
{
if (!e)
{
// Initiate graceful connection closure.
asio::error_code ignored_ec;
socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignored_ec);
}
if (e != asio::error::operation_aborted)
{
connection_manager_.stop(shared_from_this());
}
}
} // namespace server
} // namespace http
main.cpp:
#include <iostream>
#include <string>
#include <asio.hpp>
#include <boost/bind.hpp>
#include "server.hpp"
#include <boost/thread.hpp>
#include <set>
using sharedPtr = boost::shared_ptr < http::server::server > ;
using weakPtr = boost::weak_ptr < http::server::server > ;
class commManager{
public:
commManager()
{
DEBUG_MSG("Created");
}
~commManager()
{
DEBUG_MSG("Destroyed");
}
weakPtr createServer(std::string IP, std::string port)
{
DEBUG_MSG("Creating Server");
auto ptr = boost::shared_ptr<http::server::server>(new http::server::server(IP, port));
//auto ptr = std::make_shared<http::server::server>(IP, port);
servers_.insert(ptr);
ptr->Init();
ptr->run();
return weakPtr(ptr);
}
void dropServer(weakPtr ptr)
{
DEBUG_MSG("Dropping Server");
ptr.lock()->stop();
servers_.erase(ptr.lock());
}
private:
std::set<sharedPtr> servers_;
};
int main()
{
try
{
commManager mng;
auto s1 = mng.createServer(/*io,*/"127.0.0.1", "8973");
auto s2 = mng.createServer(/*io,*/"127.0.0.1", "8974");
// Run the server until stopped.
boost::this_thread::sleep_for(boost::chrono::seconds(300));
mng.dropServer(s1);
mng.dropServer(s2);
boost::this_thread::sleep_for(boost::chrono::seconds(15));
}
catch (std::exception& e)
{
std::cerr << "exception: " << e.what() << "\n";
}
system("Pause");
return 0;
}
Client Code to Test the server:
ioservice.hxx:
#include <asio.hpp>
using asio::ip::tcp;
namespace channel{
class CIOService{
public:
CIOService(const CIOService & rhs) = delete;
CIOService & operator = (const CIOService & rhs) = delete;
static CIOService & fetchIOService();
std::shared_ptr<asio::io_service> getIO();
private:
CIOService();
~CIOService();
private:
std::shared_ptr<asio::io_service> pIO_;
};
}
ioservice.cpp:
#include "ioservice.hxx"
#include <iostream>
using namespace channel;
CIOService & CIOService::fetchIOService()
{
static CIOService IO;
return IO;
}
CIOService::CIOService()
{
pIO_ = std::make_shared<asio::io_service>();
std::cout << "IOService Created" << std::endl;
}
std::shared_ptr<asio::io_service> CIOService::getIO()
{
return pIO_;
}
CIOService::~CIOService()
{
}
session.hxx:
#pragma once
#include <iostream>
#include <asio.hpp>
#include <memory>
#include <deque>
using asio::ip::tcp;
namespace channel {
#define defaultIP "127.0.0.1"
#define defaultPort "8000"
struct Buffer
{
enum { buffsize = 32 };
char received_[buffsize];
Buffer(){}
Buffer(std::string str)
{
//Truncate if Overflow
auto len = str.size();
if (len >= buffsize)
{
len = buffsize - 1;
}
std::copy(str.begin(), str.begin() + len, received_);
received_[str.size()] = '\0';
}
};
class CSession{
public:
CSession(std::string ip = defaultIP, std::string port = defaultPort);
~CSession();
void initSession();
void handle_connect(const asio::error_code& error /*error*/, tcp::resolver::iterator i);
void handle_write(const asio::error_code& error /*error*/, size_t bytes_transferred /*bytes_transferred*/);
void handle_read(const asio::error_code& error /*error*/, size_t bytes_transferred /*bytes_transferred*/);
void write(const std::string & message);
void read(Buffer & buff);
void close();
tcp::socket& socket();
bool getState();
private:
void beginWrite(const Buffer & message);
void doClose();
private:
mutable tcp::socket socket_; // client connection
std::string connetIP_;
std::string connectPort_;
std::string identifier_;
Buffer msg_;
std::deque<Buffer> writeQueue_;
bool state_;
};
}
session.cpp:
#include "ioservice.hxx"
#include <string>
#include <mutex>
#include "session.hxx"
#include <sstream>
using namespace channel;
CSession::CSession(std::string ip, std::string port) : connetIP_(ip), connectPort_(port), state_(true),
socket_(*CIOService::fetchIOService().getIO())
{
std::cout << "CSession Created" << std::endl;
}
CSession::~CSession()
{
close();
}
void CSession::initSession()
{
//std::cout << "CSession::initSession() Called" << std::endl;
tcp::resolver resolver(*CIOService::fetchIOService().getIO());
tcp::resolver::query query(connetIP_, connectPort_);
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
asio::async_connect(socket_, endpoint_iterator, std::bind(&CSession::handle_connect, this, /*std::ref(error)*/
std::placeholders::_1, std::placeholders::_2));
}
void CSession::read(Buffer & buff)
{
//std::cout << "CSession::read() Called" << std::endl;
//Check if the socket is open or not?
asio::async_read(socket_, asio::buffer(buff.received_, buff.buffsize),
std::bind(&CSession::handle_read, this,
std::placeholders::_1, std::placeholders::_2));
}
void CSession::write(const std::string& message)
{
//std::cout << "CSession::write() Called" << std::endl;
(*CIOService::fetchIOService().getIO()).post(std::bind(&CSession::beginWrite, this, Buffer(message)));
}
void CSession::beginWrite(const Buffer & message)
{
//Check if the socket is open or not?
bool writeInProgress = !writeQueue_.empty();
writeQueue_.push_back(message);
if (!writeInProgress)
{
asio::async_write(socket_, asio::buffer(writeQueue_.front().received_, writeQueue_.front().buffsize),
std::bind(&CSession::handle_write, this,
std::placeholders::_1, std::placeholders::_2));
}
}
void CSession::close()
{
std::cout << "CSession::close() Called" << std::endl;
(*CIOService::fetchIOService().getIO()).post(std::bind(&CSession::doClose, this));
}
void CSession::doClose()
{
socket_.close();
}
tcp::socket& CSession::socket()
{
return socket_;
}
bool CSession::getState()
{
return state_;
}
void CSession::handle_connect(const asio::error_code& error /*error*/, tcp::resolver::iterator i)
{
//std::cout << "CSession::handle_connect() Called" << std::endl;
if (error){
std::cout << error.message() << std::endl;
state_ = false;
doClose();
return;
}
static std::mutex m;
//Must send the identifier: Think if send failed how should failed identifier be handled
std::stringstream ss;
ss << socket_.local_endpoint();
std::lock_guard<std::mutex> lock(m);
identifier_ = ss.str();
std::cout << identifier_ << " is my identity." << std::endl;
read(msg_);
}
void CSession::handle_write(const asio::error_code& error /*error*/, size_t bytes_transferred /*bytes_transferred*/)
{
//std::cout << "CSession::handle_write() Called" << "(" << __FILE__ << " : " << __LINE__ << ")" << std::endl;
if (!error)
{
//std::cout << bytes_transferred << " bytes written to the socket." << std::endl;
writeQueue_.pop_front();
if (!writeQueue_.empty())
{
asio::async_write(socket_, asio::buffer(writeQueue_.front().received_, writeQueue_.front().buffsize),
std::bind(&CSession::handle_write, this,
std::placeholders::_1, std::placeholders::_2));
}
}
else
{
std::cout << "Write Error Detected" << std::endl;
std::cout << error.message() << std::endl;
state_ = false;
doClose();
return;
}
}
void CSession::handle_read(const asio::error_code& error /*error*/, size_t bytes_transferred /*bytes_transferred*/)
{
if (!error)
{
std::cout.write(msg_.received_, bytes_transferred);
std::cout << "\n";
asio::async_read(socket_,
asio::buffer(msg_.received_, msg_.buffsize),
std::bind(&CSession::handle_read, this,
std::placeholders::_1, std::placeholders::_2));
}
else
{
std::cout << "Read Error Detected" << std::endl;
std::cout << error.message() << std::endl;
state_ = false;
doClose();
return;
}
}
main.cpp:
#include "session.hxx"
#include <thread>
#include <boost/thread/thread.hpp>
#include <asio.hpp>
#include <boost/bind.hpp>
#include "ioservice.hxx"
#include <boost/date_time/posix_time/posix_time.hpp>
using namespace channel;
bool flag = false;
void setFlag(const asio::error_code& /*e*/)
{
flag = true;
}
void Client(std::string IP, std::string port)
{
CSession Session(IP, port);
Session.initSession();
asio::thread t(boost::bind(&asio::io_service::run, &(*CIOService::fetchIOService().getIO())));
asio::deadline_timer timer(*CIOService::fetchIOService().getIO(), boost::posix_time::seconds(375));
timer.async_wait(&setFlag);
while (!flag)
{
Session.write("Client 1");
}
//char line[128];
//while (std::cin.getline(line, 128))
//{
// char msg[128];
// std::cout << "Entered Line size: " << strlen(line) << std::endl;
// memcpy(msg, line, strlen(line) + 1);
// std::cout << "Message to be written is : " << msg << std::endl;
// Session.write(msg);
//}
boost::this_thread::sleep_for(boost::chrono::seconds(60));
Session.close();
t.join();
}
void main()
{
Client("localhost", "8973");
system("Pause");
}