Files
2026-01-23 17:03:45 +08:00

256 lines
7.7 KiB
C++

/*
*/
#include <iostream>
#include <memory>
#include "MessageBroker.h"
#include "Message.h"
#include "NetworkedClient.h"
#include <pthread.h>
#include <boost/log/trivial.hpp>
#include "../TESContext.h"
#include "../TESManager.h"
#include <StandardTypes.h>
namespace distributed2 {
// Definition of the static broker instance that MessageBroker::get() returns.
MessageBroker MessageBroker::broker;
// Nothing to set up here: all members are default-initialised.
MessageBroker::MessageBroker() = default;
// Tears the broker down by delegating to reset().
MessageBroker::~MessageBroker() {
  this->reset();
}
// Frees every queue referenced by the two-level map `inbox`.
// The map entries themselves are kept; each freed slot is reset to nullptr.
template<typename T>
void MessageBroker::clear(std::map<int,std::map<int, std::queue<T>*>>& inbox) {
  for (auto& outerEntry : inbox) {
    for (auto& innerEntry : outerEntry.second) {
      delete innerEntry.second;
      innerEntry.second = nullptr;
    }
  }
}
// Accessor for the static broker instance.
MessageBroker& MessageBroker::get() {
  return MessageBroker::broker;
}
// Returns the next tuple for slot `slot` of `eid`, blocking on
// readycond[eid][slot] while no MessageServer is registered for that slot.
//
// Tuples queued in loopInboxes[eid][slot] are handed out first. After that the
// servers in servers2[eid][slot] are visited in circular order. A nullptr
// obtained from a server is treated as that server's finish message; the
// function itself returns nullptr once allfinished(eid,slot) is true.
//
// NOTE(review): loopInboxes[eid][slot] is dereferenced without a null check,
// so initializeInboxes() must have run for this eid/slot -- confirm callers.
// NOTE(review): inside the polling loop servers2 and serversIterator are read
// without holding servers2mtx -- confirm this is safe against addServer().
Tuple* MessageBroker::getTuple(const int eid, const int slot) {
// read LoopInbox first until empty
if (!loopInboxes[eid][slot]->empty()) {
Tuple* tuple = loopInboxes[eid][slot]->front();
loopInboxes[eid][slot]->pop();
return tuple;
}
// else: getTuple from MessageServer
boost::unique_lock<boost::mutex> lock1 {servers2mtx[eid][slot]};
//while (!ready[eid][slot] ) {
// wait until at least one MessageServer has been registered for this slot
while (servers2[eid][slot].empty() ) {
readycond[eid][slot].wait(lock1);
}
// release() detaches the mutex from lock1 without unlocking it,
// so it is unlocked by hand here
boost::mutex* m = lock1.release();
m->unlock();
while(true) {
assert(!servers2[eid][slot].empty()); // addServer has been called earlier
if (serversIterator[eid][slot] == servers2[eid][slot].end() ) {
// make it circular
serversIterator[eid][slot] = servers2[eid][slot].begin();
}
MessageServer* ms = *(serversIterator[eid][slot]);
if (ms->hasMessage(eid,slot)) { // preferable to hasTuple
Tuple* tuple = ms->getTuple(eid,slot); // yields nullptr for a
// finish message, otherwise a regular tuple
if (tuple == nullptr) { // finish message
incrementFinished(eid,slot);
removeMessageServer(eid,slot); // every MessageServer
//receives only one finish message
if (allfinished(eid,slot)) {
return nullptr;
}
} else { // regular tuple
(serversIterator[eid][slot])++;
return tuple;
}
} else { // ms has no message
removeMessageServer(eid,slot);
}
// List empty? Then block until addServer()/wakeup() provide a new server.
boost::unique_lock<boost::mutex> lock2 {servers2mtx[eid][slot]};
while (servers2[eid][slot].empty() ) {
readycond[eid][slot].wait(lock2);
}
}
}
// Registers `server` as a message source for slot `slot` of `eid`.
//
// The server is linked into servers2[eid][slot] directly behind the element
// the polling iterator currently refers to. If the list is empty, the server
// becomes its only element and the polling iterator is anchored on it.
// Each call adds the server exactly once. The whole update runs under
// servers2mtx[eid][slot].
//
// eid     id of the exchange the slot belongs to
// slot    slot number within that exchange
// server  server to register; stored as a non-owning pointer
void MessageBroker::addServer(const int eid, const int slot,
                              MessageServer* server) {
  // thesis one: if getTuple() uses a MessageServer who has at
  // least one tuple, we possibly avoid erasing a MessageServer
  // from servers2[eid][slot] who has not yet received its next tuple.
  boost::unique_lock<boost::mutex> lock {servers2mtx[eid][slot]};
  auto& registered = servers2[eid][slot];
  auto& cursor = serversIterator[eid][slot];

  if (registered.empty()) {
    // An iterator into an empty list can only be end(), so the first
    // element is pushed and the iterator is re-anchored on it.
    registered.push_back(server);
    cursor = registered.begin();
    // Done: falling through would insert the same server a second time.
    return;
  }

  auto insertPos = cursor;
  // The polling iterator may legitimately sit on end() (getTuple advances it
  // past the last element). Incrementing end() is undefined behaviour, so in
  // that case the server is simply appended.
  if (insertPos != registered.end()) {
    ++insertPos;
  }
  registered.insert(insertPos, server);
}
// Notifies one thread waiting on the condition variable of (eid, slot).
void MessageBroker::wakeup(const int eid, const int slot) {
  auto& slotCondition = readycond[eid][slot];
  slotCondition.notify_one();
}
void MessageBroker::removeMessageServer(const int eid, const int slot) {
boost::unique_lock<boost::mutex> lock {servers2mtx[eid][slot]};
std::list<MessageServer*>::iterator remove = serversIterator[eid][slot];
(serversIterator[eid][slot])++;
servers2[eid][slot].erase(remove);
}
// True when the finish counter of (eid, slot) equals the number of entries
// in `servers`.
// A LoopbackProxy is stored in workerToClient, not in `servers`, so it does
// not take part in this count.
bool MessageBroker::allfinished(const int eid, const int slot) {
  const auto& finishedCount = finished[eid][slot];
  return servers.size() == finishedCount;
}
void MessageBroker::incrementFinished(const int eid, const int slot) {
finished[eid][slot]++;
}
void MessageBroker::pushTuple(const int eid, const int slot, Tuple* tuple) {
loopInboxes[eid][slot]->push(tuple);
}
bool MessageBroker::startTcpListener(const int port) {
try {
BOOST_LOG_TRIVIAL(info) << "start message server on port " << port;
tcpListener = std::make_shared<boost::thread>(
boost::bind(&MessageBroker::acceptConnections, this, port)
);
return true;
} catch (std::exception &e) {
BOOST_LOG_TRIVIAL(error) << e.what();
return false;
} catch (boost::exception &e) {
BOOST_LOG_TRIVIAL(error) << "Caught boost exception";
return false;
}
}
void MessageBroker::acceptConnections(const int port) {
try {
globalSocket = std::shared_ptr<Socket>(Socket::CreateGlobal("localhost",
std::to_string(port)));
while (!boost::this_thread::interruption_requested()) {
std::shared_ptr<Socket> serverSocket(globalSocket->Accept());
boost::this_thread::interruption_point();
if (boost::this_thread::interruption_requested() ||
serverSocket == nullptr || !serverSocket->IsOk()) {
continue;
}
auto messageServer = std::make_shared<MessageServer>(serverSocket);
servers.push_back(messageServer);
boost::this_thread::interruption_point();
}
} catch (boost::thread_interrupted &interrupted) {
BOOST_LOG_TRIVIAL(debug) << "Interrupted. Return";
}
if (globalSocket != nullptr) {
globalSocket->Close();
}
}
bool MessageBroker::startClient(const int worker, const RemoteEndpoint host) {
try {
auto client = std::make_shared<NetworkedClient>(host);
workerToClient.insert(std::make_pair(worker, client));
return true;
} catch (std::exception &e) {
return false;
}
}
bool MessageBroker::startLoopbackProxy(const int worker) {
auto loopbackProxy = std::make_shared<LoopbackProxy>();
workerToClient.insert(std::make_pair(worker, loopbackProxy));
return true;
}
void MessageBroker::reset() {
stopServers();
stopClients();
//clear(inboxes);
clear(loopInboxes);
}
// Interrupts the listener thread (if it is still running) and every
// MessageServer, then empties `servers`.
void MessageBroker::stopServers() {
  if (tcpListenerRunning()) {
    tcpListener->interrupt();
    // drop our handles to the listener thread and its socket
    tcpListener = nullptr;
    globalSocket = nullptr;
  }
  for (auto it = servers.begin(); it != servers.end(); ++it) {
    auto current = *it;
    current->interrupt();
  }
  servers.clear();
}
void MessageBroker::stopClients() {
workerToClient.clear();
}
// Forwards `tuple` for (eid, slot) to the client registered for
// `workerNumber`. If no client is registered, a warning is logged and the
// call returns without sending anything.
void MessageBroker::sendTuple(const int eid, const int slot,
                              const int workerNumber, Tuple* tuple) {
  const auto entry = workerToClient.find(workerNumber);
  if (entry != workerToClient.end()) {
    std::shared_ptr<MessageClient> target = entry->second;
    target->sendTuple(eid,slot,tuple);
    return;
  }
  BOOST_LOG_TRIVIAL(warning) << "no client set up for worker "
                             << workerNumber << ". Will delete message";
}
// Sends the finish message for `eid` through every registered client.
void MessageBroker::broadcastFinishMessage(const int eid) {
  for (const auto& entry : workerToClient) {
    auto target = entry.second;
    target->sendFinishMessage(eid);
  }
}
// Reports whether the listener thread exists, is joinable and could not be
// joined within one nanosecond.
bool MessageBroker::tcpListenerRunning() {
  // evaluated left to right; each test guards the one after it
  return tcpListener != nullptr
      && tcpListener->joinable()
      && !tcpListener->try_join_for(boost::chrono::nanoseconds(1));
}
void MessageBroker::setNumberOfSlots(const int eid, const int slots) {
slotCountMap[eid] = slots;
initializeInboxes(eid,slots);
}
// Returns the slot count stored for `eid`.
// Asserts that the stored count is positive.
int MessageBroker::getNumberOfSlots(const int eid) {
  const auto& storedCount = slotCountMap[eid];
  assert (storedCount > 0);
  return storedCount;
}
// Gives each of the slots 0 .. slots-1 of `eid` a fresh, empty loop inbox and
// asks every known MessageServer to initialise its own inboxes for `eid`.
//
// eid    id of the exchange whose inboxes are set up
// slots  number of slots; nothing is allocated when it is <= 0
//
// A queue left over from an earlier call for the same eid/slot is freed
// before it is replaced, so repeated initialisation no longer leaks it.
// Tuples still stored in such a queue are not touched.
void MessageBroker::initializeInboxes(const int eid, const int slots) {
  auto& inboxesOfEid = loopInboxes[eid];
  for (int i = 0; i < slots; ++i) {
    std::queue<Tuple*>*& inbox = inboxesOfEid[i];
    // operator[] value-initialises new entries to nullptr and clear() resets
    // freed entries to nullptr, so this delete is a no-op on first use.
    delete inbox;
    inbox = new std::queue<Tuple*>;
  }
  for (const auto& server : servers) {
    server->initializeInboxes(eid, slots);
  }
}
}