325 lines
10 KiB
C++
325 lines
10 KiB
C++
/*
|
|
|
|
*/
|
|
|
|
#include <iostream>
|
|
#include <memory>
|
|
#include "MessageBroker.h"
|
|
#include "Message.h"
|
|
#include "NetworkedClient.h"
|
|
#include <pthread.h>
|
|
#include <boost/log/trivial.hpp>
|
|
#include "../TESContext.h"
|
|
#include "../TESManager.h"
|
|
#include <StandardTypes.h>
|
|
|
|
namespace distributed3 {
|
|
MessageBroker MessageBroker::broker;
|
|
|
|
MessageBroker::MessageBroker() {}
|
|
|
|
MessageBroker::~MessageBroker() { // TODO destroy inboxes and loopInboxes
|
|
|
|
//clear(loopInboxes);
|
|
|
|
//stopServers(false);
|
|
}
|
|
template<typename T>
|
|
void MessageBroker::clear(std::map<int,
|
|
std::map<int, std::queue<T>*>>& inbox) {
|
|
for (auto it1 = inbox.begin(); it1 != inbox.end(); ++it1) {
|
|
for (auto it2 = (*it1).second.begin();
|
|
it2 != (*it1).second.end(); ++it2) {
|
|
std::cout << "\ndelete " << (*it2).first;
|
|
if ((*it2).second) {
|
|
delete (*it2).second;
|
|
(*it2).second = nullptr;
|
|
}
|
|
// TODO erase vielleicht besser
|
|
}
|
|
}
|
|
}
|
|
MessageBroker& MessageBroker::get() {
|
|
return broker;
|
|
}
|
|
|
|
Tuple* MessageBroker::getTuple(const int eid, const int slot) {
|
|
// read LoopInbox first until empty
|
|
if (!loopInboxes[eid][slot]->empty()) {
|
|
Tuple* tuple = loopInboxes[eid][slot]->front();
|
|
loopInboxes[eid][slot]->pop();
|
|
return tuple;
|
|
}
|
|
// else: getTuple from MessageServer
|
|
boost::unique_lock<boost::mutex> lock1 {servers2mtx[eid][slot]};
|
|
//while (!ready[eid][slot] ) {
|
|
while (servers2[eid][slot].empty() ) {
|
|
readycond[eid][slot].wait(lock1);
|
|
}
|
|
boost::mutex* m = lock1.release();
|
|
m->unlock();
|
|
|
|
/* // only needed for alternative in addServer
|
|
|
|
// serversIterator[eid][slot] must be initialized correctly
|
|
// servers2[eid][slot] can't be empty here
|
|
serversIterator[eid][slot] = servers2[eid][slot].begin();
|
|
}
|
|
*/ // alternative end
|
|
|
|
while(true) {
|
|
assert(!servers2[eid][slot].empty()); // addServer has
|
|
// been called earlier
|
|
// make it circular and ensure that the serversIterator does
|
|
// not point to the end.
|
|
// serversIterator[eid][slot] is not modified in the
|
|
// state !servers2[eid][slot].empty() by addServer(). So we need no lock.
|
|
if (serversIterator[eid][slot] == servers2[eid][slot].end() ) { // make
|
|
// it circular
|
|
serversIterator[eid][slot] = servers2[eid][slot].begin();
|
|
}
|
|
MessageServer* ms = *(serversIterator[eid][slot]);
|
|
|
|
if (ms->hasTuple(eid,slot)) {
|
|
(serversIterator[eid][slot])++; // it is not possible to
|
|
// move it to the beginning of the while-loop.
|
|
return ms->getTuple(eid,slot); // Explanation follows later
|
|
}
|
|
removeMessageServer(eid,slot);
|
|
if (allfinished(eid)) {
|
|
if (servers2[eid][slot].empty()) {
|
|
return nullptr;
|
|
}
|
|
continue; // try next MessageServer
|
|
}
|
|
// else: not allfinished(eid)
|
|
boost::unique_lock<boost::mutex> lock2 {servers2mtx[eid][slot]};
|
|
while (servers2[eid][slot].empty() ) {
|
|
readycond[eid][slot].wait(lock2);
|
|
}
|
|
// List not empty -> try next MessageServer
|
|
}
|
|
}
|
|
|
|
|
|
void MessageBroker::removeMessageServer(const int eid, const int slot) {
|
|
boost::unique_lock<boost::mutex> lock {servers2mtx[eid][slot]};
|
|
std::list<MessageServer*>::iterator remove = serversIterator[eid][slot];
|
|
(serversIterator[eid][slot])++;
|
|
servers2[eid][slot].erase(remove);
|
|
}
|
|
|
|
void MessageBroker::addServer(const int eid,
|
|
const int slot,
|
|
MessageServer* server) {
|
|
|
|
// alternative: if getTuple() uses a MessageServer who has
|
|
// at least one tuple, we possibly avoid erasing a MessageServer
|
|
// from servers2[eid][slot] who has not yet received
|
|
// its next tuple.
|
|
boost::unique_lock<boost::mutex> lock {servers2mtx[eid][slot]};
|
|
if (servers2[eid][slot].empty()) { // initialized on the fly
|
|
servers2[eid][slot].push_back(server); // first elem must be pushed.
|
|
// An iterator to an empty list points to the end
|
|
serversIterator[eid][slot] = servers2[eid][slot].begin(); // now
|
|
// the iterator points to an elem
|
|
}
|
|
auto insertit = serversIterator[eid][slot];
|
|
insertit++; // might be the end
|
|
servers2[eid][slot].insert(insertit,server);
|
|
// thesis one end
|
|
|
|
// thesis two: in the naive implementation servers[eid][slot]
|
|
// is locked for the shortest possible duration.
|
|
// getTuple() can be called sooner than in the first solution
|
|
//boost::unique_lock<boost::mutex> lock {servers2mtx[eid][slot]};
|
|
//servers2[eid][slot].push_back(server);
|
|
// thesis two end
|
|
}
|
|
void MessageBroker::wakeup(const int eid, const int slot) {
|
|
readycond[eid][slot].notify_one();
|
|
}
|
|
|
|
void MessageBroker::pushTuple(const int eid, const int slot, Tuple* tuple) {
|
|
// there is only one LoopbackProxy. And inserting tuples is
|
|
// finished before tuples are fetched. So no lock is needed
|
|
if (!loopInboxes[eid][slot]) {
|
|
loopInboxes[eid][slot] = new std::queue<Tuple*>; // braucht es
|
|
// die Initialisierung mit {} ?
|
|
}
|
|
loopInboxes[eid][slot]->push(tuple);
|
|
// no notification needed. All tuples are inserted before any
|
|
// tuple is fetched
|
|
}
|
|
void MessageBroker::incrementFinished(const int eid) {
|
|
boost::lock_guard<boost::mutex> {finishedMutex[eid]};
|
|
finished[eid]++;
|
|
}
|
|
bool MessageBroker::allfinished(const int eid) {
|
|
return servers.size() == finished[eid]; // LoopbackProxy can be spared out
|
|
}
|
|
void MessageBroker::setNumberOfSlots(const int eid, const int slots) {
|
|
slotCountMap[eid] = slots;
|
|
initializeInboxes(eid,slots);
|
|
}
|
|
int MessageBroker::getNumberOfSlots(const int eid) {
|
|
assert (slotCountMap[eid] > 0);
|
|
return slotCountMap[eid];
|
|
}
|
|
void MessageBroker::initializeInboxes(const int eid, const int slots) {
|
|
for (int i = 0; i < slots; ++i) {
|
|
loopInboxes[eid][i] = new std::queue<Tuple*>;
|
|
}
|
|
for (auto server : servers) {
|
|
server->initializeInboxes(eid, slots);
|
|
}
|
|
}
|
|
|
|
bool MessageBroker::startTcpListener(const int port) {
|
|
try {
|
|
BOOST_LOG_TRIVIAL(info) << "start message server on port " << port;
|
|
tcpListener = std::make_shared<boost::thread>(
|
|
boost::bind(&MessageBroker::acceptConnections, this, port)
|
|
);
|
|
return true;
|
|
} catch (std::exception &e) {
|
|
BOOST_LOG_TRIVIAL(error) << e.what();
|
|
return false;
|
|
} catch (boost::exception &e) {
|
|
BOOST_LOG_TRIVIAL(error) << "Caught boost exception";
|
|
return false;
|
|
}
|
|
}
|
|
|
|
void MessageBroker::acceptConnections(const int port) {
|
|
try {
|
|
globalSocket = std::shared_ptr<Socket>(Socket::CreateGlobal("localhost",
|
|
std::to_string(port)));
|
|
|
|
while (!boost::this_thread::interruption_requested()) {
|
|
std::shared_ptr<Socket> serverSocket(globalSocket->Accept());
|
|
boost::this_thread::interruption_point();
|
|
if (boost::this_thread::interruption_requested() ||
|
|
serverSocket == nullptr || !serverSocket->IsOk()) {
|
|
continue;
|
|
}
|
|
|
|
auto messageServer = std::make_shared<MessageServer>(serverSocket);
|
|
servers.push_back(messageServer);
|
|
|
|
boost::this_thread::interruption_point();
|
|
}
|
|
} catch (boost::thread_interrupted &interrupted) {
|
|
BOOST_LOG_TRIVIAL(debug) << "Interrupted. Return";
|
|
}
|
|
if (globalSocket != nullptr) {
|
|
globalSocket->Close();
|
|
}
|
|
}
|
|
|
|
bool MessageBroker::startClient(const int worker,
|
|
const RemoteEndpoint host) {
|
|
try {
|
|
auto client = std::make_shared<NetworkedClient>(host);
|
|
workerToClient.insert(std::make_pair(worker, client));
|
|
return true;
|
|
} catch (std::exception &e) {
|
|
return false;
|
|
}
|
|
}
|
|
|
|
bool MessageBroker::startLoopbackProxy(const int worker) {
|
|
auto loopbackProxy = std::make_shared<LoopbackProxy>();
|
|
workerToClient.insert(std::make_pair(worker, loopbackProxy));
|
|
return true;
|
|
}
|
|
|
|
void MessageBroker::reset() {
|
|
std::cout << "\nServer werden gestoppt:";
|
|
stopServers();
|
|
std::cout << "\nServer gestoppt";
|
|
stopClients();
|
|
std::cout << "\nClients gestoppt";
|
|
clear(loopInboxes);
|
|
std::cout << "\nloopInboxes gelöscht";
|
|
}
|
|
|
|
void MessageBroker::stopServers() {
|
|
if (tcpListenerRunning()) {
|
|
tcpListener->interrupt();
|
|
|
|
const std::string &socketAddress = globalSocket->GetSocketAddress();
|
|
|
|
int messageServerPort = TESContext::get().getMessageServerPort();
|
|
// kann nicht einfach globalSocket zerstört werden?
|
|
Socket *dummySocket = nullptr;
|
|
dummySocket = Socket::Connect(socketAddress, std::to_string(
|
|
messageServerPort), Socket::SocketDomain::SockGlobalDomain, 10);
|
|
if (dummySocket != nullptr) {
|
|
delete dummySocket;
|
|
tcpListener->join();
|
|
//delete tcpListener;
|
|
tcpListener = nullptr;
|
|
|
|
if (globalSocket != nullptr) {
|
|
//delete globalSocket;
|
|
globalSocket = nullptr;
|
|
}
|
|
} else {
|
|
BOOST_LOG_TRIVIAL(warning) << "There was a problem shutting down "
|
|
"the MessageServer. "
|
|
"There may remain open connections.";
|
|
}
|
|
}
|
|
|
|
//if(killThreads){
|
|
for (auto server : servers) {
|
|
server->interrupt();
|
|
}
|
|
//}
|
|
servers.clear();
|
|
}
|
|
|
|
void MessageBroker::stopClients() {
|
|
workerToClient.clear();
|
|
}
|
|
|
|
void MessageBroker::sendTuple(const int eid,
|
|
const int slot,
|
|
const int workerNumber,
|
|
Tuple* tuple) {
|
|
if (workerToClient.find(workerNumber) == workerToClient.end()) { // TODO
|
|
BOOST_LOG_TRIVIAL(warning) << "no client set up for worker "
|
|
<< workerNumber << ". Will delete message";
|
|
//DISCARDED_MESSAGE
|
|
return;
|
|
}
|
|
std::shared_ptr<MessageClient> client = workerToClient.at(workerNumber);
|
|
client->sendTuple(eid,slot,tuple);
|
|
|
|
}
|
|
|
|
void MessageBroker::broadcastFinishMessage(const int eid) {
|
|
for (auto it = workerToClient.begin(); it != workerToClient.end(); ++it) {
|
|
auto client = (*it).second;
|
|
client->sendFinishMessage(eid);
|
|
}
|
|
}
|
|
|
|
bool MessageBroker::tcpListenerRunning() {
|
|
if (tcpListener == nullptr) {
|
|
return false;
|
|
}
|
|
if (!tcpListener->joinable()) {
|
|
return false;
|
|
}
|
|
// wenn interrupt() aufgerufen, wird er jetzt unterbrochen
|
|
if (tcpListener->try_join_for(boost::chrono::nanoseconds(1))) {
|
|
return false;
|
|
}
|
|
return true;
|
|
}
|
|
|
|
}
|
|
|