/* */ #include #include #include "MessageBroker.h" #include "Message.h" #include "NetworkedClient.h" #include #include #include "../TESContext.h" #include "../TESManager.h" #include namespace distributed2 { MessageBroker MessageBroker::broker; MessageBroker::MessageBroker() {} MessageBroker::~MessageBroker() { // loopInboxes reset(); } template void MessageBroker::clear(std::map*>>& inbox) { for (auto it1 = inbox.begin(); it1 != inbox.end(); ++it1) { for (auto it2 = (*it1).second.begin(); it2 != (*it1).second.end(); ++it2) { delete (*it2).second; (*it2).second = nullptr; } } } MessageBroker& MessageBroker::get() { return broker; } Tuple* MessageBroker::getTuple(const int eid, const int slot) { // read LoopInbox first until empty if (!loopInboxes[eid][slot]->empty()) { Tuple* tuple = loopInboxes[eid][slot]->front(); loopInboxes[eid][slot]->pop(); return tuple; } // else: getTuple from MessageServer boost::unique_lock lock1 {servers2mtx[eid][slot]}; //while (!ready[eid][slot] ) { while (servers2[eid][slot].empty() ) { readycond[eid][slot].wait(lock1); } boost::mutex* m = lock1.release(); m->unlock(); while(true) { assert(!servers2[eid][slot].empty()); // addServer has been called earlier if (serversIterator[eid][slot] == servers2[eid][slot].end() ) { // make it circular serversIterator[eid][slot] = servers2[eid][slot].begin(); } MessageServer* ms = *(serversIterator[eid][slot]); if (ms->hasMessage(eid,slot)) { // besser als hasTuple Tuple* tuple = ms->getTuple(eid,slot); // gibt nullptr zurück, //wenn nullptr, sonst if (tuple == nullptr) { // Finish-Message incrementFinished(eid,slot); removeMessageServer(eid,slot); // every MessageServer //receives only one finish message if (allfinished(eid,slot)) { return nullptr; } } else { // regular tuple (serversIterator[eid][slot])++; return tuple; } } else { // ms has no message removeMessageServer(eid,slot); } // List empty? boost::unique_lock lock2 {servers2mtx[eid][slot]}; while (servers2[eid][slot].empty() ) { readycond[eid][slot].wait(lock2); } } } void MessageBroker::addServer(const int eid, const int slot, MessageServer* server) { // thesis one: if getTuple() uses a MessageServer who has at // least one tuple, we possibly avoid erasing a MessageServer // from servers2[eid][slot] who has not yet received its next tuple. boost::unique_lock lock {servers2mtx[eid][slot]}; if (servers2[eid][slot].empty()) { // initialized on the fly servers2[eid][slot].push_back(server); // first elem must be pushed. // An iterator to an empty list points to the end serversIterator[eid][slot] = servers2[eid][slot].begin(); // now the iterator points to an elem } auto insertit = serversIterator[eid][slot]; insertit++; // might be the end servers2[eid][slot].insert(insertit,server); } void MessageBroker::wakeup(const int eid, const int slot) { readycond[eid][slot].notify_one(); } void MessageBroker::removeMessageServer(const int eid, const int slot) { boost::unique_lock lock {servers2mtx[eid][slot]}; std::list::iterator remove = serversIterator[eid][slot]; (serversIterator[eid][slot])++; servers2[eid][slot].erase(remove); } bool MessageBroker::allfinished(const int eid, const int slot) { return servers.size() == finished[eid][slot]; // LoopbackProxy can be spared out } void MessageBroker::incrementFinished(const int eid, const int slot) { finished[eid][slot]++; } void MessageBroker::pushTuple(const int eid, const int slot, Tuple* tuple) { loopInboxes[eid][slot]->push(tuple); } bool MessageBroker::startTcpListener(const int port) { try { BOOST_LOG_TRIVIAL(info) << "start message server on port " << port; tcpListener = std::make_shared( boost::bind(&MessageBroker::acceptConnections, this, port) ); return true; } catch (std::exception &e) { BOOST_LOG_TRIVIAL(error) << e.what(); return false; } catch (boost::exception &e) { BOOST_LOG_TRIVIAL(error) << "Caught boost exception"; return false; } } void MessageBroker::acceptConnections(const int port) { try { globalSocket = std::shared_ptr(Socket::CreateGlobal("localhost", std::to_string(port))); while (!boost::this_thread::interruption_requested()) { std::shared_ptr serverSocket(globalSocket->Accept()); boost::this_thread::interruption_point(); if (boost::this_thread::interruption_requested() || serverSocket == nullptr || !serverSocket->IsOk()) { continue; } auto messageServer = std::make_shared(serverSocket); servers.push_back(messageServer); boost::this_thread::interruption_point(); } } catch (boost::thread_interrupted &interrupted) { BOOST_LOG_TRIVIAL(debug) << "Interrupted. Return"; } if (globalSocket != nullptr) { globalSocket->Close(); } } bool MessageBroker::startClient(const int worker, const RemoteEndpoint host) { try { auto client = std::make_shared(host); workerToClient.insert(std::make_pair(worker, client)); return true; } catch (std::exception &e) { return false; } } bool MessageBroker::startLoopbackProxy(const int worker) { auto loopbackProxy = std::make_shared(); workerToClient.insert(std::make_pair(worker, loopbackProxy)); return true; } void MessageBroker::reset() { stopServers(); stopClients(); //clear(inboxes); clear(loopInboxes); } void MessageBroker::stopServers() { if (tcpListenerRunning()) { tcpListener->interrupt(); tcpListener = nullptr; globalSocket = nullptr; } for (auto server : servers) { server->interrupt(); } servers.clear(); } void MessageBroker::stopClients() { workerToClient.clear(); } void MessageBroker::sendTuple(const int eid, const int slot, const int workerNumber, Tuple* tuple) { if (workerToClient.find(workerNumber) == workerToClient.end()) { BOOST_LOG_TRIVIAL(warning) << "no client set up for worker " << workerNumber << ". Will delete message"; return; } std::shared_ptr client = workerToClient.at(workerNumber); client->sendTuple(eid,slot,tuple); } void MessageBroker::broadcastFinishMessage(const int eid) { for (auto it = workerToClient.begin(); it != workerToClient.end(); ++it) { auto client = (*it).second; client->sendFinishMessage(eid); } } bool MessageBroker::tcpListenerRunning() { if (tcpListener == nullptr) { return false; } if (!tcpListener->joinable()) { return false; } if (tcpListener->try_join_for(boost::chrono::nanoseconds(1))) { return false; } return true; } void MessageBroker::setNumberOfSlots(const int eid, const int slots) { slotCountMap[eid] = slots; initializeInboxes(eid,slots); } int MessageBroker::getNumberOfSlots(const int eid) { assert (slotCountMap[eid] > 0); return slotCountMap[eid]; } void MessageBroker::initializeInboxes(const int eid, const int slots) { for (int i = 0; i < slots; ++i) { loopInboxes[eid][i] = new std::queue; } for (auto server : servers) { server->initializeInboxes(eid, slots); } } }