#include <iostream>
#include <memory>

#include "MessageBroker.h"
#include "Message.h"
#include "NetworkedClient.h"

#include <pthread.h>
#include <boost/log/trivial.hpp>

#include "../TESContext.h"
#include "../TESManager.h"

#include <StandardTypes.h>

namespace distributed2 {

MessageBroker MessageBroker::broker;

MessageBroker::MessageBroker() {}

MessageBroker::~MessageBroker() {
    reset();
}

// Deletes the per-slot queue objects of an inbox map. Note that any
// tuples still queued are not freed here.
template<typename T>
void MessageBroker::clear(std::map<int, std::map<int, std::queue<T>*>>& inbox) {
    for (auto it1 = inbox.begin(); it1 != inbox.end(); ++it1) {
        for (auto it2 = it1->second.begin(); it2 != it1->second.end(); ++it2) {
            delete it2->second;
            it2->second = nullptr;
        }
    }
}

MessageBroker& MessageBroker::get() {
    return broker;
}

Tuple* MessageBroker::getTuple(const int eid, const int slot) {
    // drain the loopback inbox first, until it is empty
    if (!loopInboxes[eid][slot]->empty()) {
        Tuple* tuple = loopInboxes[eid][slot]->front();
        loopInboxes[eid][slot]->pop();
        return tuple;
    }
    // otherwise: fetch the next tuple from a MessageServer

    // block until at least one MessageServer is registered
    boost::unique_lock<boost::mutex> lock1 {servers2mtx[eid][slot]};
    while (servers2[eid][slot].empty()) {
        readycond[eid][slot].wait(lock1);
    }
    // detach the mutex from the lock and unlock it manually
    boost::mutex* m = lock1.release();
    m->unlock();

    while (true) {
        assert(!servers2[eid][slot].empty()); // addServer has been called earlier
        if (serversIterator[eid][slot] == servers2[eid][slot].end()) {
            // make the iteration circular
            serversIterator[eid][slot] = servers2[eid][slot].begin();
        }
        MessageServer* ms = *(serversIterator[eid][slot]);

        if (ms->hasMessage(eid, slot)) { // preferable to hasTuple
            // returns nullptr for a finish message, otherwise a tuple
            Tuple* tuple = ms->getTuple(eid, slot);
            if (tuple == nullptr) { // finish message
                incrementFinished(eid, slot);
                // every MessageServer receives only one finish message
                removeMessageServer(eid, slot);
                if (allfinished(eid, slot)) {
                    return nullptr;
                }
            } else { // regular tuple
                ++(serversIterator[eid][slot]);
                return tuple;
            }
        } else { // ms has no message yet
            removeMessageServer(eid, slot);
        }
        // if the server list ran empty, wait until one is re-registered
        boost::unique_lock<boost::mutex> lock2 {servers2mtx[eid][slot]};
        while (servers2[eid][slot].empty()) {
            readycond[eid][slot].wait(lock2);
        }
    }
}
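
/*
Illustrative consumer loop (a sketch, not part of this file; 'process'
is a hypothetical callback): getTuple() blocks until a tuple is
available and returns nullptr once all servers have delivered their
finish message for this (eid, slot):

    while (Tuple* t = MessageBroker::get().getTuple(eid, slot)) {
        process(t);
    }

*/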

void MessageBroker::addServer(const int eid, const int slot,
                              MessageServer* server) {
    // Rationale: if getTuple() uses a MessageServer that has at least
    // one tuple, we possibly avoid erasing a MessageServer from
    // servers2[eid][slot] that has not yet received its next tuple.
    boost::unique_lock<boost::mutex> lock {servers2mtx[eid][slot]};
    if (servers2[eid][slot].empty()) { // initialized on the fly
        // the first element must be pushed: an iterator into an empty
        // list points to its end
        servers2[eid][slot].push_back(server);
        serversIterator[eid][slot] = servers2[eid][slot].begin();
        // the iterator now points to an element; return here so that
        // the first server is not inserted a second time below
        return;
    }
    // insert the new server right behind the current iterator position
    auto insertit = serversIterator[eid][slot];
    ++insertit; // might be the end
    servers2[eid][slot].insert(insertit, server);
}
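
/*
Sketch of the resulting order (illustrative, assuming servers A, B, C
are added in that order while the iterator still points to A): the
list grows as [A], then [A, B], then [A, C, B], so a newly added
server is polled right after the current round-robin position.
*/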

void MessageBroker::wakeup(const int eid, const int slot) {
    readycond[eid][slot].notify_one();
}

void MessageBroker::removeMessageServer(const int eid, const int slot) {
    boost::unique_lock<boost::mutex> lock {servers2mtx[eid][slot]};
    // advance the round-robin iterator past the entry before erasing it
    std::list<MessageServer*>::iterator remove = serversIterator[eid][slot];
    ++(serversIterator[eid][slot]);
    servers2[eid][slot].erase(remove);
}

bool MessageBroker::allfinished(const int eid, const int slot) {
    // the LoopbackProxy can be left out of this count
    return servers.size() == finished[eid][slot];
}

void MessageBroker::incrementFinished(const int eid, const int slot) {
    finished[eid][slot]++;
}

void MessageBroker::pushTuple(const int eid, const int slot, Tuple* tuple) {
    loopInboxes[eid][slot]->push(tuple);
}

bool MessageBroker::startTcpListener(const int port) {
    try {
        BOOST_LOG_TRIVIAL(info) << "start message server on port " << port;
        tcpListener = std::make_shared<boost::thread>(
            boost::bind(&MessageBroker::acceptConnections, this, port)
        );
        return true;
    } catch (std::exception& e) {
        BOOST_LOG_TRIVIAL(error) << e.what();
        return false;
    } catch (boost::exception& e) {
        BOOST_LOG_TRIVIAL(error) << "Caught boost exception";
        return false;
    }
}

void MessageBroker::acceptConnections(const int port) {
    try {
        globalSocket = std::shared_ptr<Socket>(
            Socket::CreateGlobal("localhost", std::to_string(port)));

        while (!boost::this_thread::interruption_requested()) {
            // Accept() blocks until the next client connects
            std::shared_ptr<Socket> serverSocket(globalSocket->Accept());
            boost::this_thread::interruption_point();
            if (boost::this_thread::interruption_requested() ||
                serverSocket == nullptr || !serverSocket->IsOk()) {
                continue;
            }

            auto messageServer = std::make_shared<MessageServer>(serverSocket);
            servers.push_back(messageServer);

            boost::this_thread::interruption_point();
        }
    } catch (boost::thread_interrupted& interrupted) {
        BOOST_LOG_TRIVIAL(debug) << "Interrupted. Return";
    }
    if (globalSocket != nullptr) {
        globalSocket->Close();
    }
}

bool MessageBroker::startClient(const int worker, const RemoteEndpoint host) {
    try {
        auto client = std::make_shared<NetworkedClient>(host);
        workerToClient.insert(std::make_pair(worker, client));
        return true;
    } catch (std::exception& e) {
        BOOST_LOG_TRIVIAL(error) << e.what();
        return false;
    }
}

bool MessageBroker::startLoopbackProxy(const int worker) {
    auto loopbackProxy = std::make_shared<LoopbackProxy>();
    workerToClient.insert(std::make_pair(worker, loopbackProxy));
    return true;
}
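
/*
Typical wiring during start-up (illustrative sketch; the actual call
sites live outside this file, and 'port', 'workerNo', 'ownWorkerNo'
and 'endpoint' are placeholders):

    MessageBroker& broker = MessageBroker::get();
    broker.startTcpListener(port);           // accept remote MessageServers
    broker.startClient(workerNo, endpoint);  // connect to a remote worker
    broker.startLoopbackProxy(ownWorkerNo);  // short-circuit local messages

*/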

void MessageBroker::reset() {
    stopServers();
    stopClients();
    //clear(inboxes);
    clear(loopInboxes);
}

void MessageBroker::stopServers() {
    if (tcpListenerRunning()) {
        tcpListener->interrupt();
        tcpListener = nullptr;
        globalSocket = nullptr;
    }
    for (auto server : servers) {
        server->interrupt();
    }
    servers.clear();
}

void MessageBroker::stopClients() {
    workerToClient.clear();
}

void MessageBroker::sendTuple(const int eid, const int slot,
                              const int workerNumber, Tuple* tuple) {
    if (workerToClient.find(workerNumber) == workerToClient.end()) {
        BOOST_LOG_TRIVIAL(warning) << "no client set up for worker "
                                   << workerNumber << ". Will delete message";
        return;
    }
    std::shared_ptr<MessageClient> client = workerToClient.at(workerNumber);
    client->sendTuple(eid, slot, tuple);
}

void MessageBroker::broadcastFinishMessage(const int eid) {
    for (auto it = workerToClient.begin(); it != workerToClient.end(); ++it) {
        auto client = it->second;
        client->sendFinishMessage(eid);
    }
}
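
/*
Producer-side protocol (illustrative sketch): once a worker has routed
all of its tuples with sendTuple(), it broadcasts one finish message
per exchange so that the consumers' getTuple() loops can terminate:

    broker.sendTuple(eid, slot, targetWorker, tuple);
    // ... repeat for all tuples ...
    broker.broadcastFinishMessage(eid);

*/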

bool MessageBroker::tcpListenerRunning() {
    if (tcpListener == nullptr) {
        return false;
    }
    if (!tcpListener->joinable()) {
        return false;
    }
    // probe liveness: a joinable thread that can be joined within a
    // nanosecond has already terminated
    if (tcpListener->try_join_for(boost::chrono::nanoseconds(1))) {
        return false;
    }
    return true;
}

void MessageBroker::setNumberOfSlots(const int eid, const int slots) {
    slotCountMap[eid] = slots;
    initializeInboxes(eid, slots);
}

int MessageBroker::getNumberOfSlots(const int eid) {
    assert(slotCountMap[eid] > 0);
    return slotCountMap[eid];
}

void MessageBroker::initializeInboxes(const int eid, const int slots) {
    for (int i = 0; i < slots; ++i) {
        loopInboxes[eid][i] = new std::queue<Tuple*>;
    }
    for (auto server : servers) {
        server->initializeInboxes(eid, slots);
    }
}
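
/*
Initialization example (illustrative): setNumberOfSlots(1, 4) records
the slot count for exchange 1, allocates loopInboxes[1][0..3] and
forwards the slot count to every connected MessageServer, so that
getTuple(1, slot) can be called for slot = 0..3 afterwards.
*/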

} // end of namespace distributed2