/* 1.1.1 Class Implementation ---- This file is part of SECONDO. Copyright (C) 2017, Faculty of Mathematics and Computer Science, Database Systems for New Applications. SECONDO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. SECONDO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SECONDO; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ---- */ #include #include #include "SecondoException.h" #include "Algebras/DBService2/CommunicationClient.hpp" #include "Algebras/DBService2/DBServiceClient.hpp" #include "Algebras/DBService2/DebugOutput.hpp" #include "Algebras/DBService2/ReplicationClient.hpp" #include "Algebras/DBService2/ReplicationServer.hpp" #include "Algebras/DBService2/ReplicationUtils.hpp" #include "Algebras/DBService2/Replicator.hpp" #include "Algebras/DBService2/SecondoUtilsLocal.hpp" #include "Algebras/DBService2/ServerRunnable.hpp" #include using namespace std; namespace DBService { DBServiceClient::DBServiceClient() { printFunction("DBServiceClient::DBServiceClient", std::cout); LOG_SCOPE_FUNCTION(INFO); if(!SecondoUtilsLocal::lookupDBServiceLocation( dbServiceHost, dbServicePort)) { dbServiceHost = ""; dbServicePort = ""; throw new SecondoException("Unable to connect to DBService"); } print("dbServiceHost", dbServiceHost, std::cout); print("dbServicePort", dbServicePort, std::cout); LOG_F(INFO, "DBService (Host: %s, Port: %s)", dbServiceHost.c_str(), dbServicePort.c_str()); startReplicationServer(); } void DBServiceClient::startReplicationServer() { printFunction("DBServiceClient::startReplicationServer", std::cout); LOG_SCOPE_FUNCTION(INFO); string fileTransferPort; SecondoUtilsLocal::readFromConfigFile(fileTransferPort, "DBService", "FileTransferPort", ""); print(fileTransferPort, std::cout); LOG_F(INFO, "FileTransferPort: %s", fileTransferPort.c_str()); ServerRunnable replicationServer(atoi(fileTransferPort.c_str())); replicationServer.run(); } DBServiceClient* DBServiceClient::getInstance() { printFunction("DBServiceClient::getInstance", std::cout); LOG_SCOPE_FUNCTION(INFO); if (!_instance) { try{ _instance = new DBServiceClient(); } catch(...){ _instance = 0; } } return _instance; } bool DBServiceClient::triggerReplication(const std::string& databaseName, const std::string& relationName, const ListExpr relType, const bool async) { //LOG_SCOPE_FUNCTION(INFO); printFunction("DBServiceClient::triggerReplication", std::cout); print("databaseName", databaseName, std::cout); print("relationName", relationName, std::cout); print(relType, std::cout); LOG_F(INFO, "Database: %s, Relation: %s", databaseName.c_str(), relationName.c_str()); CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); if(dbServiceMasterClient.triggerReplication( databaseName, relationName)) { Replicator replicator( *(const_cast(&databaseName)), *(const_cast(&relationName)), relType); replicator.run(async); } return true; } bool DBServiceClient::triggerDerivation(const std::string& databaseName, const std::string& targetName, const std::string& relationName, const std::string& fundef) { printFunction("DBServiceClient::triggerDerivation", std::cout); LOG_SCOPE_FUNCTION(INFO); print("databaseName", databaseName, std::cout); print("targetName", targetName, std::cout); print("relationName", relationName, std::cout); print("fundef", fundef, std::cout); LOG_F(INFO, "Database: %s, Target: %s, Relation: %s, Fundef: %s", databaseName.c_str(), targetName.c_str(), relationName.c_str(), fundef.c_str()); CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); dbServiceMasterClient.triggerDerivation( databaseName, targetName, relationName, fundef); return true; } bool DBServiceClient::getReplicaLocation( const string& databaseName, const string& relationName, const std::vector& otherObjects, string& host, string& transferPort, string& commPort) { LOG_SCOPE_FUNCTION(INFO); LOG_F(INFO, "databaseName: %s", databaseName.c_str()); LOG_F(INFO, "relationName: %s", relationName.c_str()); printFunction("DBServiceClient::getReplicaLocation", std::cout); print("databaseName", databaseName, std::cout); print("relationName", relationName, std::cout); CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); return dbServiceMasterClient.getReplicaLocation(databaseName, relationName, otherObjects, host, transferPort, commPort); } fs::path DBServiceClient::retrieveReplicaAndGetFileName( const string& databaseName, const string& relationName, const vector& otherObjects, const string& functionAsNestedListString) { LOG_SCOPE_FUNCTION(INFO); LOG_F(INFO, "databaseName: %s", databaseName.c_str()); LOG_F(INFO, "relationName: %s", relationName.c_str()); LOG_F(INFO, "function: %s", functionAsNestedListString.c_str()); printFunction("DBServiceClient::retrieveReplicaAndGetFileName", std::cout); print("databaseName", databaseName, std::cout); print("relationName", relationName, std::cout); print("other objects", otherObjects, std::cout); print("function", functionAsNestedListString, std::cout); string host; string transferPort; string dummyCommPort; if(!getReplicaLocation( databaseName, relationName, otherObjects, host, transferPort, dummyCommPort)) { print("No replica available", std::cout); LOG_F(WARNING, "No replica available."); return string(""); } print("host", host, std::cout); print("transferPort", transferPort, std::cout); LOG_F(INFO, "Host: %s, TransferPort: %s", host.c_str(), transferPort.c_str()); fs::path localPathOnClient = ReplicationUtils::expandFilenameToAbsPath( ReplicationUtils::getFileName(databaseName,relationName)); string filenameOnDBSWorker = ReplicationUtils::getFileNameOnDBServiceWorker(databaseName, relationName); /* Use case: Transfer from DBS-W to a O-Node. Here the DBServiceClient is used to get the fileName of the file to be retrieved by the requesting node. (e.g. an O-W) */ ReplicationClient clientToDBServiceWorker( host, atoi(transferPort.c_str()), localPathOnClient, /*local*/ filenameOnDBSWorker, /*remote*/ *(const_cast(&databaseName)), *(const_cast(&relationName))); // requestReplica sets the fileName passed into it. // requestReplica acts upon the remoteFile (2nd argument of the constructor) fs::path fileName; clientToDBServiceWorker.requestReplica( functionAsNestedListString, fileName, otherObjects); LOG_F(INFO, "retrieveReplicaAndGetFileName: %s", fileName.string().c_str()); return fileName; } bool DBServiceClient::deleteReplicas(const string& databaseName, const string& relationName, const string& derivateName) { printFunction("DBServiceClient::deleteReplicas", std::cout); LOG_SCOPE_FUNCTION(INFO); try{ CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); return dbServiceMasterClient.requestReplicaDeletion( databaseName, relationName, derivateName); } catch(...) { return false; } } bool DBServiceClient::pingDBService() { printFunction("DBServiceClient::pingDBService", std::cout); LOG_SCOPE_FUNCTION(INFO); CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); return dbServiceMasterClient.pingDBService(); } bool DBServiceClient::getStreamType( const string& databaseName, const string& relationName, string& nestedListAsString) { printFunction("DBServiceClient::getRelType", std::cout); LOG_SCOPE_FUNCTION(INFO); CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); string dbServiceWorkerHost; string dummyTransferPort; string commPort; vector dummy; // for extracting the type, we can ignore // other objects if(!dbServiceMasterClient.getReplicaLocation( databaseName, relationName, dummy, dbServiceWorkerHost, dummyTransferPort, commPort)) { print("The relation does not exist in DBService", std::cout); LOG_F(WARNING, "The relation (%s, %s) does not exist in DBService.", databaseName.c_str(), relationName.c_str()); return false; } CommunicationClient dbServiceWorkerClient(dbServiceWorkerHost, atoi(commPort.c_str()), 0); return dbServiceWorkerClient.getRelType( RelationInfo::getIdentifier(databaseName, relationName), nestedListAsString); } bool DBServiceClient::getDerivedType( const string& databaseName, const string& relName, const string& derivedName, string& nestedListAsString) { printFunction("DBServiceClient::getDerivedType", std::cout); LOG_SCOPE_FUNCTION(INFO); CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); string dbServiceWorkerHost; string dummyTransferPort; string commPort; vector derived; derived.push_back(derivedName); if(!dbServiceMasterClient.getReplicaLocation( databaseName, relName, derived, dbServiceWorkerHost, dummyTransferPort, commPort)) { print("Relation and/or Derived Object does not exist in DBService", std::cout); LOG_F(WARNING, "Relation and/or Derived Object (%s, %s) does " "not exist in DBService", databaseName.c_str(), relName.c_str()); return false; } CommunicationClient dbServiceWorkerClient(dbServiceWorkerHost, atoi(commPort.c_str()), 0); return dbServiceWorkerClient.getDerivedType( RelationInfo::getIdentifier(databaseName, relName), derivedName, nestedListAsString); } bool DBServiceClient::relationExists( const std::string& databaseName, const std::string& relationName) { printFunction("DBServiceClient::relationExists", std::cout); LOG_SCOPE_FUNCTION(INFO); CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); string dbServiceWorkerHost; string dummyTransferPort; string commPort; vector dummy; if(!dbServiceMasterClient.getReplicaLocation( databaseName, relationName, dummy, dbServiceWorkerHost, dummyTransferPort, commPort)) { print("The Relation does not exist in DBService", std::cout); LOG_F(WARNING, "The Relation (%s, %s) does not exist in DBService", databaseName.c_str(), relationName.c_str()); return false; } return true; } bool DBServiceClient::allExists( const std::string& databaseName, const std::string& relationName, const std::vector& derivates){ printFunction(__PRETTY_FUNCTION__, std::cout); LOG_SCOPE_FUNCTION(INFO); CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); string dbServiceWorkerHost; string dummyTransferPort; string commPort; if(!dbServiceMasterClient.getReplicaLocation( databaseName, relationName, derivates, dbServiceWorkerHost, dummyTransferPort, commPort)) { print("Relation or a derivate does not exist in " "DBService at the same location", std::cout); LOG_F(WARNING, "Relation or a derivate (%s, %s) does not exist in " "DBService at the same location", databaseName.c_str(), relationName.c_str()); return false; } return true; } bool DBServiceClient::addNode( const std::string& nodeHost, const int& nodePort, const std::string& pathToNodeConfig) { LOG_SCOPE_FUNCTION(INFO); LOG_F(INFO, "nodeHost: %s", nodeHost.c_str()); LOG_F(INFO, "nodePort: %d", nodePort); LOG_F(INFO, "pathToNodeConfig: %s", pathToNodeConfig.c_str()); // Connect to the DBService Master to submit the worker nodes' // details. CommunicationClient dbServiceMasterClient(dbServiceHost, atoi(dbServicePort.c_str()), 0); bool success = dbServiceMasterClient.addNode( nodeHost, nodePort, pathToNodeConfig); if (!success) { LOG_F(ERROR, "%s", "Couldn't addNode."); return false; } return true; } DBServiceClient* DBServiceClient::_instance = nullptr; } /* namespace DBService */