Files
2026-01-23 17:03:45 +08:00

92 lines
2.5 KiB
C++

/*
*/
#ifndef SECONDO_TES_MESSAGEBROKER_H
#define SECONDO_TES_MESSAGEBROKER_H
#include <list>
#include <functional>
#include <map>
#include "MessageServer.h"
#include "MessageClient.h"
//#include "QueueSupplier.h"
#include "LoopbackProxy.h"
#include "../Helpers/RemoteEndpoint.h"
#include "../typedefs.h"
#include <boost/thread.hpp>
namespace distributed2 {
class MessageBroker {
public:
MessageBroker();
~MessageBroker();
static MessageBroker& get();
void addServer(const int eid, const int slot, MessageServer* server); // 5
void wakeup(const int eid, const int slot); // 5
Tuple* getTuple(const int eid, const int slot); // 5
void pushTuple(const int eid, const int slot, Tuple* tuple); // 5
void incrementFinished(const int eid, const int slot); // 5
void setNumberOfSlots(const int eid, const int slots); // 5
int getNumberOfSlots(const int eid); // 5
void sendTuple(const int eid, const int slot, const int workerNumber,
Tuple* tuple);
void broadcastFinishMessage(const int eid);
bool startTcpListener(const int port); // in StartTESServer
bool startClient(const int slot, const RemoteEndpoint host);
bool startLoopbackProxy(const int slot);
bool tcpListenerRunning(); // in StartTESServer
void stopServers();
void stopClients();
void reset();
private:
static MessageBroker broker;
std::shared_ptr<boost::thread> tcpListener = nullptr;
std::shared_ptr<Socket> globalSocket = nullptr;
std::list<std::shared_ptr<MessageServer> > servers
= std::list< std::shared_ptr<MessageServer> >();
using mt = std::map<int, std::shared_ptr<MessageClient> >;
mt workerToClient = mt();
template<typename T>
void clear(std::map<int,std::map<int, std::queue<T>*>>& inbox);
void removeMessageServer(const int eid,const int slot); // 5
bool allfinished(const int eid, const int slot); // 5
std::map<int, std::map<int, boost::mutex>> servers2mtx; // 5
std::map<int, std::map<int, std::list<MessageServer*>>> servers2; // 5
std::map<int, std::map<int, std::list<MessageServer*>::iterator>>
serversIterator; // 5
std::map<int, std::map<int, boost::condition_variable>> readycond; // 5
std::map<int, std::map<int, std::queue<Tuple*>* > > loopInboxes; // 5
std::map<int, std::map<int, unsigned int>> finished; // 5
std::map<int,int> slotCountMap; // 5
inline void initializeInboxes(const int eid, const int slots);
void acceptConnections(int port);
};
}
#endif //SECONDO_MESSAGEBROKER_H