/* */ #ifndef SECONDO_TES_MESSAGEBROKER_H #define SECONDO_TES_MESSAGEBROKER_H #include #include #include #include "MessageServer.h" #include "MessageClient.h" //#include "QueueSupplier.h" #include "LoopbackProxy.h" #include "../Helpers/RemoteEndpoint.h" #include "../typedefs.h" #include namespace distributed2 { class MessageBroker { public: MessageBroker(); ~MessageBroker(); static MessageBroker& get(); void addServer(const int eid, const int slot, MessageServer* server); // 5 void wakeup(const int eid, const int slot); // 5 Tuple* getTuple(const int eid, const int slot); // 5 void pushTuple(const int eid, const int slot, Tuple* tuple); // 5 void incrementFinished(const int eid, const int slot); // 5 void setNumberOfSlots(const int eid, const int slots); // 5 int getNumberOfSlots(const int eid); // 5 void sendTuple(const int eid, const int slot, const int workerNumber, Tuple* tuple); void broadcastFinishMessage(const int eid); bool startTcpListener(const int port); // in StartTESServer bool startClient(const int slot, const RemoteEndpoint host); bool startLoopbackProxy(const int slot); bool tcpListenerRunning(); // in StartTESServer void stopServers(); void stopClients(); void reset(); private: static MessageBroker broker; std::shared_ptr tcpListener = nullptr; std::shared_ptr globalSocket = nullptr; std::list > servers = std::list< std::shared_ptr >(); using mt = std::map >; mt workerToClient = mt(); template void clear(std::map*>>& inbox); void removeMessageServer(const int eid,const int slot); // 5 bool allfinished(const int eid, const int slot); // 5 std::map> servers2mtx; // 5 std::map>> servers2; // 5 std::map::iterator>> serversIterator; // 5 std::map> readycond; // 5 std::map* > > loopInboxes; // 5 std::map> finished; // 5 std::map slotCountMap; // 5 inline void initializeInboxes(const int eid, const int slots); void acceptConnections(int port); }; } #endif //SECONDO_MESSAGEBROKER_H