Files
secondo/Algebras/DBService2/DBServiceClient.cpp

485 lines
15 KiB
C++
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
/*
1.1.1 Class Implementation
----
This file is part of SECONDO.
Copyright (C) 2017,
Faculty of Mathematics and Computer Science,
Database Systems for New Applications.
SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
----
*/
#include <cstdlib>
#include <boost/make_shared.hpp>
#include "SecondoException.h"
#include "Algebras/DBService2/CommunicationClient.hpp"
#include "Algebras/DBService2/DBServiceClient.hpp"
#include "Algebras/DBService2/DebugOutput.hpp"
#include "Algebras/DBService2/ReplicationClient.hpp"
#include "Algebras/DBService2/ReplicationServer.hpp"
#include "Algebras/DBService2/ReplicationUtils.hpp"
#include "Algebras/DBService2/Replicator.hpp"
#include "Algebras/DBService2/SecondoUtilsLocal.hpp"
#include "Algebras/DBService2/ServerRunnable.hpp"
#include <loguru.hpp>
using namespace std;
namespace DBService {
// Constructs the client.
//
// Resolves the DBService host and port through
// SecondoUtilsLocal::lookupDBServiceLocation, reports them on std::cout and
// in the log, and then starts the replication server.
//
// Throws a SecondoException (by value) when the lookup fails; host and port
// are reset to empty strings before the exception is raised.
DBServiceClient::DBServiceClient()
{
    printFunction("DBServiceClient::DBServiceClient", std::cout);
    LOG_SCOPE_FUNCTION(INFO);

    if(!SecondoUtilsLocal::lookupDBServiceLocation(
            dbServiceHost,
            dbServicePort))
    {
        dbServiceHost = "";
        dbServicePort = "";
        // Throw by value rather than `throw new ...`: a thrown pointer is
        // never deleted by a `catch(...)` handler (as used in getInstance),
        // so the heap-allocated exception object would leak.
        throw SecondoException("Unable to connect to DBService");
    }

    print("dbServiceHost", dbServiceHost, std::cout);
    print("dbServicePort", dbServicePort, std::cout);
    LOG_F(INFO, "DBService (Host: %s, Port: %s)",
        dbServiceHost.c_str(), dbServicePort.c_str());

    startReplicationServer();
}
void DBServiceClient::startReplicationServer()
{
printFunction("DBServiceClient::startReplicationServer", std::cout);
LOG_SCOPE_FUNCTION(INFO);
string fileTransferPort;
SecondoUtilsLocal::readFromConfigFile(fileTransferPort,
"DBService",
"FileTransferPort",
"");
print(fileTransferPort, std::cout);
LOG_F(INFO, "FileTransferPort: %s", fileTransferPort.c_str());
ServerRunnable replicationServer(atoi(fileTransferPort.c_str()));
replicationServer.run<ReplicationServer>();
}
// Returns the process-wide DBServiceClient, creating it on first use.
//
// If constructing the client throws, the exception is swallowed and a null
// pointer is returned; a later call will attempt the construction again
// because _instance stays null.
DBServiceClient* DBServiceClient::getInstance()
{
    printFunction("DBServiceClient::getInstance", std::cout);
    LOG_SCOPE_FUNCTION(INFO);

    // Fast path: the instance has already been created.
    if (_instance)
    {
        return _instance;
    }

    try
    {
        _instance = new DBServiceClient();
    }
    catch(...)
    {
        // Construction failed; make sure callers see "no instance".
        _instance = nullptr;
    }
    return _instance;
}
bool DBServiceClient::triggerReplication(const std::string& databaseName,
const std::string& relationName,
const ListExpr relType,
const bool async)
{
//LOG_SCOPE_FUNCTION(INFO);
printFunction("DBServiceClient::triggerReplication", std::cout);
print("databaseName", databaseName, std::cout);
print("relationName", relationName, std::cout);
print(relType, std::cout);
LOG_F(INFO, "Database: %s, Relation: %s",
databaseName.c_str(), relationName.c_str());
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
if(dbServiceMasterClient.triggerReplication(
databaseName,
relationName))
{
Replicator replicator(
*(const_cast<string*>(&databaseName)),
*(const_cast<string*>(&relationName)),
relType);
replicator.run(async);
}
return true;
}
bool DBServiceClient::triggerDerivation(const std::string& databaseName,
const std::string& targetName,
const std::string& relationName,
const std::string& fundef)
{
printFunction("DBServiceClient::triggerDerivation", std::cout);
LOG_SCOPE_FUNCTION(INFO);
print("databaseName", databaseName, std::cout);
print("targetName", targetName, std::cout);
print("relationName", relationName, std::cout);
print("fundef", fundef, std::cout);
LOG_F(INFO, "Database: %s, Target: %s, Relation: %s, Fundef: %s",
databaseName.c_str(), targetName.c_str(), relationName.c_str(),
fundef.c_str());
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
dbServiceMasterClient.triggerDerivation(
databaseName,
targetName,
relationName,
fundef);
return true;
}
bool DBServiceClient::getReplicaLocation(
const string& databaseName,
const string& relationName,
const std::vector<std::string>& otherObjects,
string& host,
string& transferPort,
string& commPort)
{
LOG_SCOPE_FUNCTION(INFO);
LOG_F(INFO, "databaseName: %s", databaseName.c_str());
LOG_F(INFO, "relationName: %s", relationName.c_str());
printFunction("DBServiceClient::getReplicaLocation", std::cout);
print("databaseName", databaseName, std::cout);
print("relationName", relationName, std::cout);
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
return dbServiceMasterClient.getReplicaLocation(databaseName,
relationName,
otherObjects,
host,
transferPort,
commPort);
}
// Fetches a replica from a DBService worker and returns the file name
// reported by the transfer.
//
// The replica location is obtained with getReplicaLocation. If no replica is
// available, an empty path is returned. Otherwise a ReplicationClient is
// pointed at the reported host and transfer port and requestReplica is
// called with functionAsNestedListString and otherObjects.
fs::path DBServiceClient::retrieveReplicaAndGetFileName(
        const string& databaseName,
        const string& relationName,
        const vector<string>& otherObjects,
        const string& functionAsNestedListString)
{
    LOG_SCOPE_FUNCTION(INFO);
    LOG_F(INFO, "databaseName: %s", databaseName.c_str());
    LOG_F(INFO, "relationName: %s", relationName.c_str());
    LOG_F(INFO, "function: %s", functionAsNestedListString.c_str());

    printFunction("DBServiceClient::retrieveReplicaAndGetFileName", std::cout);
    print("databaseName", databaseName, std::cout);
    print("relationName", relationName, std::cout);
    print("other objects", otherObjects, std::cout);
    print("function", functionAsNestedListString, std::cout);

    string host;
    string transferPort;
    string unusedCommPort;  // required by getReplicaLocation, not used here

    const bool replicaFound = getReplicaLocation(
        databaseName, relationName, otherObjects,
        host, transferPort, unusedCommPort);
    if(!replicaFound)
    {
        print("No replica available", std::cout);
        LOG_F(WARNING, "No replica available.");
        return string("");
    }

    print("host", host, std::cout);
    print("transferPort", transferPort, std::cout);
    LOG_F(INFO, "Host: %s, TransferPort: %s",
        host.c_str(), transferPort.c_str());

    fs::path localTarget = ReplicationUtils::expandFilenameToAbsPath(
        ReplicationUtils::getFileName(databaseName, relationName));
    string remoteFileName = ReplicationUtils::getFileNameOnDBServiceWorker(
        databaseName, relationName);

    // Use case: transfer from a DBService worker to an O-node. The
    // DBServiceClient is used here to obtain the name of the file that the
    // requesting node (e.g. an O-worker) retrieves.
    ReplicationClient workerClient(
        host,
        atoi(transferPort.c_str()),
        localTarget,     /* local  */
        remoteFileName,  /* remote */
        const_cast<string&>(databaseName),
        const_cast<string&>(relationName));

    // requestReplica fills in the path passed to it and operates on the
    // remote file given to the ReplicationClient constructor.
    fs::path fileName;
    workerClient.requestReplica(
        functionAsNestedListString, fileName, otherObjects);

    LOG_F(INFO, "retrieveReplicaAndGetFileName: %s", fileName.string().c_str());
    return fileName;
}
bool DBServiceClient::deleteReplicas(const string& databaseName,
const string& relationName,
const string& derivateName)
{
printFunction("DBServiceClient::deleteReplicas", std::cout);
LOG_SCOPE_FUNCTION(INFO);
try{
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
return dbServiceMasterClient.requestReplicaDeletion(
databaseName,
relationName,
derivateName);
} catch(...) {
return false;
}
}
bool DBServiceClient::pingDBService()
{
printFunction("DBServiceClient::pingDBService", std::cout);
LOG_SCOPE_FUNCTION(INFO);
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
return dbServiceMasterClient.pingDBService();
}
// Retrieves the type of a replicated relation as a nested list string.
//
// The DBService master is asked for the replica location first. If it
// reports none, a warning is logged and false is returned. Otherwise a
// second CommunicationClient connects to the reported worker host and
// communication port, and the result of its getRelType call for the relation
// identifier is returned; nestedListAsString is the output parameter of that
// call.
bool DBServiceClient::getStreamType(
        const string& databaseName,
        const string& relationName,
        string& nestedListAsString)
{
    // The trace label names this function; it previously read
    // "DBServiceClient::getRelType", which did not match and made traces
    // misleading.
    printFunction("DBServiceClient::getStreamType", std::cout);
    LOG_SCOPE_FUNCTION(INFO);

    CommunicationClient dbServiceMasterClient(dbServiceHost,
                                              atoi(dbServicePort.c_str()),
                                              0);

    string dbServiceWorkerHost;
    string dummyTransferPort;   // filled by the lookup, not needed here
    string commPort;
    vector<string> dummy;       // for extracting the type, we can ignore
                                // other objects

    if(!dbServiceMasterClient.getReplicaLocation(
            databaseName,
            relationName,
            dummy,
            dbServiceWorkerHost,
            dummyTransferPort,
            commPort))
    {
        print("The relation does not exist in DBService", std::cout);
        LOG_F(WARNING, "The relation (%s, %s) does not exist in DBService.",
            databaseName.c_str(), relationName.c_str());
        return false;
    }

    CommunicationClient dbServiceWorkerClient(dbServiceWorkerHost,
                                              atoi(commPort.c_str()),
                                              0);
    return dbServiceWorkerClient.getRelType(
        RelationInfo::getIdentifier(databaseName, relationName),
        nestedListAsString);
}
bool DBServiceClient::getDerivedType(
const string& databaseName,
const string& relName,
const string& derivedName,
string& nestedListAsString)
{
printFunction("DBServiceClient::getDerivedType", std::cout);
LOG_SCOPE_FUNCTION(INFO);
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
string dbServiceWorkerHost;
string dummyTransferPort;
string commPort;
vector<string> derived;
derived.push_back(derivedName);
if(!dbServiceMasterClient.getReplicaLocation(
databaseName,
relName,
derived,
dbServiceWorkerHost,
dummyTransferPort,
commPort))
{
print("Relation and/or Derived Object does not exist in DBService",
std::cout);
LOG_F(WARNING, "Relation and/or Derived Object (%s, %s) does "
"not exist in DBService",
databaseName.c_str(), relName.c_str());
return false;
}
CommunicationClient dbServiceWorkerClient(dbServiceWorkerHost,
atoi(commPort.c_str()),
0);
return dbServiceWorkerClient.getDerivedType(
RelationInfo::getIdentifier(databaseName, relName),
derivedName,
nestedListAsString);
}
bool DBServiceClient::relationExists(
const std::string& databaseName,
const std::string& relationName)
{
printFunction("DBServiceClient::relationExists", std::cout);
LOG_SCOPE_FUNCTION(INFO);
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
string dbServiceWorkerHost;
string dummyTransferPort;
string commPort;
vector<string> dummy;
if(!dbServiceMasterClient.getReplicaLocation(
databaseName,
relationName,
dummy,
dbServiceWorkerHost,
dummyTransferPort,
commPort))
{
print("The Relation does not exist in DBService", std::cout);
LOG_F(WARNING, "The Relation (%s, %s) does not exist in DBService",
databaseName.c_str(), relationName.c_str());
return false;
}
return true;
}
bool DBServiceClient::allExists(
const std::string& databaseName,
const std::string& relationName,
const std::vector<std::string>& derivates){
printFunction(__PRETTY_FUNCTION__, std::cout);
LOG_SCOPE_FUNCTION(INFO);
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
string dbServiceWorkerHost;
string dummyTransferPort;
string commPort;
if(!dbServiceMasterClient.getReplicaLocation(
databaseName,
relationName,
derivates,
dbServiceWorkerHost,
dummyTransferPort,
commPort))
{
print("Relation or a derivate does not exist in "
"DBService at the same location", std::cout);
LOG_F(WARNING, "Relation or a derivate (%s, %s) does not exist in "
"DBService at the same location",
databaseName.c_str(), relationName.c_str());
return false;
}
return true;
}
bool DBServiceClient::addNode(
const std::string& nodeHost,
const int& nodePort,
const std::string& pathToNodeConfig) {
LOG_SCOPE_FUNCTION(INFO);
LOG_F(INFO, "nodeHost: %s", nodeHost.c_str());
LOG_F(INFO, "nodePort: %d", nodePort);
LOG_F(INFO, "pathToNodeConfig: %s", pathToNodeConfig.c_str());
// Connect to the DBService Master to submit the worker nodes'
// details.
CommunicationClient dbServiceMasterClient(dbServiceHost,
atoi(dbServicePort.c_str()),
0);
bool success = dbServiceMasterClient.addNode(
nodeHost, nodePort, pathToNodeConfig);
if (!success) {
LOG_F(ERROR, "%s", "Couldn't addNode.");
return false;
}
return true;
}
// Definition of the static instance pointer handed out by getInstance();
// it stays null until a DBServiceClient has been constructed successfully.
DBServiceClient* DBServiceClient::_instance = nullptr;
} /* namespace DBService */