343 lines
9.4 KiB
C++
343 lines
9.4 KiB
C++
|
|
/*
|
||
|
|
----
|
||
|
|
This file is part of SECONDO.
|
||
|
|
|
||
|
|
Copyright (C) 2004, University in Hagen, Department of Computer Science,
|
||
|
|
Database Systems for New Applications.
|
||
|
|
|
||
|
|
SECONDO is free software; you can redistribute it and/or modify
|
||
|
|
it under the terms of the GNU General Public License as published by
|
||
|
|
the Free Software Foundation; either version 2 of the License, or
|
||
|
|
(at your option) any later version.
|
||
|
|
|
||
|
|
SECONDO is distributed in the hope that it will be useful,
|
||
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||
|
|
GNU General Public License for more details.
|
||
|
|
|
||
|
|
You should have received a copy of the GNU General Public License
|
||
|
|
along with SECONDO; if not, write to the Free Software
|
||
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
||
|
|
----
|
||
|
|
|
||
|
|
//paragraph [1] Title: [{\Large \bf \begin {center}] [\end {center}}]
|
||
|
|
//[TOC] [\tableofcontents]
|
||
|
|
|
||
|
|
[1] Header File of the class ~PregelAlgebra~
|
||
|
|
|
||
|
|
November 2018, J. Mende
|
||
|
|
|
||
|
|
|
||
|
|
[TOC]
|
||
|
|
|
||
|
|
1 Overview
|
||
|
|
|
||
|
|
This file defines the members of class MessageBroker
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
#include <iostream>
|
||
|
|
#include <memory>
|
||
|
|
#include "MessageBroker.h"
|
||
|
|
#include "NetworkedClient.h"
|
||
|
|
#include "LoopbackProxy.h"
|
||
|
|
#include "../typedefs.h"
|
||
|
|
#include <pthread.h>
|
||
|
|
#include <boost/log/trivial.hpp>
|
||
|
|
#include "../Helpers/Metrics.h"
|
||
|
|
#include "../PregelContext.h"
|
||
|
|
#include <StandardTypes.h>
|
||
|
|
|
||
|
|
namespace pregel {
|
||
|
|
MessageBroker MessageBroker::broker;
|
||
|
|
|
||
|
|
MessageBroker::MessageBroker() : inbox() {}
|
||
|
|
|
||
|
|
MessageBroker::~MessageBroker() {
|
||
|
|
stopServers(false);
|
||
|
|
stopClients();
|
||
|
|
}
|
||
|
|
|
||
|
|
MessageBroker &MessageBroker::get() {
|
||
|
|
return broker;
|
||
|
|
}
|
||
|
|
|
||
|
|
bool MessageBroker::startTcpListener(const int port) {
|
||
|
|
try {
|
||
|
|
BOOST_LOG_TRIVIAL(info) << "start message server on port " << port;
|
||
|
|
tcpListener = std::make_shared<boost::thread>(
|
||
|
|
boost::bind(&MessageBroker::acceptConnections, this, port)
|
||
|
|
);
|
||
|
|
return true;
|
||
|
|
} catch (std::exception &e) {
|
||
|
|
BOOST_LOG_TRIVIAL(error) << e.what();
|
||
|
|
return false;
|
||
|
|
} catch (boost::exception &e) {
|
||
|
|
BOOST_LOG_TRIVIAL(error) << "Caught boost exception";
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::acceptConnections(const int port) {
|
||
|
|
try {
|
||
|
|
globalSocket = std::shared_ptr<Socket>(Socket::CreateGlobal("localhost",
|
||
|
|
std::to_string(port)));
|
||
|
|
|
||
|
|
while (!boost::this_thread::interruption_requested()) {
|
||
|
|
std::shared_ptr<Socket> serverSocket(globalSocket->Accept());
|
||
|
|
boost::this_thread::interruption_point();
|
||
|
|
if (boost::this_thread::interruption_requested() ||
|
||
|
|
serverSocket == nullptr || !serverSocket->IsOk()) {
|
||
|
|
continue;
|
||
|
|
}
|
||
|
|
|
||
|
|
executable initDoneMessageHandler = [this]() {
|
||
|
|
pauseServers();
|
||
|
|
collectFromAllServers(SuperstepCounter::get());
|
||
|
|
startServers();
|
||
|
|
};
|
||
|
|
|
||
|
|
auto messageServer = std::make_shared<MessageServer>(serverSocket,
|
||
|
|
initDoneMessageHandler);
|
||
|
|
servers.push_back(messageServer);
|
||
|
|
boost::this_thread::interruption_point();
|
||
|
|
}
|
||
|
|
} catch (boost::thread_interrupted &interrupted) {
|
||
|
|
BOOST_LOG_TRIVIAL(debug) << "Interrupted. Return";
|
||
|
|
}
|
||
|
|
if (globalSocket != nullptr) {
|
||
|
|
globalSocket->Close();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
bool MessageBroker::startClient(const int slot, const RemoteEndpoint host) {
|
||
|
|
try {
|
||
|
|
auto client = std::make_shared<NetworkedClient>(host);
|
||
|
|
slotToClient.insert(std::make_pair(slot, client));
|
||
|
|
return true;
|
||
|
|
} catch (std::exception &e) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
bool MessageBroker::startLoopbackProxy(int slot) {
|
||
|
|
try {
|
||
|
|
consumer2<MessageWrapper> loopbackInsert =
|
||
|
|
[this](std::shared_ptr<MessageWrapper> message) {
|
||
|
|
if (message->getType() == MessageWrapper::MessageType::DATA) {
|
||
|
|
this->inbox.push(message, message->getRound());
|
||
|
|
QUEUED_MESSAGE
|
||
|
|
} //else {
|
||
|
|
//delete message;
|
||
|
|
//}
|
||
|
|
};
|
||
|
|
|
||
|
|
auto client = std::make_shared<LoopbackProxy>(loopbackInsert);
|
||
|
|
slotToClient.insert(std::make_pair(slot, client));
|
||
|
|
return true;
|
||
|
|
} catch (std::exception &e) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::reset(const bool killThreads) {
|
||
|
|
stopServers(killThreads);
|
||
|
|
stopClients();
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::stopServers(bool killThreads) {
|
||
|
|
if (tcpListenerRunning()) {
|
||
|
|
tcpListener->interrupt();
|
||
|
|
|
||
|
|
const std::string &socketAddress = globalSocket->GetSocketAddress();
|
||
|
|
|
||
|
|
int messageServerPort = PregelContext::get().getMessageServerPort();
|
||
|
|
Socket *dummySocket = nullptr;
|
||
|
|
dummySocket = Socket::Connect(socketAddress, std::to_string(
|
||
|
|
messageServerPort), Socket::SocketDomain::SockGlobalDomain, 10);
|
||
|
|
if (dummySocket != nullptr) {
|
||
|
|
delete dummySocket;
|
||
|
|
tcpListener->join();
|
||
|
|
//delete tcpListener;
|
||
|
|
tcpListener = nullptr;
|
||
|
|
|
||
|
|
if (globalSocket != nullptr) {
|
||
|
|
//delete globalSocket;
|
||
|
|
globalSocket = nullptr;
|
||
|
|
}
|
||
|
|
} else {
|
||
|
|
BOOST_LOG_TRIVIAL(warning) << "There was a problem shutting down "
|
||
|
|
"the MessageServer. "
|
||
|
|
"There may remain open connections.";
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
if(killThreads){
|
||
|
|
for (auto server : servers) {
|
||
|
|
server->interrupt();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
servers.clear();
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::pauseServers() {
|
||
|
|
if (!tcpListenerRunning()) {
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
|
||
|
|
for (auto server : servers) {
|
||
|
|
server->requestPause();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::stopClients() {
|
||
|
|
slotToClient.clear();
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::sendMessage(std::shared_ptr<MessageWrapper> message) {
|
||
|
|
const int destination = message->getDestination();
|
||
|
|
if (slotToClient.find(destination) == slotToClient.end()) {
|
||
|
|
BOOST_LOG_TRIVIAL(warning) << "no client set up with destination "
|
||
|
|
<< destination << ". Will delete message";
|
||
|
|
DISCARDED_MESSAGE
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
std::shared_ptr<MessageClient> client = slotToClient.at(destination);
|
||
|
|
client->sendMessage(message);
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::collectFromAllServers(int superstep) {
|
||
|
|
consumer2<MessageWrapper> moveToOwnBuffer =
|
||
|
|
[this, superstep](std::shared_ptr<MessageWrapper> message) {
|
||
|
|
inbox.push(message, superstep);
|
||
|
|
};
|
||
|
|
for (auto server : servers) {
|
||
|
|
server->drainBuffer(moveToOwnBuffer, superstep);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
supplier2<MessageWrapper>* MessageBroker::inboxSupplier(const int superstep) {
|
||
|
|
return new supplier2<MessageWrapper> (inbox.supply(superstep));
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::startNewRound(bool &allEmpty,
|
||
|
|
executable &callMeWhenYoureDone) {
|
||
|
|
const unsigned long numberOfConnections = servers.size();
|
||
|
|
|
||
|
|
auto callback = static_cast<std::function<void(bool)> > (
|
||
|
|
[this, &allEmpty, &callMeWhenYoureDone](bool empty) {
|
||
|
|
int superstep = SuperstepCounter::get();
|
||
|
|
collectFromAllServers(superstep);
|
||
|
|
allEmpty &= empty; // not synch, but thread is waiting, or still busy anyway
|
||
|
|
callMeWhenYoureDone();
|
||
|
|
});
|
||
|
|
|
||
|
|
std::shared_ptr<Monitor> monitor =
|
||
|
|
std::make_shared<Monitor>(numberOfConnections - 1,
|
||
|
|
callback);
|
||
|
|
|
||
|
|
for (auto server : servers) {
|
||
|
|
server->setMonitor(monitor);
|
||
|
|
server->startReading();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
unsigned long MessageBroker::howManyMessagesInInbox(int round) {
|
||
|
|
return inbox.size(round);
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::broadcastEmptyMessage() {
|
||
|
|
const int superstep = SuperstepCounter::get();
|
||
|
|
for (auto it = slotToClient.begin(); it != slotToClient.end(); ++it) {
|
||
|
|
int destination = (*it).first;
|
||
|
|
auto message = MessageWrapper::constructEmptyMessage(destination, superstep);
|
||
|
|
sendMessage(message);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::broadcastFinishMessage() {
|
||
|
|
const int superstep = SuperstepCounter::get();
|
||
|
|
for (auto it = slotToClient.begin(); it != slotToClient.end(); ++it) {
|
||
|
|
int destination = (*it).first;
|
||
|
|
auto message = MessageWrapper::constructFinishMessage(destination,
|
||
|
|
superstep);
|
||
|
|
sendMessage(message);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::broadcastInitDoneMessage() {
|
||
|
|
const int superstep = SuperstepCounter::get();
|
||
|
|
for (auto it = slotToClient.begin(); it != slotToClient.end(); ++it) {
|
||
|
|
int destination = (*it).first;
|
||
|
|
auto message = MessageWrapper::constructInitDoneMessage(destination,
|
||
|
|
superstep);
|
||
|
|
sendMessage(message);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::healthReport(std::stringstream &sstream) {
|
||
|
|
sstream << "+++Broker" << std::endl;
|
||
|
|
sstream << " Queue: " << inbox << std::endl;
|
||
|
|
sstream << " Clients " << std::endl;
|
||
|
|
|
||
|
|
for (auto client : slotToClient) {
|
||
|
|
sstream << " Client " << client.first << std::endl;
|
||
|
|
client.second->healthReport(sstream);
|
||
|
|
}
|
||
|
|
|
||
|
|
int number = 0;
|
||
|
|
sstream << " Servers" << std::endl;
|
||
|
|
sstream << " TCP-Listener: " << (tcpListenerRunning() ? "up" : "down")
|
||
|
|
<< std::endl;
|
||
|
|
for (auto server : servers) {
|
||
|
|
sstream << " Server " << number++ << std::endl;
|
||
|
|
server->healthReport(sstream);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
bool MessageBroker::tcpListenerRunning() {
|
||
|
|
if (tcpListener == nullptr) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
if (!tcpListener->joinable()) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
if (tcpListener->try_join_for(boost::chrono::nanoseconds(1))) {
|
||
|
|
return false;
|
||
|
|
}
|
||
|
|
return true;
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::expectInitMessages() {
|
||
|
|
for (auto server : servers) {
|
||
|
|
server->startReading();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::startServers() {
|
||
|
|
for (auto server : servers) {
|
||
|
|
server->startReading();
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
unsigned long MessageBroker::numberOfClients() const{
|
||
|
|
return slotToClient.size();
|
||
|
|
}
|
||
|
|
|
||
|
|
unsigned long MessageBroker::numberOfServers() const {
|
||
|
|
return servers.size();
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
DoubleQueue& MessageBroker::getInBox() {
|
||
|
|
return inbox;
|
||
|
|
}
|
||
|
|
|
||
|
|
void MessageBroker::clearMessages(){
|
||
|
|
inbox.clear();
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
}
|