Files
secondo/Algebras/DBService2/CommunicationServer.cpp

1070 lines
35 KiB
C++
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
/*
1.1.1 Class Definition
----
This file is part of SECONDO.
Copyright (C) 2017,
Faculty of Mathematics and Computer Science,
Database Systems for New Applications.
SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
----
*/
#include "Algebra.h"
#include "FileSystem.h"
#include "StringUtils.h"
#include "SecondoCatalog.h"
#include "SecondoSystem.h"
#include "Algebras/Distributed2/FileRelations.h"
#include "Algebras/DBService2/CommunicationProtocol.hpp"
#include "Algebras/DBService2/CommunicationServer.hpp"
#include "Algebras/DBService2/CommunicationUtils.hpp"
#include "Algebras/DBService2/DBServiceManager.hpp"
#include "Algebras/DBService2/ReplicationClientRunnable.hpp"
#include "Algebras/DBService2/ReplicationUtils.hpp"
#include "Algebras/DBService2/SecondoUtilsLocal.hpp"
#include "Algebras/DBService2/TriggerFileTransferRunnable.hpp"
#include "Algebras/DBService2/TriggerReplicaDeletionRunnable.hpp"
#include "Algebras/DBService2/DerivationClient.hpp"
#include "Algebras/DBService2/CreateDerivateRunnable.hpp"
#include "boost/filesystem.hpp"
#include <loguru.hpp>
#include <algorithm>
#include <random>
#include <sstream>
using namespace distributed2;
using namespace std;
namespace fs = boost::filesystem;
extern boost::recursive_mutex nlparsemtx;
namespace DBService {
CommunicationServer::CommunicationServer(int port) :
MultiClientServer(port)
{
LOG_SCOPE_FUNCTION(INFO);
LOG_F(INFO, "CommunicationServer port: %d", port);
string context("CommunicationServer");
traceWriter= unique_ptr<TraceWriter>
(new TraceWriter(context, port, std::cout));
traceWriter->writeFunction("CommunicationServer::CommunicationServer");
traceWriter->write("Initializing CommunicationServer");
traceWriter->write("port", port);
lookupMinimumReplicaCount();
}
CommunicationServer::~CommunicationServer()
{
LOG_SCOPE_FUNCTION(INFO);
traceWriter->writeFunction("CommunicationServer::~CommunicationServer");
}
int CommunicationServer::start()
{
LOG_SCOPE_FUNCTION(INFO);
traceWriter->writeFunction("CommunicationServer::start");
return MultiClientServer::start();
}
void CommunicationServer::lookupMinimumReplicaCount()
{
LOG_SCOPE_FUNCTION(INFO);
traceWriter->writeFunction(
"CommunicationServer::lookupMinimumReplicaCount");
string replicaNumber;
SecondoUtilsLocal::readFromConfigFile(replicaNumber,
"DBService",
"ReplicaNumber",
"");
minimumReplicaCount = atoi(replicaNumber.c_str());
}
int CommunicationServer::communicate(iostream& io)
{
const boost::thread::id tid = boost::this_thread::get_id();
traceWriter->writeFunction(tid, "CommunicationServer::communicate");
LOG_SCOPE_FUNCTION(INFO);
try
{
traceWriter->write(tid, "Communicating...");
CommunicationUtils::sendLine(io,
CommunicationProtocol::CommunicationServer());
if (!CommunicationUtils::receivedExpectedLine(io,
CommunicationProtocol::CommunicationClient()))
{
traceWriter->write(tid,
"Protocol error: Not connected to CommunicationClient");
LOG_F(ERROR,
"Protocol error: Not connected to CommunicationClient");
return 1;
}
string request;
CommunicationUtils::receiveLine(io, request);
traceWriter->write(tid, "request", request);
LOG_F(INFO, "Received request: %s", request.c_str());
if(request ==
CommunicationProtocol::TriggerReplication())
{
handleTriggerReplicationRequest(io, tid);
}else if(request ==
CommunicationProtocol::StartingSignal())
{
handleStartingSignalRequest(io, tid);
}else if(request ==
CommunicationProtocol::TriggerFileTransfer())
{
handleTriggerFileTransferRequest(io, tid);
}else if(request ==
CommunicationProtocol::ReplicaLocationRequest())
{
handleProvideReplicaLocationRequest(io, tid);
}else if(request ==
CommunicationProtocol::ReplicationSuccessful())
{
reportSuccessfulReplication(io, tid);
}else if(request ==
CommunicationProtocol::DeleteReplicaRequest())
{
handleRequestReplicaDeletion(io, tid);
}else if(request ==
CommunicationProtocol::TriggerReplicaDeletion())
{
handleTriggerReplicaDeletion(io, tid);
}else if(request ==
CommunicationProtocol::Ping())
{
handlePing(io, tid);
}else if(request ==
CommunicationProtocol::RelTypeRequest())
{
handleRelTypeRequest(io, tid);
}else if(request ==
CommunicationProtocol::DerivedTypeRequest())
{
handleDerivedTypeRequest(io, tid);
}else if(request ==
CommunicationProtocol::TriggerDerivation())
{
handleTriggerDerivation(io,tid);
}else if(request ==
CommunicationProtocol::CreateDerivation())
{
handleCreateDerivation(io,tid);
}else if(request ==
CommunicationProtocol::CreateDerivateSuccessful()){
reportSuccessfulDerivation(io,tid);
} else if (request == CommunicationProtocol::AddNodeRequest()) {
handleAddNodeRequest(io,tid);
} else
{
traceWriter->write(
tid, "Protocol error: invalid request: ", request);
LOG_F(ERROR, "Protocol error: invalid request: %s",
request.c_str());
return 1;
}
} catch (char const* exception) {
traceWriter->write(tid, "CommunicationServer: communication char "
"const* exception:");
traceWriter->write(tid, exception);
// LOG_F(ERROR, "CommunicationServer: communication char "
// "const* exception: %s", exception);
return 2;
} catch (std::string const* exception) {
traceWriter->write(tid, "CommunicationServer: communication string \
const* exception:");
traceWriter->write(tid, exception->c_str());
// LOG_F(ERROR, "CommunicationServer: communication string "
// "exception: %s", exception->c_str());
return 2;
} catch (std::string const exception) {
traceWriter->write(tid, "CommunicationServer: communication string \
const exception:");
traceWriter->write(tid, exception.c_str());
LOG_F(ERROR, "CommunicationServer: communication const string "
"exception: %s", exception.c_str());
return 2;
}
return 0;
}
bool CommunicationServer::handleTriggerReplicationRequest(
std::iostream& io, const boost::thread::id tid)
{
bool success = false;
// The original system (e.g. the master) triggers the actual replication
traceWriter->writeFunction(tid,
"CommunicationServer::handleTriggerReplicationRequest");
LOG_SCOPE_FUNCTION(INFO);
CommunicationUtils::sendLine(io,
CommunicationProtocol::RelationRequest());
queue<string> receiveBuffer;
CommunicationUtils::receiveLines(io, 2, receiveBuffer);
string databaseName = receiveBuffer.front();
receiveBuffer.pop();
string relationName = receiveBuffer.front();
receiveBuffer.pop();
traceWriter->write(tid, "databaseName", databaseName);
traceWriter->write(tid, "relationName", relationName);
LOG_F(INFO, "databaseName: %s", databaseName.c_str());
LOG_F(INFO, "relationName: %s", relationName.c_str());
DBServiceManager* dbService = DBServiceManager::getInstance();
traceWriter->write(tid, "Got a DBServiceManager instance.");
LOG_F(INFO, "Got a DBServiceManager instance.");
// Check if the DBService already has a replica of the given database &
// relation combination ...
if(dbService->replicaExists(databaseName, relationName))
{
/*
Updates are not supported.
If the DBService already has a replica the replication is cancelled.
*/
traceWriter->write(tid,
"The relation exists and has replicas. Aborting.");
LOG_F(ERROR, "The relation exists and has replicas."
"This is currently not supported. Aborting");
CommunicationUtils::sendLine(io,
CommunicationProtocol::ReplicaExists());
return false;
}
traceWriter->write(tid, "The relation doesn't exist. Initiating \
replication. Requesting a target node...");
LOG_F(INFO, "The relation doesn't exist. Initiating "
"replication. Requesting a target node...");
CommunicationUtils::sendLine(io,
CommunicationProtocol::LocationRequest());
traceWriter->write(tid, "Location request issued.");
LOG_F(INFO, "Target node requested.");
CommunicationUtils::receiveLines(io, 4, receiveBuffer);
string host = receiveBuffer.front();
receiveBuffer.pop();
string port = receiveBuffer.front();
receiveBuffer.pop();
string disk = receiveBuffer.front();
receiveBuffer.pop();
string transferPort = receiveBuffer.front();
receiveBuffer.pop();
traceWriter->write(tid, "Received the following target node:");
traceWriter->write(tid, "host", host);
traceWriter->write(tid, "port", port);
traceWriter->write(tid, "disk", disk);
traceWriter->write(tid, "transferPort", transferPort);
LOG_F(INFO, "Received the following target node: "
"Host: %s, Port: %s, Disk: %s, TransferPort: %s",
host.c_str(), port.c_str(), disk.c_str(), transferPort.c_str());
// Given the database and relation which needs to replicated, it now needs
// to be determined
// whereto the relation can be replicated. This can be a single or
// mulitple worker nodes
// as replication targets.
traceWriter->write(tid, "Determinig replica locations...");
LOG_F(INFO, "Determinig replica locations...");
/*
TODO Rename determineReplicaLocations as it also creates the relation,
original node, do the replica placement and stores all records if
a placement can be done.
*/
success = dbService->determineReplicaLocations(databaseName,
relationName,
host,
port,
disk);
traceWriter->write(tid, "Done determinig replica locations...");
LOG_F(INFO, "Done determinig replica locations...");
if (success) {
traceWriter->write(tid, "Replica placement successful.");
LOG_F(INFO, "Replica placement successful.");
CommunicationUtils::sendLine(io,
CommunicationProtocol::ReplicationTriggered());
} else {
traceWriter->write(tid, "Replica placement failed.");
traceWriter->write(tid, dbService->getMessages().c_str());
LOG_F(ERROR, "The replica placement has failed. "
"Placement strategy message is: %s",
dbService->getMessages().c_str());
CommunicationUtils::sendLine(io,
CommunicationProtocol::ReplicationCanceled());
}
return true;
}
bool CommunicationServer::handleStartingSignalRequest(
std::iostream& io, const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleStartingSignalRequest");
LOG_SCOPE_FUNCTION(INFO);
CommunicationUtils::sendLine(io,
CommunicationProtocol::RelationRequest());
// relID = {DATABASE}xDBSx{RELATION_NAME}
string relID;
CommunicationUtils::receiveLine(io, relID);
traceWriter->write(tid, "relID", relID);
LOG_F(INFO, "relID: %s", relID.c_str());
string relationDatabase;
string relationName;
//TODO Refactor -> Eliminate dependency to RelationInfo
RelationInfo::parseIdentifier(relID, relationDatabase, relationName);
//TODO use shared_ptr
DBServiceManager* dbService = DBServiceManager::getInstance();
shared_ptr<DBService::Relation> relation = dbService->getRelation(
relationDatabase, relationName);
if (relation == nullptr) {
stringstream msg;
msg << "Couldn't find the relation (relationDatabase: ";
msg << relationDatabase << ", relationName: " << relationName;
traceWriter->write(msg.str());
LOG_F(WARNING, "%s", msg.str().c_str());
return false;
}
/* JF:
Locations have been pretermined as during this step it was also possible
to reject the replication, e.g. due to missing nodes.
In this phase it is about starting the file transfers, one for each
selected replica locations.
*/
shared_ptr<DBService::Node> originalNode = relation->getOriginalNode();
traceWriter->write("Triggering file transfers...");
LOG_F(INFO, "Triggering file transfers...");
shared_ptr<DBService::Node> replicaTargetNode;
for( auto& replica : relation->getReplicas()) {
replicaTargetNode = replica->getTargetNode();
TriggerFileTransferRunnable clientToDBServiceWorker(
originalNode->getHost().getHostname(),
originalNode->getTransferPort(),
replicaTargetNode->getHost().getHostname(), // DBService Worker
replicaTargetNode->getComPort(),
relation->getRelationDatabase(),
relation->getName());
clientToDBServiceWorker.run();
}
return true;
}
/*
The CommunicationClient triggering this is located on the DBService Master.
This CommunicationServer is running on a DBService Worker Node.
It will trigger a ReplicationClientRunnable which in turn creates a
ReplicationClient which then will retrieve the Relation from the original
Node.
*/
bool CommunicationServer::handleTriggerFileTransferRequest(
std::iostream& io, const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleTriggerFileTransferRequest");
LOG_SCOPE_FUNCTION(INFO);
CommunicationUtils::sendLine(io,
CommunicationProtocol::ReplicationDetailsRequest());
traceWriter->write(tid, "Sent ReplicationDetailsRequest");
LOG_F(INFO, "Sent the ReplicationDetailsRequest");
queue<string> receiveBuffer;
CommunicationUtils::receiveLines(io, 5, receiveBuffer);
string host = receiveBuffer.front();
receiveBuffer.pop();
string port = receiveBuffer.front();
receiveBuffer.pop();
string fileName = receiveBuffer.front();
receiveBuffer.pop();
string databaseName = receiveBuffer.front();
receiveBuffer.pop();
string relationName = receiveBuffer.front();
receiveBuffer.pop();
//TODO Explain what these details are about. From or to location?
traceWriter->write(tid, "received replication details");
traceWriter->write(tid, "host", host);
traceWriter->write(tid, "port", port);
// TODO remove filename, is determined automatically
traceWriter->write(tid, "fileName", fileName);
traceWriter->write(tid, "databaseName", databaseName);
traceWriter->write(tid, "relationName", relationName);
LOG_F(INFO, "Received replication details. Host: %s, Port: %s, "
"filename: %s, database: %s, relation: %s",
host.c_str(), port.c_str(), fileName.c_str(), databaseName.c_str(),
relationName.c_str());
LOG_F(INFO, "%s", "Creating ReplicationClientRunnable...");
ReplicationClientRunnable replicationClient(
host,
atoi(port.c_str()),
databaseName,
relationName);
LOG_F(INFO, "%s", "Running ReplicationClientRunnable...");
replicationClient.run();
LOG_F(INFO, "%s", "Done running ReplicationClientRunnable.");
return true;
}
bool CommunicationServer::handleProvideReplicaLocationRequest(
std::iostream& io, const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleProvideReplicaLocationRequest");
LOG_SCOPE_FUNCTION(INFO);
CommunicationUtils::sendLine(io,
CommunicationProtocol::RelationRequest());
queue<string> receiveBuffer;
CommunicationUtils::receiveLines(io, 2, receiveBuffer);
// START receving derivate information
// read number of other objects
string n;
CommunicationUtils::receiveLine(io, n);
bool correct;
int number = stringutils::str2int<int>(n,correct); //TODO: error handling
if(number<0) number = 0;
queue<string> otherObjects;
if(number>0){
CommunicationUtils::receiveLines(io,number,otherObjects);
}
// END receving derivate information
string databaseName = receiveBuffer.front();
receiveBuffer.pop();
string relationName = receiveBuffer.front();
receiveBuffer.pop();
traceWriter->write(tid, "databaseName", databaseName);
traceWriter->write(tid, "relationName", relationName);
LOG_F(INFO, "Database: %s, Relation: %s",
databaseName.c_str(), relationName.c_str());
DBServiceManager* dbService = DBServiceManager::getInstance();
/*
- Retrieve replicas of the requestion relation,
- Select a random replica
- Return its targetNode
*/
shared_ptr<DBService::Node> node = dbService->getRandomNodeWithReplica(
databaseName, relationName);
/*
CommunicationProtocol
Didn't find a node
This is a non-success case.
*/
queue<string> sendBuffer;
if(node == nullptr)
{
traceWriter->write(tid, "Didn't find a targetNode. Maybe relation "
"doesn't exist or relation has no replicas.");
traceWriter->write(tid, "My data:");
stringstream msg;
dbService->printMetadata(msg);
traceWriter->write(tid, msg.str().c_str());
LOG_F(WARNING, "Didn't find a targetNode. Maybe relation "
"doesn't exist or relation has no replicas. My data: %s",
msg.str().c_str());
sendBuffer.push(CommunicationProtocol::None());
sendBuffer.push(CommunicationProtocol::None());
sendBuffer.push(CommunicationProtocol::None());
CommunicationUtils::sendBatch(io, sendBuffer);
return false;
}
traceWriter->write(tid, "Found a target Node: ");
traceWriter->write(tid, node->str().c_str());
LOG_F(INFO, "Found target node: %s", node->str().c_str());
/*
CommunicationProtocol
Sending the location by providing:
- host
- transferPort
- comPort
This is the success case.
*/
sendBuffer.push(node->getHost().getHostname());
sendBuffer.push(to_string(node->getTransferPort()));
sendBuffer.push(to_string(node->getComPort()));
CommunicationUtils::sendBatch(io, sendBuffer);
return true;
}
bool CommunicationServer::reportSuccessfulReplication(
iostream& io,
const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::reportSuccessfulReplication");
LOG_SCOPE_FUNCTION(INFO);
CommunicationUtils::sendLine(io,
CommunicationProtocol::RelationRequest());
string relID;
CommunicationUtils::receiveLine(io, relID);
traceWriter->write(tid, "relID", relID);
LOG_F(INFO, "reldID: %s", relID.c_str());
CommunicationUtils::sendLine(io,
CommunicationProtocol::LocationRequest());
queue<string> receiveBuffer;
CommunicationUtils::receiveLines(io, 2, receiveBuffer);
string host = receiveBuffer.front();
receiveBuffer.pop();
string port = receiveBuffer.front();
receiveBuffer.pop();
traceWriter->write(tid, "host", host);
traceWriter->write(tid, "port", port);
LOG_F(INFO, "Host: %s, Port: %s", host.c_str(), port.c_str());
DBServiceManager::getInstance()->maintainSuccessfulReplication(
relID, host, port);
return true;
}
bool CommunicationServer::handleRequestReplicaDeletion(
iostream& io, const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleRequestReplicaDeletion");
LOG_SCOPE_FUNCTION(INFO);
string databaseName;
string relationName;
string derivateName;
CommunicationUtils::receiveLine(io, databaseName);
CommunicationUtils::receiveLine(io, relationName);
CommunicationUtils::receiveLine(io, derivateName);
// if the relation and all derivates should be removed,
// the derivateName will be empty
traceWriter->write("database", databaseName);
traceWriter->write("relation", relationName);
traceWriter->write("derived", derivateName);
LOG_F(INFO, "Database: %s, Relation: %s, Derivative: %s",
databaseName.c_str(), relationName.c_str(), derivateName.c_str());
DBServiceManager* dbService = DBServiceManager::getInstance();
try{
dbService->deleteReplicaMetadata(databaseName,relationName,
derivateName);
}catch(...)
{
traceWriter->write(tid, "Relation does not exist");
LOG_F(WARNING, "The relation does not exist.");
return false;
}
return true;
}
bool CommunicationServer::handleTriggerReplicaDeletion(
std::iostream& io,
const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleTriggerReplicaDeletion");
LOG_SCOPE_FUNCTION(INFO);
string databaseName;
string relationName;
string derivateName;
CommunicationUtils::receiveLine(io, databaseName);
CommunicationUtils::receiveLine(io, relationName);
CommunicationUtils::receiveLine(io, derivateName);
string victim;
bool success;
if(derivateName.empty())
{
// remove File
fs::path filename = ReplicationUtils::getFilePathOnDBServiceWorker(
databaseName,
relationName);
traceWriter->write("filename", filename.string());
LOG_F(INFO, "Filename: %s", filename.string().c_str());
success = FileSystem::DeleteFileOrFolder( filename.string() );
if (success) {
traceWriter->write("Successfully deleted file.");
LOG_F(INFO, "Successfully deleted file.");
} else {
traceWriter->write("Couldn't delete file.");
LOG_F(ERROR, "Couldn't delete file");
}
victim = ReplicationUtils::getRelName(filename.filename().string());
} else {
string relId = RelationInfo::getIdentifier(databaseName, relationName);
victim = DerivateInfo::getIdentifier(relId, derivateName);
}
// ensure to have only one access to the catalog
static boost::mutex mtx;
LOG_F(INFO, "%s", "Acquiring lock for SecondoCatalog...");
boost::lock_guard<boost::mutex> guard(mtx);
LOG_F(INFO, "%s", "Successfully acquired lock for SecondoCatalog.");
traceWriter->write("database", databaseName);
traceWriter->write("relation", relationName),
traceWriter->write("derivate", derivateName);
traceWriter->write("victim", victim);
LOG_F(INFO, "Database: %s, Relation: %s, Derivative: %s, Victim: %s",
databaseName.c_str(), relationName.c_str(), derivateName.c_str(),
victim.c_str());
SecondoCatalog* ctlg = SecondoSystem::GetCatalog();
SecondoSystem::BeginTransaction();
ctlg->DeleteObject(victim);
ctlg->CleanUp(false,true);
SecondoSystem::CommitTransaction(false);
traceWriter->write("Deleted object (victim).");
LOG_F(INFO, "Deleted object (victim).");
//TODO check return code etc
//TODO tracing
return true;
}
bool CommunicationServer::handleAddNodeRequest(
std::iostream& io,
const boost::thread::id tid) {
traceWriter->writeFunction(tid,
"CommunicationServer::handleAddNodeRequest");
LOG_SCOPE_FUNCTION(INFO);
// Confirm the AddNodeRequest
CommunicationUtils::sendLine(io,
CommunicationProtocol::AddNodeRequest());
string nodeHost;
string nodePort;
string nodeConfigPath;
CommunicationUtils::receiveLine(io, nodeHost);
CommunicationUtils::receiveLine(io, nodePort);
CommunicationUtils::receiveLine(io, nodeConfigPath);
traceWriter->write("nodeHost", nodeHost);
traceWriter->write("nodePort", nodePort);
traceWriter->write("nodeConfigPath", nodeConfigPath);
LOG_F(INFO, "Node: (Host: %s, Port: %s, ConfigPath: %s",
nodeHost.c_str(), nodePort.c_str(), nodeConfigPath.c_str());
// TODO Talk to DBServiceManager then return
DBServiceManager* dbService = DBServiceManager::getInstance();
bool success = dbService->addNode(nodeHost, stoi(nodePort), nodeConfigPath);
if (success) {
// Success
traceWriter->write("Successfully added node.");
LOG_F(INFO, "Successfully added node.");
CommunicationUtils::sendLine(io,
CommunicationProtocol::NodeAdded());
return true;
} else {
traceWriter->write("dbServiceManager::addNode failed.");
LOG_F(WARNING, "dbServiceManager::addNode failed.");
CommunicationUtils::sendLine(io,
CommunicationProtocol::AddNodeFailed());
return false;
}
return false;
}
bool CommunicationServer::handlePing(
std::iostream& io, const boost::thread::id tid)
{
LOG_SCOPE_FUNCTION(INFO);
CommunicationUtils::sendLine(io,
CommunicationProtocol::Ping());
return true;
}
bool CommunicationServer::handleRelTypeRequest(
std::iostream& io, const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleRelTypeRequest");
LOG_SCOPE_FUNCTION(INFO);
string relID;
CommunicationUtils::receiveLine(io, relID);
string databaseName;
string relationName;
RelationInfo::parseIdentifier(relID, databaseName, relationName);
traceWriter->write("databaseName", databaseName);
traceWriter->write("relationName", relationName);
LOG_F(INFO, "Database: %s, Relation: %s",
databaseName.c_str(), relationName.c_str());
// Here filename is enough. No no need to use a path as all that mattes
// is extracting the relationName.
// JF: Strange enough as the relationName is also passed as a parameter?!s
string fileName = ReplicationUtils::getFileNameOnDBServiceWorker(
databaseName,
relationName);
traceWriter->write("fileName", fileName);
string relname = ReplicationUtils::getRelName(fileName);
traceWriter->write("relName ", relname);
LOG_F(INFO, "Filename: %s, Relation: %s",
fileName.c_str(), relname.c_str());
LOG_F(INFO, "%s", "Acquiring lock for nlparsemtx...");
// Restrict access to the nested list
boost::lock_guard<boost::recursive_mutex> guard(nlparsemtx);
LOG_F(INFO, "%s", "Successfully acquired lock for nlparsemtx...");
SecondoCatalog* ctlg = SecondoSystem::GetCatalog();
if(!ctlg->IsObjectName(relname)){
traceWriter->write(relname + " is not a database object");
LOG_F(WARNING, "%s is not a database object.", relname.c_str());
CommunicationUtils::sendLine(io, CommunicationProtocol::None());
} else {
traceWriter->write(relname + " is a database object");
LOG_F(INFO, "%s is a database object.", relname.c_str());
ListExpr type = ctlg->GetObjectTypeExpr(relname);
traceWriter->write("relType is " , nl->ToString(type));
LOG_F(INFO, "Relation type is %s.", nl->ToString(type).c_str());
type = nl->TwoElemList(
nl->SymbolAtom(Symbol::STREAM()),
nl->Second(type));
CommunicationUtils::sendLine(io, nl->ToString(type));
}
return true;
}
bool CommunicationServer::handleDerivedTypeRequest(
std::iostream& io, const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleDerivedTypeRequest");
LOG_SCOPE_FUNCTION(INFO);
// ensure to have only one access to the catalog
string relID;
CommunicationUtils::receiveLine(io, relID);
string derivedName;
CommunicationUtils::receiveLine(io, derivedName);
string databaseName;
string relationName;
RelationInfo::parseIdentifier(relID, databaseName, relationName);
traceWriter->write("databaseName", databaseName);
traceWriter->write("relationName", relationName);
traceWriter->write("derivedName ", derivedName);
LOG_F(INFO, "Database: %s, Relation: %s, Derivative: %s",
databaseName.c_str(), relationName.c_str(), derivedName.c_str());
string objectName = ReplicationUtils::getDerivedName(
databaseName,
relationName,
derivedName);
traceWriter->write("ObjectName ", objectName);
LOG_F(INFO, "ObjectName: %s", objectName.c_str());
LOG_F(INFO, "%s", "Acquiring lock for nlparsemtx...");
boost::lock_guard<boost::recursive_mutex> guard(nlparsemtx);
LOG_F(INFO, "%s", "Successfully acquired lock for nlparsemtx...");
SecondoCatalog* ctlg = SecondoSystem::GetCatalog();
if(!ctlg->IsObjectName(objectName)){
traceWriter->write(objectName + " is not a database object");
LOG_F(WARNING, "%s is not a database object.", objectName.c_str());
CommunicationUtils::sendLine(io, CommunicationProtocol::None());
} else {
traceWriter->write(objectName + " is a database object");
LOG_F(INFO, "%s is a database object.", objectName.c_str());
ListExpr type = ctlg->GetObjectTypeExpr(objectName);
traceWriter->write("Object type is " , nl->ToString(type));
LOG_F(INFO, "Object type is: %s.", nl->ToString(type).c_str());
CommunicationUtils::sendLine(io, nl->ToString(type));
}
return true;
}
bool CommunicationServer::handleTriggerDerivation(
std::iostream& io, const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleTriggerDerivation");
LOG_SCOPE_FUNCTION(INFO);
LOG_F(INFO, "%s", "Acquiring lock for SecondoCatalog...");
// ensure to have only one access to the catalog
static boost::mutex mtx;
boost::lock_guard<boost::mutex> guard(mtx);
LOG_F(INFO, "%s", "Successfully acquired lock for SecondoCatalog...");
// request derivation information
CommunicationUtils::sendLine(io,
CommunicationProtocol::DerivationRequest());
queue<string> receiveBuffer;
CommunicationUtils::receiveLines(io, 4, receiveBuffer);
string databaseName = receiveBuffer.front();
receiveBuffer.pop();
string targetName = receiveBuffer.front();
receiveBuffer.pop();
string relName = receiveBuffer.front();
receiveBuffer.pop();
string fundef = receiveBuffer.front();
receiveBuffer.pop();
traceWriter->write(tid, "databaseName", databaseName);
traceWriter->write(tid, "targetName", targetName);
traceWriter->write(tid, "relationName", relName);
traceWriter->write(tid, "fundef", fundef);
LOG_F(INFO, "Database: %s, TargetName: %s, Relation: %s, Fundef: %s",
databaseName.c_str(), targetName.c_str(), relName.c_str(),
fundef.c_str());
DBServiceManager* dbService = DBServiceManager::getInstance();
if(!dbService->replicaExists(databaseName, relName))
{
CommunicationUtils::sendLine(io,
CommunicationProtocol::RelationNotExists());
return false;
}
if(dbService->derivateExists(targetName)){
CommunicationUtils::sendLine(io,
CommunicationProtocol::ObjectExists());
return false;
}
dbService->addDerivative(databaseName, relName, targetName, fundef);
CommunicationUtils::sendLine (
io, CommunicationProtocol::DerivationTriggered ());
return true;
}
bool CommunicationServer::handleCreateDerivation(
std::iostream& io, const boost::thread::id tid)
{
traceWriter->writeFunction(tid,
"CommunicationServer::handleCreateDerivation");
LOG_SCOPE_FUNCTION(INFO);
LOG_F(INFO, "%s", "Acquiring lock for SecondoCatalog...");
// ensure to have only one access to the catalog
static boost::mutex mtx;
boost::lock_guard<boost::mutex> guard(mtx);
LOG_F(INFO, "%s", "Successfully acquired lock for SecondoCatalog...");
CommunicationUtils::sendLine(io,
CommunicationProtocol::DerivationRequest());
queue<string> receiveBuffer;
CommunicationUtils::receiveLines(io, 4, receiveBuffer);
string databaseName = receiveBuffer.front();
receiveBuffer.pop();
string targetName = receiveBuffer.front();
receiveBuffer.pop();
string relName = receiveBuffer.front();
receiveBuffer.pop();
string fundef = receiveBuffer.front();
receiveBuffer.pop();
traceWriter->write(tid, "databaseName", databaseName);
traceWriter->write(tid, "targetName", targetName);
traceWriter->write(tid, "relationName", relName);
traceWriter->write(tid, "fundef", fundef);
LOG_F(INFO, "Database: %s, TargetName: %s, Relation: %s, Fundef: %s",
databaseName.c_str(), targetName.c_str(), relName.c_str(),
fundef.c_str());
DerivationClient dc(databaseName, targetName, relName, fundef);
dc.start();
return true;
}
bool CommunicationServer::reportSuccessfulDerivation(
std::iostream& io, const boost::thread::id tid){
traceWriter->writeFunction(tid,
"CommunicationServer::reportSuccessfulDerivation");
LOG_SCOPE_FUNCTION(INFO);
LOG_F(INFO, "%s", "Acquiring lock for SecondoCatalog...");
// ensure to have only one access to the catalog
static boost::mutex mtx;
boost::lock_guard<boost::mutex> guard(mtx);
LOG_F(INFO, "%s", "Successfully acquired lock for SecondoCatalog...");
CommunicationUtils::sendLine(io,
CommunicationProtocol::ObjectRequest());
string objectID;
CommunicationUtils::receiveLine(io, objectID);
traceWriter->write(tid, "objectID", objectID);
LOG_F(INFO, "ObjectID: %s", objectID.c_str());
CommunicationUtils::sendLine(io,
CommunicationProtocol::LocationRequest());
queue<string> receiveBuffer;
CommunicationUtils::receiveLines(io, 2, receiveBuffer);
string host = receiveBuffer.front();
receiveBuffer.pop();
string port = receiveBuffer.front();
receiveBuffer.pop();
traceWriter->write(tid, "host", host);
traceWriter->write(tid, "port", port);
LOG_F(INFO, "Host: %s, Port: %s", host.c_str(), port.c_str());
DBServiceManager::getInstance()->maintainSuccessfulDerivation(
objectID, host, port);
return true;
}
} /* namespace DBService */