331 lines
8.7 KiB
C++
331 lines
8.7 KiB
C++
|
|
/*
|
||
|
|
----
|
||
|
|
This file is part of SECONDO.
|
||
|
|
|
||
|
|
Copyright (C) 2004, University in Hagen, Department of Computer Science,
|
||
|
|
Database Systems for New Applications.
|
||
|
|
|
||
|
|
SECONDO is free software; you can redistribute it and/or modify
|
||
|
|
it under the terms of the GNU General Public License as published by
|
||
|
|
the Free Software Foundation; either version 2 of the License, or
|
||
|
|
(at your option) any later version.
|
||
|
|
|
||
|
|
SECONDO is distributed in the hope that it will be useful,
|
||
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
|
GNU General Public License for more details.
|
||
|
|
|
||
|
|
You should have received a copy of the GNU General Public License
|
||
|
|
along with SECONDO; if not, write to the Free Software
|
||
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||
|
|
----
|
||
|
|
|
||
|
|
//paragraph [1] Title: [{\Large \bf \begin{center}] [\end{center}}]
|
||
|
|
//characters [1] Type: [] []
|
||
|
|
//characters [2] Type: [] []
|
||
|
|
//[ae] [\"{a}]
|
||
|
|
//[oe] [\"{o}]
|
||
|
|
//[ue] [\"{u}]
|
||
|
|
//[ss] [{\ss}]
|
||
|
|
//[Ae] [\"{A}]
|
||
|
|
//[Oe] [\"{O}]
|
||
|
|
//[Ue] [\"{U}]
|
||
|
|
//[x] [$\times $]
|
||
|
|
//[->] [$\rightarrow $]
|
||
|
|
//[toc] [\tableofcontents]
|
||
|
|
|
||
|
|
[1] Implementation of generic Tcp Server.
|
||
|
|
|
||
|
|
[toc]
|
||
|
|
|
||
|
|
1 TcpServer class implementation
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
#include "TcpServer.h"
|
||
|
|
|
||
|
|
namespace continuousqueries {
|
||
|
|
|
||
|
|
// Defaualt Constructor
|
||
|
|
TcpServer::TcpServer(): _port(-1), _running(false) {}
|
||
|
|
|
||
|
|
// Creates a server that will listen on the given port once Run() is called.
// The server starts in the "not running" state.
TcpServer::TcpServer(int port)
  : _port(port),
    _running(false)
{
}
|
||
|
|
|
||
|
|
// Nothing to release here: the listening socket is closed at the end of Run().
TcpServer::~TcpServer() = default;
|
||
|
|
|
||
|
|
void TcpServer::Shutdown()
|
||
|
|
{
|
||
|
|
_running = false;
|
||
|
|
}
|
||
|
|
|
||
|
|
void TcpServer::Run()
|
||
|
|
{
|
||
|
|
if (_port == -1) return;
|
||
|
|
|
||
|
|
int addrlen, new_socket, client_socket[SOMAXCONN],
|
||
|
|
activity, i, valread, sd, max_sd, opt = 1;
|
||
|
|
struct sockaddr_in address;
|
||
|
|
char buffer[MAXPACKETSIZE];
|
||
|
|
fd_set readfds;
|
||
|
|
|
||
|
|
//a message
|
||
|
|
// char const *message = "ECHO Daemon v1.0 \r\n";
|
||
|
|
|
||
|
|
//initialise all client_socket[] to 0 so not checked
|
||
|
|
for (i = 0; i < SOMAXCONN; i++)
|
||
|
|
{
|
||
|
|
client_socket[i] = 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
//create a master socket
|
||
|
|
if( (_master_socket = socket(AF_INET , SOCK_STREAM , 0)) == 0)
|
||
|
|
{
|
||
|
|
perror("socket failed\n");
|
||
|
|
exit(EXIT_FAILURE);
|
||
|
|
}
|
||
|
|
|
||
|
|
//set master socket to allow multiple connections ,
|
||
|
|
//this is just a good habit, it will work without this
|
||
|
|
if( setsockopt(_master_socket, SOL_SOCKET, SO_REUSEADDR, (char *)&opt,
|
||
|
|
sizeof(opt)) < 0 )
|
||
|
|
{
|
||
|
|
perror("setsockopt\n");
|
||
|
|
exit(EXIT_FAILURE);
|
||
|
|
}
|
||
|
|
|
||
|
|
//type of socket created
|
||
|
|
address.sin_family = AF_INET;
|
||
|
|
address.sin_addr.s_addr = INADDR_ANY;
|
||
|
|
address.sin_port = htons( _port );
|
||
|
|
|
||
|
|
//bind the socket to localhost and port
|
||
|
|
if (bind(_master_socket, (struct sockaddr *)&address, sizeof(address))<0)
|
||
|
|
{
|
||
|
|
perror("bind failed\n");
|
||
|
|
exit(EXIT_FAILURE);
|
||
|
|
}
|
||
|
|
printf("Listener on port %d \n", _port);
|
||
|
|
|
||
|
|
//try to specify maximum of 30 pending connections for the master socket
|
||
|
|
if (listen(_master_socket, 30) < 0)
|
||
|
|
{
|
||
|
|
perror("listen");
|
||
|
|
exit(EXIT_FAILURE);
|
||
|
|
}
|
||
|
|
|
||
|
|
//accept the incoming connection
|
||
|
|
addrlen = sizeof(address);
|
||
|
|
puts("Waiting for connections ...\n");
|
||
|
|
|
||
|
|
_running = true;
|
||
|
|
|
||
|
|
struct timeval timeout;
|
||
|
|
|
||
|
|
while(_running)
|
||
|
|
{
|
||
|
|
//clear the socket set
|
||
|
|
FD_ZERO(&readfds);
|
||
|
|
|
||
|
|
// reset the timeout
|
||
|
|
timeout.tv_sec = 10;
|
||
|
|
timeout.tv_usec = 0;
|
||
|
|
|
||
|
|
//add master socket to set
|
||
|
|
FD_SET(_master_socket, &readfds);
|
||
|
|
max_sd = _master_socket;
|
||
|
|
|
||
|
|
//add child sockets to set
|
||
|
|
for ( i = 0 ; i < SOMAXCONN ; i++)
|
||
|
|
{
|
||
|
|
//socket descriptor
|
||
|
|
sd = client_socket[i];
|
||
|
|
|
||
|
|
//if valid socket descriptor then add to read list
|
||
|
|
if(sd > 0) FD_SET( sd , &readfds);
|
||
|
|
|
||
|
|
//highest file descriptor number, need it for the select function
|
||
|
|
if(sd > max_sd) max_sd = sd;
|
||
|
|
}
|
||
|
|
|
||
|
|
//wait for an activity on one of the sockets , timeout is 10 sec
|
||
|
|
activity = select( max_sd + 1 , &readfds , NULL , NULL , &timeout);
|
||
|
|
|
||
|
|
if ((activity < 0) && (errno!=EINTR))
|
||
|
|
{
|
||
|
|
printf("select error\n");
|
||
|
|
} else
|
||
|
|
|
||
|
|
// check if there was an timeout
|
||
|
|
if (activity == 0)
|
||
|
|
{
|
||
|
|
// std::cout << "Timeout... \n";
|
||
|
|
continue;
|
||
|
|
} else
|
||
|
|
|
||
|
|
//If something happened on the master socket,
|
||
|
|
//then its an incoming connection
|
||
|
|
if (FD_ISSET(_master_socket, &readfds))
|
||
|
|
{
|
||
|
|
if ((new_socket = accept(
|
||
|
|
_master_socket,
|
||
|
|
(struct sockaddr *)&address,
|
||
|
|
(socklen_t*)&addrlen)
|
||
|
|
)<0)
|
||
|
|
{
|
||
|
|
perror("accept\n");
|
||
|
|
exit(EXIT_FAILURE);
|
||
|
|
}
|
||
|
|
|
||
|
|
// add new socket to array of sockets on empty
|
||
|
|
// position and inform via message queue
|
||
|
|
for (i = 0; i < SOMAXCONN; i++)
|
||
|
|
{
|
||
|
|
if (client_socket[i] == 0)
|
||
|
|
{
|
||
|
|
client_socket[i] = new_socket;
|
||
|
|
PushMsgToQueue(CreateConnectMsg(new_socket));
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
//else it's some IO operation on some other socket
|
||
|
|
else
|
||
|
|
{
|
||
|
|
for (i = 0; i < SOMAXCONN; i++)
|
||
|
|
{
|
||
|
|
sd = client_socket[i];
|
||
|
|
|
||
|
|
if (FD_ISSET(sd , &readfds))
|
||
|
|
{
|
||
|
|
// Check if it was for closing and read the incoming message
|
||
|
|
if ((valread = read( sd , buffer, MAXPACKETSIZE)) == 0)
|
||
|
|
{
|
||
|
|
// Somebody disconnected, inform via message queue
|
||
|
|
PushMsgToQueue(CreateDisconnectMsg(sd));
|
||
|
|
|
||
|
|
// Close the socket and mark as 0 in list for reuse
|
||
|
|
close( sd );
|
||
|
|
client_socket[i] = 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
// Push the incoming message to the queue
|
||
|
|
else
|
||
|
|
{
|
||
|
|
TcpServer::Message msg = CreateMsg(
|
||
|
|
sd,
|
||
|
|
(std::string) buffer
|
||
|
|
);
|
||
|
|
|
||
|
|
PushMsgToQueue(msg);
|
||
|
|
|
||
|
|
memset(buffer, 0, MAXPACKETSIZE);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
close(_master_socket);
|
||
|
|
}
|
||
|
|
|
||
|
|
// Builds a Message for the given socket descriptor and body text.
// The timestamp is the system clock's time since its epoch, expressed in
// whole milliseconds.
TcpServer::Message TcpServer::CreateMsg(int sockd, std::string body)
{
  const auto sinceEpoch =
    std::chrono::system_clock::now().time_since_epoch();
  const auto millis =
    std::chrono::duration_cast<std::chrono::milliseconds>(sinceEpoch);

  TcpServer::Message result;
  result.socket = sockd;
  result.body = body;
  result.timestamp = millis.count();

  return result;
}
|
||
|
|
|
||
|
|
// Builds the "connected|<ip> <port>" message for a client socket.
// <ip> and <port> describe the peer as reported by getpeername(). If the
// peer address cannot be determined, "0.0.0.0 0" is reported instead of
// reading an uninitialised address structure.
TcpServer::Message TcpServer::CreateConnectMsg(int sockd)
{
  struct sockaddr_in ad{};
  socklen_t adlen = sizeof(ad);

  if (getpeername(sockd, reinterpret_cast<struct sockaddr*>(&ad),
                  &adlen) != 0)
  {
    // lookup failed: fall back to a well-defined all-zero address
    ad = sockaddr_in{};
  }

  std::string body = "connected|" +
    std::string(inet_ntoa(ad.sin_addr)) + " " +
    std::to_string(ntohs(ad.sin_port));

  return TcpServer::CreateMsg(sockd, body);
}
|
||
|
|
|
||
|
|
// Builds the "disconnected|<ip> <port>" message for a client socket.
// <ip> and <port> describe the peer as reported by getpeername(). This is
// also called for sockets whose connection is already broken (see Send()),
// where the lookup can fail; in that case "0.0.0.0 0" is reported instead
// of reading an uninitialised address structure.
TcpServer::Message TcpServer::CreateDisconnectMsg(int sockd)
{
  struct sockaddr_in ad{};
  socklen_t adlen = sizeof(ad);

  if (getpeername(sockd, reinterpret_cast<struct sockaddr*>(&ad),
                  &adlen) != 0)
  {
    // lookup failed: fall back to a well-defined all-zero address
    ad = sockaddr_in{};
  }

  std::string body = "disconnected|" +
    std::string(inet_ntoa(ad.sin_addr)) + " " +
    std::to_string(ntohs(ad.sin_port));

  return TcpServer::CreateMsg(sockd, body);
}
|
||
|
|
|
||
|
|
// Appends a message to the message queue and wakes one thread waiting on
// mqCondition. mqMutex is held for the whole function body, so both the
// push and the notification happen under the lock.
void TcpServer::PushMsgToQueue(TcpServer::Message msg)
{
  std::lock_guard<std::mutex> queueLock(mqMutex);

  messages.push(msg);
  mqCondition.notify_one();
}
|
||
|
|
|
||
|
|
// Returns the peer's port (host byte order) of the given socket as
// reported by getpeername(), or 0 if the peer address cannot be
// determined.
int TcpServer::GetPortFromSocket(int sockd)
{
  struct sockaddr_in ad{};
  socklen_t adlen = sizeof(ad);

  if (getpeername(sockd, reinterpret_cast<struct sockaddr*>(&ad),
                  &adlen) != 0)
  {
    // lookup failed: never read an indeterminate port value
    return 0;
  }

  return ntohs(ad.sin_port);
}
|
||
|
|
|
||
|
|
// Returns the peer's IPv4 address of the given socket in dotted-decimal
// notation as reported by getpeername(), or "0.0.0.0" if the peer address
// cannot be determined.
std::string TcpServer::GetIpFromSocket(int sockd)
{
  struct sockaddr_in ad{};
  socklen_t adlen = sizeof(ad);

  if (getpeername(sockd, reinterpret_cast<struct sockaddr*>(&ad),
                  &adlen) != 0)
  {
    // lookup failed: fall back to a well-defined all-zero address
    ad = sockaddr_in{};
  }

  return std::string(inet_ntoa(ad.sin_addr));
}
|
||
|
|
|
||
|
|
int TcpServer::GetMasterPort()
|
||
|
|
{
|
||
|
|
return _port;
|
||
|
|
}
|
||
|
|
|
||
|
|
int TcpServer::GetMasterSocket()
|
||
|
|
{
|
||
|
|
return _master_socket;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool TcpServer::IsRunning() {
|
||
|
|
return _running;
|
||
|
|
}
|
||
|
|
|
||
|
|
int TcpServer::Send(int socket, std::string msg)
|
||
|
|
{
|
||
|
|
// std::cout << "TcpServer sending '" + msg + "' to Socket '"
|
||
|
|
// << socket << "'.\n";
|
||
|
|
|
||
|
|
int res = send(socket, msg.c_str(), msg.length() + 1, MSG_NOSIGNAL) - 1;
|
||
|
|
if (errno == EPIPE) PushMsgToQueue(CreateDisconnectMsg(socket));
|
||
|
|
|
||
|
|
return res;
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
}
|