// File: secondo/Algebras/DBService/CommunicationServer.cpp
// (854 lines, 29 KiB, C++; last change 2026-01-23 17:03:45 +08:00)

/*
1.1.1 Class Definition
----
This file is part of SECONDO.
Copyright (C) 2017,
Faculty of Mathematics and Computer Science,
Database Systems for New Applications.
SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
----
*/
#include <random>
#include "Algebra.h"
#include "FileSystem.h"
#include "StringUtils.h"
#include "SecondoCatalog.h"
#include "SecondoSystem.h"
#include "Algebras/Distributed2/FileRelations.h"
#include "Algebras/DBService/CommunicationProtocol.hpp"
#include "Algebras/DBService/CommunicationServer.hpp"
#include "Algebras/DBService/CommunicationUtils.hpp"
#include "Algebras/DBService/DBServiceManager.hpp"
#include "Algebras/DBService/ReplicationClientRunnable.hpp"
#include "Algebras/DBService/ReplicationUtils.hpp"
#include "Algebras/DBService/SecondoUtilsLocal.hpp"
#include "Algebras/DBService/TriggerFileTransferRunnable.hpp"
#include "Algebras/DBService/TriggerReplicaDeletionRunnable.hpp"
#include "Algebras/DBService/DerivationClient.hpp"
#include "Algebras/DBService/CreateDerivateRunnable.hpp"
using namespace distributed2;
using namespace std;
namespace DBService {
/*
Constructor. Hands ~port~ to the MultiClientServer base class, creates
the trace writer (context "CommunicationServer", output to std::cout)
and finally reads the minimum replica count from the configuration.

*/
CommunicationServer::CommunicationServer(int port)
    : MultiClientServer(port)
{
    string traceContext("CommunicationServer");
    traceWriter.reset(new TraceWriter(traceContext, port, std::cout));

    traceWriter->writeFunction("CommunicationServer::CommunicationServer");
    traceWriter->write("Initializing CommunicationServer");
    traceWriter->write("port", port);

    lookupMinimumReplicaCount();
}
/*
Destructor. Only traces that the server object is being destroyed.

*/
CommunicationServer::~CommunicationServer()
{
    traceWriter->writeFunction(
            "CommunicationServer::~CommunicationServer");
}
/*
Traces the call and delegates to MultiClientServer::start, whose result
is returned unchanged.

*/
int CommunicationServer::start()
{
    traceWriter->writeFunction("CommunicationServer::start");
    const int baseResult = MultiClientServer::start();
    return baseResult;
}
void CommunicationServer::lookupMinimumReplicaCount()
{
traceWriter->writeFunction(
"CommunicationServer::lookupMinimumReplicaCount");
string replicaNumber;
SecondoUtilsLocal::readFromConfigFile(replicaNumber,
"DBService",
"ReplicaNumber",
"");
minimumReplicaCount = atoi(replicaNumber.c_str());
}
/*
Serves a single client connection on ~io~.

The server first announces itself and expects the CommunicationClient
keyword in return. Afterwards exactly one request line is read and
passed to the handler that belongs to this keyword. The boolean results
of the handlers are not evaluated here.

Return values:

  * 0: a known request was dispatched,

  * 1: protocol error (peer is not a CommunicationClient or the
       request keyword is unknown),

  * 2: an exception was thrown during the communication.

*/
int CommunicationServer::communicate(iostream& io)
{
    const boost::thread::id tid = boost::this_thread::get_id();
    traceWriter->writeFunction(tid, "CommunicationServer::communicate");

    try
    {
        traceWriter->write(tid, "Communicating...");

        // handshake: announce ourselves, then check who is calling
        CommunicationUtils::sendLine(io,
                CommunicationProtocol::CommunicationServer());
        const bool peerIsClient = CommunicationUtils::receivedExpectedLine(
                io, CommunicationProtocol::CommunicationClient());
        if (!peerIsClient)
        {
            traceWriter->write(tid,
                    "Protocol error: Not connected to CommunicationClient");
            return 1;
        }

        string request;
        CommunicationUtils::receiveLine(io, request);
        traceWriter->write(tid, "request", request);

        // dispatch on the request keyword
        if (request == CommunicationProtocol::TriggerReplication()) {
            handleTriggerReplicationRequest(io, tid);
        } else if (request == CommunicationProtocol::StartingSignal()) {
            handleStartingSignalRequest(io, tid);
        } else if (request == CommunicationProtocol::TriggerFileTransfer()) {
            handleTriggerFileTransferRequest(io, tid);
        } else if (request ==
                   CommunicationProtocol::ReplicaLocationRequest()) {
            handleProvideReplicaLocationRequest(io, tid);
        } else if (request ==
                   CommunicationProtocol::ReplicationSuccessful()) {
            reportSuccessfulReplication(io, tid);
        } else if (request == CommunicationProtocol::DeleteReplicaRequest()) {
            handleRequestReplicaDeletion(io, tid);
        } else if (request ==
                   CommunicationProtocol::TriggerReplicaDeletion()) {
            handleTriggerReplicaDeletion(io, tid);
        } else if (request == CommunicationProtocol::Ping()) {
            handlePing(io, tid);
        } else if (request == CommunicationProtocol::RelTypeRequest()) {
            handleRelTypeRequest(io, tid);
        } else if (request == CommunicationProtocol::DerivedTypeRequest()) {
            handleDerivedTypeRequest(io, tid);
        } else if (request == CommunicationProtocol::TriggerDerivation()) {
            handleTriggerDerivation(io, tid);
        } else if (request == CommunicationProtocol::CreateDerivation()) {
            handleCreateDerivation(io, tid);
        } else if (request ==
                   CommunicationProtocol::CreateDerivateSuccessful()) {
            reportSuccessfulDerivation(io, tid);
        } else {
            traceWriter->write(
                    tid, "Protocol error: invalid request: ", request);
            return 1;
        }
    }
    catch (...)
    {
        // any exception raised by the utilities or a handler ends up here
        traceWriter->write(tid, "CommunicationServer: communication error");
        return 2;
    }

    return 0;
}
/*
Handles a request to replicate a relation.

Protocol as implemented here:

  1 send RelationRequest, receive database name and relation name,

  2 if a replica already exists, answer ReplicaExists and return false,

  3 send LocationRequest, receive host, port, disk and transfer port of
    the original location,

  4 let the DBServiceManager determine replica locations and store the
    transfer port of the original location,

  5 if fewer locations than ~minimumReplicaCount~ were found, remove the
    locations again and answer ReplicationCanceled; otherwise persist
    them and answer ReplicationTriggered.

Returns true whenever step 5 was reached.

*/
bool CommunicationServer::handleTriggerReplicationRequest(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleTriggerReplicationRequest");

    queue<string> lines;
    // fetches and removes the oldest received line
    auto nextLine = [&lines]() -> string {
        string value = lines.front();
        lines.pop();
        return value;
    };

    CommunicationUtils::sendLine(io,
            CommunicationProtocol::RelationRequest());
    CommunicationUtils::receiveLines(io, 2, lines);
    string databaseName = nextLine();
    string relationName = nextLine();
    traceWriter->write(tid, "databaseName", databaseName);
    traceWriter->write(tid, "relationName", relationName);

    DBServiceManager* dbService = DBServiceManager::getInstance();
    if (dbService->replicaExists(databaseName, relationName))
    {
        CommunicationUtils::sendLine(io,
                CommunicationProtocol::ReplicaExists());
        return false;
    }

    CommunicationUtils::sendLine(io,
            CommunicationProtocol::LocationRequest());
    CommunicationUtils::receiveLines(io, 4, lines);
    string host = nextLine();
    string port = nextLine();
    string disk = nextLine();
    string transferPort = nextLine();
    traceWriter->write(tid, "host", host);
    traceWriter->write(tid, "port", port);
    traceWriter->write(tid, "disk", disk);
    traceWriter->write(tid, "transferPort", transferPort);

    dbService->determineReplicaLocations(
            databaseName, relationName, host, port, disk);

    const string relID =
            RelationInfo::getIdentifier(databaseName, relationName);
    vector<pair<ConnectionID, bool> > locations;
    dbService->getReplicaLocations(relID, locations);
    dbService->setOriginalLocationTransferPort(relID, transferPort);

    const bool enoughLocations =
            !(locations.size() < (size_t)minimumReplicaCount);
    if (enoughLocations)
    {
        dbService->persistReplicaLocations(databaseName, relationName);
        CommunicationUtils::sendLine(io,
                CommunicationProtocol::ReplicationTriggered());
    }
    else
    {
        dbService->deleteRelationLocations(databaseName, relationName);
        CommunicationUtils::sendLine(io,
                CommunicationProtocol::ReplicationCanceled());
    }
    return true;
}
/*
Handles the starting signal for a relation.

Sends RelationRequest, receives the relation identifier and looks up
the replica locations and the original location (host and transfer
port) of this relation. For every replica location a
TriggerFileTransferRunnable is created and run, pointing from the
original location to the communication port of that location.

Always returns true.

*/
bool CommunicationServer::handleStartingSignalRequest(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleStartingSignalRequest");

    CommunicationUtils::sendLine(io,
            CommunicationProtocol::RelationRequest());
    string relID;
    CommunicationUtils::receiveLine(io, relID);
    traceWriter->write(tid, "relID", relID);

    vector<pair<ConnectionID, bool> > locations;
    DBServiceManager* dbService = DBServiceManager::getInstance();
    dbService->getReplicaLocations(relID, locations);
    RelationInfo& relationInfo = dbService->getRelationInfo(relID);

    string originalLocationHost =
            relationInfo.getOriginalLocation().getHost();
    traceWriter->write("originalLocationHost", originalLocationHost);

    string originalLocationTransferPort =
            relationInfo.getOriginalLocation().getTransferPort();
    traceWriter->write("originalLocationTransferPort",
            originalLocationTransferPort);

    // the numeric transfer port is the same for every worker
    const int sourceTransferPort =
            atoi(originalLocationTransferPort.c_str());

    traceWriter->write("Triggering file transfers");
    for (const pair<ConnectionID, bool>& replica : locations)
    {
        LocationInfo workerLocation = dbService->getLocation(replica.first);
        traceWriter->write(workerLocation);

        TriggerFileTransferRunnable clientToDBServiceWorker(
                originalLocationHost,                       /*original*/
                sourceTransferPort,                         /*original*/
                workerLocation.getHost(),                   /*DBService*/
                atoi(workerLocation.getCommPort().c_str()), /*DBService*/
                relationInfo.getDatabaseName(),
                relationInfo.getRelationName());
        clientToDBServiceWorker.run();
    }
    return true;
}
/*
Handles a request to fetch a replica file.

Sends ReplicationDetailsRequest and receives five lines: host, port,
file name, database name and relation name. A ReplicationClientRunnable
is then created from host, port, database and relation name and run.
The received file name is only traced.

Always returns true.

*/
bool CommunicationServer::handleTriggerFileTransferRequest(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleTriggerFileTransferRequest");

    CommunicationUtils::sendLine(io,
            CommunicationProtocol::ReplicationDetailsRequest());
    traceWriter->write(tid, "sent ReplicationDetailsRequest");

    queue<string> details;
    CommunicationUtils::receiveLines(io, 5, details);

    // fetches and removes the oldest received line
    auto takeNext = [&details]() -> string {
        string value = details.front();
        details.pop();
        return value;
    };
    string host = takeNext();
    string port = takeNext();
    string fileName = takeNext();
    string databaseName = takeNext();
    string relationName = takeNext();

    traceWriter->write(tid, "received replication details");
    traceWriter->write(tid, "host", host);
    traceWriter->write(tid, "port", port);
    // TODO remove filename, is determined automatically
    traceWriter->write(tid, "fileName", fileName);
    traceWriter->write(tid, "databaseName", databaseName);
    traceWriter->write(tid, "relationName", relationName);

    ReplicationClientRunnable replicationClient(
            host, atoi(port.c_str()), databaseName, relationName);
    replicationClient.run();
    return true;
}
/*
Answers the question where a replica of a relation can be found.

Protocol as implemented here:

  1 send RelationRequest, receive database name and relation name,

  2 receive the number of additionally required derived objects,
    followed by that many object names,

  3 intersect the locations of the relation with the locations of every
    requested derived object and pick one of the remaining locations at
    random,

  4 send host, transfer port and communication port of the chosen
    location, or three times None if no location is available.

A count line that is not a valid number, or is negative, is treated as
"no derived objects"; an invalid line is additionally traced.

Returns true if a location was sent, false otherwise.

*/
bool CommunicationServer::handleProvideReplicaLocationRequest(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleProvideReplicaLocationRequest");
    CommunicationUtils::sendLine(io,
            CommunicationProtocol::RelationRequest());

    queue<string> receiveBuffer;
    CommunicationUtils::receiveLines(io, 2, receiveBuffer);

    // read number of other objects
    string countLine;
    CommunicationUtils::receiveLine(io, countLine);
    bool correct = false;
    int number = stringutils::str2int<int>(countLine, correct);
    if(!correct)
    {
        // do not trust the value of a failed conversion
        traceWriter->write(tid, "invalid number of derived objects",
                countLine);
        number = 0;
    }
    if(number < 0)
    {
        number = 0;
    }

    queue<string> otherObjects;
    if(number > 0)
    {
        CommunicationUtils::receiveLines(io, number, otherObjects);
    }

    string databaseName = receiveBuffer.front();
    receiveBuffer.pop();
    string relationName = receiveBuffer.front();
    receiveBuffer.pop();
    traceWriter->write(tid, "databaseName", databaseName);
    traceWriter->write(tid, "relationName", relationName);

    DBServiceManager* dbService = DBServiceManager::getInstance();
    // 0 is used as marker for "no location found"
    ConnectionID randomReplicaLocation = 0;
    try
    {
        vector<ConnectionID> possibleLocations;
        string relName =
                RelationInfo::getIdentifier(databaseName, relationName);
        RelationInfo relInfo = dbService->getRelationInfo(relName);
        relInfo.getAllLocations(possibleLocations);
        // set_intersection below requires sorted ranges
        std::sort(possibleLocations.begin(), possibleLocations.end());

        traceWriter->write("check for presence of derived objects");
        while(!otherObjects.empty() && !possibleLocations.empty())
        {
            string derivedName = otherObjects.front();
            otherObjects.pop();
            traceWriter->write(tid, "look for derivate " , derivedName);
            string oname = DerivateInfo::getIdentifier(relName, derivedName);
            traceWriter->write("derivateID : " , oname);
            DerivateInfo derInfo = dbService->getDerivateInfo(oname);
            traceWriter->write("derivateInfo found");

            vector<ConnectionID> derLocs;
            derInfo.getAllLocations(derLocs);
            sort(derLocs.begin(), derLocs.end());

            // keep only locations that also hold this derived object
            vector<ConnectionID> inter;
            set_intersection(
                    possibleLocations.begin(), possibleLocations.end(),
                    derLocs.begin(), derLocs.end(),
                    back_inserter(inter));
            inter.swap(possibleLocations);
        }

        if(!possibleLocations.empty())
        {
            traceWriter->write(tid, "choose random location");
            shuffle(possibleLocations.begin(), possibleLocations.end(),
                    std::mt19937(std::random_device()()));
            randomReplicaLocation = possibleLocations.front();
        }
        else
        {
            traceWriter->write(tid, "No locations available");
            dbService->printDerivates( std::cout);
        }
    }
    catch(...)
    {
        // an exception from the manager is handled like "no location"
        traceWriter->write(tid, "Exception: No location found");
        dbService->printDerivates(std::cout);
    }

    queue<string> sendBuffer;
    if(randomReplicaLocation == 0)
    {
        sendBuffer.push(CommunicationProtocol::None());
        sendBuffer.push(CommunicationProtocol::None());
        sendBuffer.push(CommunicationProtocol::None());
        CommunicationUtils::sendBatch(io, sendBuffer);
        return false;
    }

    LocationInfo location = dbService->getLocation(randomReplicaLocation);
    sendBuffer.push(location.getHost());
    sendBuffer.push(location.getTransferPort());
    sendBuffer.push(location.getCommPort());
    CommunicationUtils::sendBatch(io, sendBuffer);
    return true;
}
/*
Records that a relation was replicated successfully.

Sends RelationRequest and receives the relation identifier, then sends
LocationRequest and receives host and port. The three values are passed
to DBServiceManager::maintainSuccessfulReplication.

Always returns true.

*/
bool CommunicationServer::reportSuccessfulReplication(
        iostream& io,
        const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::reportSuccessfulReplication");

    // which relation?
    CommunicationUtils::sendLine(io,
            CommunicationProtocol::RelationRequest());
    string relID;
    CommunicationUtils::receiveLine(io, relID);
    traceWriter->write(tid, "relID", relID);

    // on which node?
    CommunicationUtils::sendLine(io,
            CommunicationProtocol::LocationRequest());
    queue<string> locationLines;
    CommunicationUtils::receiveLines(io, 2, locationLines);
    string host = locationLines.front();
    locationLines.pop();
    string port = locationLines.front();
    locationLines.pop();
    traceWriter->write(tid, "host", host);
    traceWriter->write(tid, "port", port);

    DBServiceManager* manager = DBServiceManager::getInstance();
    manager->maintainSuccessfulReplication(relID, host, port);
    return true;
}
void CommunicationServer::deleteRemoteDerivate(
const string& databaseName,
const string& relationName,
const string& derivateName){
string derId = DerivateInfo::getIdentifier(databaseName,relationName,
derivateName);
DBServiceManager* dbService = DBServiceManager::getInstance();
DerivateInfo& derInfo = dbService->getDerivateInfo(derId);
for(ReplicaLocations::const_iterator it =
derInfo.nodesBegin(); it != derInfo.nodesEnd(); it++)
{
if(it->second)
{
LocationInfo& locationInfo =dbService->getLocation(it->first);
TriggerReplicaDeletionRunnable replicaEraser(
locationInfo.getHost(),
atoi(locationInfo.getCommPort().c_str()),
databaseName, relationName, derivateName);
replicaEraser.run();
}
}
}
void CommunicationServer::deleteRemoteRelation(
const string& databaseName,
const string& relationName)
{
string relId = RelationInfo::getIdentifier(databaseName, relationName);
DBServiceManager* dbService = DBServiceManager::getInstance();
RelationInfo& relationInfo = dbService->getRelationInfo(relId);
// remove the relation itselfs
for(ReplicaLocations::const_iterator it =
relationInfo.nodesBegin(); it!=relationInfo.nodesEnd(); it++)
{
if(it->second)
{
LocationInfo& locationInfo = dbService->getLocation(it->first);
TriggerReplicaDeletionRunnable replicaEraser(
locationInfo.getHost(),
atoi(locationInfo.getCommPort().c_str()),
databaseName, relationName,"");
replicaEraser.run();
}
}
// delete all derivates that depend on relation
vector<string> derivates;
dbService->findDerivates(relId, derivates);
for(auto& t: derivates)
{
string db, rel, der;
if(!ReplicationUtils::extractDerivateInfo(t, db, rel, der)){
traceWriter->write("problem in extracting derivate info "
"from id");
} else {
deleteRemoteDerivate(db,rel,der);
}
}
}
void CommunicationServer::deleteRemoteDatabase(const string& databaseName){
DBServiceManager* dbService = DBServiceManager::getInstance();
vector<string> relations;
dbService->findRelations(databaseName, relations);
for(auto& r: relations){
string db;
string rel;
if(ReplicationUtils::extractRelationInfo(r,db,rel)){
deleteRemoteRelation(db,rel);
}
}
}
/*
Handles a request to delete replicas.

Receives three lines: database name, relation name and derivate name.
Depending on which of them are empty, the deletion is triggered for

  * the complete database (relation name empty),

  * a relation together with its derivates (derivate name empty),

  * a single derived object (all three names given).

Afterwards the meta data is removed via
DBServiceManager::deleteReplicaMetadata. If anything throws, this is
traced as "Relation does not exist" and false is returned; otherwise
the result is true.

*/
bool CommunicationServer::handleRequestReplicaDeletion(
        iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleRequestReplicaDeletion");

    string databaseName;
    string relationName;
    string derivateName;
    CommunicationUtils::receiveLine(io, databaseName);
    CommunicationUtils::receiveLine(io, relationName);
    CommunicationUtils::receiveLine(io, derivateName);

    // if the relation and all derivates should be removed,
    // the derivateName will be empty
    traceWriter->write("database", databaseName);
    traceWriter->write("relation", relationName);
    traceWriter->write("derived", derivateName);

    DBServiceManager* dbService = DBServiceManager::getInstance();
    try
    {
        const bool wholeDatabase = relationName.empty();
        const bool wholeRelation = !wholeDatabase && derivateName.empty();

        if (wholeDatabase)
        {
            deleteRemoteDatabase(databaseName);
        }
        else if (wholeRelation)
        {
            deleteRemoteRelation(databaseName, relationName);
        }
        else
        {
            deleteRemoteDerivate(databaseName, relationName, derivateName);
        }
        dbService->deleteReplicaMetadata(
                databaseName, relationName, derivateName);
    }
    catch (...)
    {
        traceWriter->write(tid, "Relation does not exist");
        return false;
    }
    return true;
}
/*
Deletes a replica (relation or derived object) from the local system.

Receives three lines: database name, relation name and derivate name.

  * If the derivate name is empty, the replica file of the relation is
    removed from the file system and the catalog object to delete is
    the relation name computed from that file name.

  * Otherwise the catalog object to delete is the derivate identifier
    built from relation identifier and derivate name.

The catalog manipulation (DeleteObject and CleanUp inside a
transaction) is serialized by a function-local static mutex. The
results of the catalog and transaction calls are not checked.

Always returns true.

*/
bool CommunicationServer::handleTriggerReplicaDeletion(
        std::iostream& io,
        const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleTriggerReplicaDeletion");

    string databaseName;
    string relationName;
    string derivateName;
    CommunicationUtils::receiveLine(io, databaseName);
    CommunicationUtils::receiveLine(io, relationName);
    CommunicationUtils::receiveLine(io, derivateName);

    string victim;
    if(derivateName.empty())
    {
        // remove File
        string filename = ReplicationUtils::getFileNameOnDBServiceWorker(
                databaseName,
                relationName);
        FileSystem::DeleteFileOrFolder( filename );
        victim = ReplicationUtils::getRelName(filename);
    }
    else
    {
        string relId =
                RelationInfo::getIdentifier(databaseName, relationName);
        victim = DerivateInfo::getIdentifier(relId, derivateName);
    }

    // ensure to have only one access to the catalog
    static boost::mutex mtx;
    boost::lock_guard<boost::mutex> guard(mtx);

    traceWriter->write("database", databaseName);
    // the original ended this statement with a comma operator; it is an
    // ordinary statement of its own now
    traceWriter->write("relation", relationName);
    traceWriter->write("derivate", derivateName);
    traceWriter->write("victim", victim);

    SecondoCatalog* ctlg = SecondoSystem::GetCatalog();
    SecondoSystem::BeginTransaction();
    ctlg->DeleteObject(victim);
    ctlg->CleanUp(false,true);
    SecondoSystem::CommitTransaction(false);
    //TODO check return code etc
    //TODO tracing
    return true;
}
/*
Answers a ping by sending the Ping keyword back to the client.
Always returns true.

*/
bool CommunicationServer::handlePing(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid, "CommunicationServer::handlePing");

    const string reply = CommunicationProtocol::Ping();
    CommunicationUtils::sendLine(io, reply);
    return true;
}
/*
Reports the type of a replicated relation.

Receives the relation identifier, derives the local object name from
the replica file name and asks the catalog for this object. If the
object is unknown, None is sent. Otherwise the answer is the two
element list consisting of the symbol for stream and the second element
of the object's type expression.

Always returns true.

*/
bool CommunicationServer::handleRelTypeRequest(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleRelTypeRequest");

    string relID;
    CommunicationUtils::receiveLine(io, relID);

    string databaseName;
    string relationName;
    RelationInfo::parseIdentifier(relID, databaseName, relationName);
    traceWriter->write("databaseName", databaseName);
    traceWriter->write("relationName", relationName);

    string fileName = ReplicationUtils::getFileNameOnDBServiceWorker(
            databaseName, relationName);
    traceWriter->write("fileName", fileName);
    string relname = ReplicationUtils::getRelName(fileName);
    traceWriter->write("relName ", relname);

    SecondoCatalog* ctlg = SecondoSystem::GetCatalog();
    if (!ctlg->IsObjectName(relname))
    {
        traceWriter->write(relname + " is not a database object");
        CommunicationUtils::sendLine(io, CommunicationProtocol::None());
        return true;
    }

    traceWriter->write(relname + " is a database object");
    ListExpr type = ctlg->GetObjectTypeExpr(relname);
    traceWriter->write("relType is " , nl->ToString(type));

    // answer with (stream <second element of the stored type>)
    type = nl->TwoElemList(
            nl->SymbolAtom(Symbol::STREAM()),
            nl->Second(type));
    CommunicationUtils::sendLine(io, nl->ToString(type));
    return true;
}
/*
Reports the type of a derived object.

Receives the relation identifier and the name of the derived object,
builds the local object name and asks the catalog for it. If the object
is unknown, None is sent; otherwise its type expression is sent as
text.

Always returns true.

*/
bool CommunicationServer::handleDerivedTypeRequest(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleDerivedTypeRequest");

    string relID;
    CommunicationUtils::receiveLine(io, relID);
    string derivedName;
    CommunicationUtils::receiveLine(io, derivedName);

    string databaseName;
    string relationName;
    RelationInfo::parseIdentifier(relID, databaseName, relationName);
    traceWriter->write("databaseName", databaseName);
    traceWriter->write("relationName", relationName);
    traceWriter->write("derivedName ", derivedName);

    string objectName = ReplicationUtils::getDerivedName(
            databaseName, relationName, derivedName);
    traceWriter->write("ObjectName ", objectName);

    SecondoCatalog* ctlg = SecondoSystem::GetCatalog();
    if (!ctlg->IsObjectName(objectName))
    {
        traceWriter->write(objectName + " is not a database object");
        CommunicationUtils::sendLine(io, CommunicationProtocol::None());
        return true;
    }

    traceWriter->write(objectName + " is a database object");
    ListExpr type = ctlg->GetObjectTypeExpr(objectName);
    traceWriter->write("objectName is " , nl->ToString(type));
    CommunicationUtils::sendLine(io, nl->ToString(type));
    return true;
}
/*
Handles a request to create a derived object on all nodes that hold a
replica of the source relation.

Protocol as implemented here:

  1 send DerivationRequest, receive database name, target name,
    relation name and function definition,

  2 answer RelationNotExists (result false) if no replica of the
    relation exists, ObjectExists (result false) if the derivate
    already exists,

  3 determine and persist the derivate locations,

  4 run a CreateDerivateRunnable for every replica location of the
    relation whose flag is set,

  5 answer DerivationTriggered and return true.

*/
bool CommunicationServer::handleTriggerDerivation(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleTriggerDerivation");

    // request derivation information
    CommunicationUtils::sendLine(io,
            CommunicationProtocol::DerivationRequest());
    queue<string> lines;
    CommunicationUtils::receiveLines(io, 4, lines);

    // fetches and removes the oldest received line
    auto nextLine = [&lines]() -> string {
        string value = lines.front();
        lines.pop();
        return value;
    };
    string databaseName = nextLine();
    string targetName = nextLine();
    string relName = nextLine();
    string fundef = nextLine();
    traceWriter->write(tid, "databaseName", databaseName);
    traceWriter->write(tid, "targetName", targetName);
    traceWriter->write(tid, "relationName", relName);
    traceWriter->write(tid, "fundef", fundef);

    DBServiceManager* dbService = DBServiceManager::getInstance();
    if (!dbService->replicaExists(databaseName, relName))
    {
        CommunicationUtils::sendLine(io,
                CommunicationProtocol::RelationNotExists());
        return false;
    }
    if (dbService->derivateExists(targetName))
    {
        CommunicationUtils::sendLine(io,
                CommunicationProtocol::ObjectExists());
        return false;
    }

    // for all workers holding a replica of the relation
    // send command to the worker to derive the object
    string relId = RelationInfo::getIdentifier(databaseName, relName);
    ReplicaLocations replicaNodes;
    string derId = dbService->determineDerivateLocations(
            targetName, relId, fundef);
    dbService->persistDerivateLocations(derId);
    dbService->getReplicaLocations(relId, replicaNodes);

    for (ReplicaLocations::iterator node = replicaNodes.begin();
         node != replicaNodes.end(); node++)
    {
        if (!node->second)
        {
            continue; // relation is not replicated on this node
        }
        ConnectionID cid = node->first;
        LocationInfo& worker = dbService->getLocation(cid);
        CreateDerivateRunnable cdr(
                worker.getHost(),
                atoi(worker.getCommPort().c_str()),
                databaseName,
                targetName,
                relName,
                fundef);
        cdr.run();
    }

    CommunicationUtils::sendLine(io,
            CommunicationProtocol::DerivationTriggered());
    return true;
}
/*
Handles a request to create a derived object.

Sends DerivationRequest, receives database name, target name, relation
name and function definition, creates a DerivationClient from these
four values and calls its start function.

Always returns true.

*/
bool CommunicationServer::handleCreateDerivation(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::handleCreateDerivation");

    CommunicationUtils::sendLine(io,
            CommunicationProtocol::DerivationRequest());
    queue<string> lines;
    CommunicationUtils::receiveLines(io, 4, lines);

    // fetches and removes the oldest received line
    auto nextLine = [&lines]() -> string {
        string value = lines.front();
        lines.pop();
        return value;
    };
    string databaseName = nextLine();
    string targetName = nextLine();
    string relName = nextLine();
    string fundef = nextLine();

    traceWriter->write(tid, "databaseName", databaseName);
    traceWriter->write(tid, "targetName", targetName);
    traceWriter->write(tid, "relationName", relName);
    traceWriter->write(tid, "fundef", fundef);

    DerivationClient dc(databaseName, targetName, relName, fundef);
    dc.start();
    return true;
}
/*
Records that a derived object was created successfully.

Sends ObjectRequest and receives the object identifier, then sends
LocationRequest and receives host and port. The three values are passed
to DBServiceManager::maintainSuccessfulDerivation.

Always returns true.

*/
bool CommunicationServer::reportSuccessfulDerivation(
        std::iostream& io, const boost::thread::id tid)
{
    traceWriter->writeFunction(tid,
            "CommunicationServer::reportSuccessfulDerivation");

    // which object?
    CommunicationUtils::sendLine(io,
            CommunicationProtocol::ObjectRequest());
    string objectID;
    CommunicationUtils::receiveLine(io, objectID);
    traceWriter->write(tid, "objectID", objectID);

    // on which node?
    CommunicationUtils::sendLine(io,
            CommunicationProtocol::LocationRequest());
    queue<string> locationLines;
    CommunicationUtils::receiveLines(io, 2, locationLines);
    string host = locationLines.front();
    locationLines.pop();
    string port = locationLines.front();
    locationLines.pop();
    traceWriter->write(tid, "host", host);
    traceWriter->write(tid, "port", port);

    DBServiceManager* manager = DBServiceManager::getInstance();
    manager->maintainSuccessfulDerivation(objectID, host, port);
    return true;
}
} /* namespace DBService */