/*

1.1.1 Class Implementation

----
This file is part of SECONDO.

Copyright (C) 2017,
Faculty of Mathematics and Computer Science,
Database Systems for New Applications.

SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
----

*/

// NOTE(review): this file was reviewed from a whitespace-collapsed text in
// which the contents of angle brackets were missing. Template arguments have
// been restored only where the surrounding code determines them. Everything
// that could not be restored is marked with NOTE(review) at the place of use.
// Runs of whitespace inside string literals may also have been collapsed --
// confirm the literals against the original file.

#include "Algebras/DBService2/Relation.hpp"
#include "Algebras/DBService2/Query.hpp"
#include "Algebras/DBService2/SecondoRelationAdapter.hpp"
#include "Algebras/DBService2/Node.hpp"

// NOTE(review): the header names of the seven remaining #include directives
// were not visible. The list below is inferred from the names this file uses
// (std::remove, rand/srand, time, shared_ptr, stringstream, string, as_const,
// vector, boost::to_upper, boost::chrono, boost::this_thread, LOG_F). Confirm
// against the original file and keep any original include missing here.
#include <algorithm>
#include <cstdlib>
#include <ctime>
#include <memory>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

#include <boost/algorithm/string.hpp>
#include <boost/chrono.hpp>
#include <boost/thread.hpp>

#include <loguru.hpp>

using namespace std;

namespace DBService
{
  //class Node;

  /*
  Creates a Relation with an empty name and an empty database.

  NOTE(review): the template arguments of Record in the constructor
  initializers were not visible and are left as found.

  */
  Relation::Relation() : Record::Record() {
    setName("");
    setDatabase("");
  }

  /*
  Creates a Relation identified by the given relation database and name.
  No original Node is set.

  */
  Relation::Relation(string newRelationDatabase, string relationName)
    : Record::Record() {
    setRelationDatabase(newRelationDatabase);
    setName(relationName);
  }

  /*
  Creates a Relation identified by relation database and name and attaches
  the given original Node.

  */
  Relation::Relation(string newRelationDatabase, string relationName,
    shared_ptr<Node> newOriginalNode) : Record::Record() {
    setRelationDatabase(newRelationDatabase);
    setName(relationName);
    setOriginalNode(newOriginalNode);
  }

  /*
  Creates a Relation and a fresh original Node built from host, port and
  disk path. The Node is constructed with an empty config argument.

  */
  Relation::Relation(string relationDatabase, string relationName,
    string originalNodeHost, int originalNodePort,
    string originalNodeDiskPath) : Record::Record() {

    // At this point we don't have a config -> ""
    shared_ptr<Node> newOriginalNode = make_shared<Node>(
      originalNodeHost, originalNodePort, "");

    newOriginalNode->setDiskPath(originalNodeDiskPath);

    setRelationDatabase(relationDatabase);
    setName(relationName);
    setOriginalNode(newOriginalNode);
  }

  /*
  Factory functions. Each returns a shared\_ptr owning a new Relation.
  Several member functions of this class call shared\_from\_this(), which
  requires the instance to be owned by a shared\_ptr.

  */
  shared_ptr<Relation> Relation::build() {
    LOG_SCOPE_FUNCTION(INFO);

    shared_ptr<Relation> instance(new DBService::Relation());
    return instance;
  }

  shared_ptr<Relation> Relation::build(string relationDatabase,
    string relationName) {
    LOG_SCOPE_FUNCTION(INFO);

    shared_ptr<Relation> instance(new DBService::Relation(
      relationDatabase, relationName));
    return instance;
  }

  shared_ptr<Relation> Relation::build(string relationDatabase,
    string relationName, shared_ptr<Node> originalNode) {
    LOG_SCOPE_FUNCTION(INFO);

    shared_ptr<Relation> instance(new DBService::Relation(
      relationDatabase, relationName, originalNode));
    return instance;
  }

  /*
  Builds a Relation together with a new original Node and sets the type of
  that Node to Node::nodeTypeOriginal().

  */
  shared_ptr<Relation> Relation::build(string relationDatabase,
    string relationName, string originalNodeHost, int originalNodePort,
    string originalNodeDisk) {
    LOG_SCOPE_FUNCTION(INFO);

    shared_ptr<Relation> instance(new DBService::Relation(
      relationDatabase, relationName,
      originalNodeHost, originalNodePort, originalNodeDisk));

    instance->getOriginalNode()->setType(Node::nodeTypeOriginal());

    return instance;
  }

  /*
  Returns the relation name.

  */
  string Relation::getName() const {
    return name;
  }

  /*
  Sets the relation name and marks the Relation dirty. Setting the current
  name again is a no-op. Throws a string literal (const char[*]) if the name
  differs and the Relation is not new.

  */
  void Relation::setName(string newName) {
    if(newName == getName())
      return;

    if(getIsNew() != true)
      throw "Can't set relation name for non-new relation!";

    name = newName;
    setDirty();
  }

  /*
  Returns a copy of the vector of replicas.

  */
  vector<shared_ptr<Replica> > Relation::getReplicas() const {
    return replicas;
  }

  /*
  Returns the number of replicas held in the ~replicas~ vector.

  */
  int Relation::getReplicaCount() const {
    return replicas.size();
  }

  /*
  Adds the given replica to this Relation and marks the Relation dirty.
  The replica's database and relation are set before the existence check,
  i.e. also in the case that ends with an exception. Throws a string literal
  if the replica is already part of ~replicas~.

  */
  void Relation::addReplica(shared_ptr<Replica> newReplica) {
    newReplica->setDatabase(getDatabase());

    // https://stackoverflow.com/questions/11711034/stdshared-ptr-of-this
    newReplica->setRelation(shared_from_this());

    if(doesReplicaExist(newReplica))
      throw "Can't add replica. Replica already exists!";

    replicas.push_back(newReplica);
    setDirty();
  }

  /*
  Creates a new Replica for the given target Node and adds it.

  */
  void Relation::addReplica(shared_ptr<Node> targetNode) {
    shared_ptr<Replica> replica = make_shared<Replica>();
    replica->setTargetNode(targetNode);

    // Also sets db and relation. Marks the relation as dirty.
    addReplica(replica);
  }

  /*
  Returns true if the very same Replica object (pointer identity) is already
  contained in ~replicas~. Replica contents are not compared.

  */
  bool Relation::doesReplicaExist(shared_ptr<Replica> newReplica) {
    return find(replicas.begin(), replicas.end(), newReplica)
      != replicas.end();
  }

  /*
  Returns the first replica of the given type whose target Node equals a Node
  built from targetHost and targetPort, or nullptr if there is none.

  */
  shared_ptr<Replica> Relation::findReplica(string targetHost,
    int targetPort, std::string replicaType) {

    LOG_SCOPE_FUNCTION(INFO);

    /*
      In order to compare nodes including DNS name resolution
      the comparison operator of Node is being used.
    */
    DBService::Node nodeToFind(
      as_const(targetHost), as_const(targetPort), "");

    for(auto& replica : replicas) {
      if(replica->getType() == replicaType
        && *replica->getTargetNode() == nodeToFind) {
        return replica;
      }
    }

    return nullptr;
  }

  /*
  Like findReplica, but restricted to replicas of type
  Replica::replicaTypeDerivative that carry the given derivative id.
  Returns nullptr if no such replica exists.

  */
  shared_ptr<Replica> Relation::findReplicaByDerivativeId(string targetHost,
    int targetPort, int derivativeId) {

    LOG_SCOPE_FUNCTION(INFO);

    string replicaType = Replica::replicaTypeDerivative;

    /*
      In order to compare nodes including DNS name resolution
      the comparison operator of Node is being used.
    */
    DBService::Node nodeToFind(
      as_const(targetHost), as_const(targetPort), "");

    for(auto& replica : replicas) {
      if(replica->getType() == replicaType
        && *replica->getTargetNode() == nodeToFind
        && replica->getDerivativeId() == derivativeId) {
        return replica;
      }
    }

    return nullptr;
  }

  /*
  Sets the status of the replica found for targetHost and targetPort and
  saves the replica.

  The lookup is repeated, with a random pause of less than five seconds
  between attempts, until a replica is found. There is no upper bound on the
  number of attempts: if no matching replica ever appears, this function does
  not return.

  NOTE(review): findReplica is called with two arguments here, so its third
  parameter presumably has a default declared in the header -- confirm.

  */
  void Relation::updateReplicaStatus(string targetHost, int targetPort,
    std::string replicaStatus) {

    LOG_SCOPE_FUNCTION(INFO);

    /*
      Hypothesis: It may happen that the dbservice is slower in creating the
      relation than the replication has been completed.
      This is because the relation is a much more complicated structure
      triggering a cascade of insert statements.
      Therefore, it may happen that the update statement of a replica is
      triggered before the relation's replicas have been saved.
      In this case we defer the update for a few seconds and try again.
    */
    shared_ptr<Replica> replica;

    do {
      LOG_F(INFO, "Trying to find replica of relation %s...",
        getName().c_str());

      replica = findReplica(targetHost, targetPort);

      if(replica == nullptr) {
        LOG_F(WARNING, "Couldn't find replica of relation %s by targetHost %s "
          " and targetPort %d. Will try again later.",
          getName().c_str(), targetHost.c_str(), targetPort);

        //TODO make configurable
        int randomDuration = rand() % 5000;

        boost::this_thread::sleep_for(
          boost::chrono::milliseconds(randomDuration));
      } else {
        // A replica has been found.
        LOG_F(INFO, "Found replica of relation %s...", getName().c_str());
      }
    } while (replica == nullptr);

    replica->setStatus(replicaStatus);
    replica->save();
  }

  /*
  Removes every replica that is flagged dirty from the ~replicas~ vector.

  The former index loop erased elements while still advancing the index and
  therefore skipped the element following each erased one. Erase/remove\_if
  visits every element exactly once.

  */
  void Relation::resetReplicas() {
    LOG_SCOPE_FUNCTION(INFO);

    replicas.erase(
      remove_if(replicas.begin(), replicas.end(),
        [](const shared_ptr<Replica>& replica) {
          return replica->getIsDirty() == true;
        }),
      replicas.end());
  }

  /*
  Returns a randomly chosen replica or nullptr if the Relation has none.

  rand() is seeded once, on the first call. Seeding with the current time on
  every call made all calls within the same second return the same replica.

  */
  shared_ptr<Replica> Relation::getRandomReplica() const {

    // If the relation has no replicas
    if(replicas.empty())
      return nullptr;

    // Initialization of a function-local static runs exactly once.
    static const bool seeded = []() {
      srand(static_cast<unsigned int>(time(nullptr)));
      return true;
    }();
    (void) seeded;

    // Selecting random Replica
    size_t randomIndex = rand() % replicas.size();

    return replicas[randomIndex];
  }

  /*
  Set the id of the original Node and load the node.
  Does nothing if an original Node has already been loaded.
  => There is no reload functionality.

  Throws a std::string (message followed by str()) if the node cannot be
  loaded. Note that the other functions of this class throw string literals.

  */
  void Relation::loadOriginalNode(int originalNodeId) {

    LOG_SCOPE_FUNCTION(INFO);

    if(originalNode != nullptr)
      return;

    /*
      The node manager has already loaded nodes. It should be avoided that
      objects exist several times in memory. This may lead to inconsistencies.
      Marking a record as read-only could also prevent this.

      An original node may have to be created during replication along
      with its relation.
      > a new relation with a new node will be saved.
    */

    //using Record::findByTid;
    originalNode = DBService::Node::findByTid(database, originalNodeId);

    if(originalNode == nullptr)
      throw "Couldn't load the Relation's originalNode: " + str();
  }

  /*
  Load the Relation's replicas. Does nothing if ~replicas~ is not empty.
  Does not load derivative-replicas into the ~replicas~ vector.

  */
  void Relation::loadReplicas() {

    LOG_SCOPE_FUNCTION(INFO);

    if(!replicas.empty())
      return;

    // Relation.cpp:113:16: error: no viable overloaded '='
    // replicas = Replica::findByRelationId(database, getId());
    vector<shared_ptr<Replica> > loadedReplicas =
      Replica::findByRelationId(database, getId());

    for(const auto& loadedReplica : loadedReplicas) {
      // true -> avoid dirty checking by forcefully setting the Relation
      loadedReplica->setRelation(shared_from_this(), true);
      replicas.push_back(loadedReplica);
    }
  }

  /*
  Replaces the original Node of a new Relation and hands the Relation's
  database to the Node. Setting the current Node again is a no-op.
  Throws a string literal if the Node differs and the Relation is not new.

  */
  void Relation::setOriginalNode(shared_ptr<Node> newOriginalNode) {
    if(newOriginalNode == getOriginalNode())
      return;

    if(getIsNew() != true)
      throw "Can't change Original Node for non-new Relation!";

    originalNode = newOriginalNode;

    // The new Node may be nullptr when an existing Node is being cleared.
    if(originalNode != nullptr)
      originalNode->setDatabase(database);
  }

  /*
  Returns the original Node, which may be nullptr.

  */
  shared_ptr<Node> Relation::getOriginalNode() const {
    return originalNode;
  }

  /*
  Two Relations are equal if their relation databases match ignoring case
  and their names match exactly.

  */
  bool Relation::operator==(const Relation& other) const {
    return (boost::to_upper_copy(getRelationDatabase())
        == boost::to_upper_copy(other.getRelationDatabase())
      && getName() == other.getName());
  }

  /*
  Negation of operator==.

  */
  bool Relation::operator!=(const Relation& other) const {
    return !(*this == other);
  }

  /*
  Returns the name of the relation in which Relation records are stored.

  */
  string Relation::getRelationName() {
    return "dbs_relations";
  }

  /*
  Returns the name of the database the described relation belongs to.

  */
  string Relation::getRelationDatabase() const {
    return relationDatabase;
  }

  /*
  Sets the relation database in uppercase and marks the Relation dirty.

  The new name is converted to uppercase before it is compared with the
  stored one. The stored name is uppercase whenever it was set here, so a
  name that differs only in case is not treated as a change. Throws a string
  literal if the name differs and the Relation is not new.

  */
  void Relation::setRelationDatabase(string newRelationDatabase) {

    // Comparison of DB names is done in uppercase
    boost::to_upper(newRelationDatabase);

    if(newRelationDatabase == relationDatabase)
      return;

    if(getIsNew() != true)
      throw "Can't change the relation-database for a non-new Relation!";

    relationDatabase = newRelationDatabase;
    setDirty();
  }

  // Derivatives START

  /*
  Builds a Derivative for this Relation, synchronises its replicas with the
  Relation, appends it to ~derivatives~, marks the Relation dirty and returns
  the new Derivative.

  */
  shared_ptr<Derivative> Relation::addDerivative(
    string derivativeName, string function) {

    LOG_SCOPE_FUNCTION(INFO);

    shared_ptr<Derivative> derivative = DBService::Derivative::build(
      derivativeName, function, shared_from_this());

    derivative->setDatabase(getDatabase());

    /*
      Add a derivative-replica per relation-replica using the
      corresponding target node.
    */
    derivative->syncReplicasWithRelation();

    derivatives.push_back(derivative);

    setDirty();

    return derivative;
  }

  /*
  Returns the number of derivatives held in the ~derivatives~ vector.

  */
  int Relation::getDerivativeCount() const {
    return derivatives.size();
  }

  /*
  Returns a copy of the vector of derivatives.

  */
  vector<shared_ptr<Derivative> > Relation::getDerivatives() {
    return derivatives;
  }

  /*
  Load the Relation's derivatives. Does nothing if ~derivatives~ is not empty.

  */
  void Relation::loadDerivatives() {
    LOG_SCOPE_FUNCTION(INFO);

    /* TODO There is a strong similarity between Replicas and Derivatives as
      both are dependent (composite) objects. Thus there is a lot of code
      duplication which can surely be further reduced.
    */
    if(!derivatives.empty())
      return;

    // Relation.cpp:113:16: error: no viable overloaded '='
    // replicas = Replica::findByRelationId(database, getId());
    vector<shared_ptr<Derivative> > loadedDerivatives =
      Derivative::findByRelationId(database, getId());

    for(const auto& derivative : loadedDerivatives) {
      // true -> avoid dirty checking by forcefully setting the Relation
      derivative->setRelation(shared_from_this(), true);
      derivatives.push_back(derivative);
    }
  }

  /*
  Returns the first derivative with the given id or nullptr.

  */
  shared_ptr<Derivative> Relation::findDerivative(int derivativeId) {
    for(auto& derivative : derivatives) {
      if(derivative->getId() == derivativeId)
        return derivative;
    }

    return nullptr;
  }

  /*
  Returns the first derivative with the given name or nullptr.

  */
  shared_ptr<Derivative> Relation::findDerivative(string derivativeName) {
    for(auto& derivative : derivatives) {
      if(derivative->getName() == derivativeName)
        return derivative;
    }

    return nullptr;
  }

  /*
  Forwards the status update to the derivative with the given name.
  Throws a string literal if this Relation has no such derivative. The
  result of findDerivative used to be dereferenced without a check.

  */
  void Relation::updateDerivativeReplicaStatus(string derivativeName,
    string targetHost, int targetPort, string replicaStatus) {

    auto derivative = findDerivative(derivativeName);

    if(derivative == nullptr)
      throw "Can't update replica status. Derivative doesn't exist!";

    derivative->updateReplicaStatus(targetHost, targetPort, replicaStatus);
  }

  /*
  Returns true if a derivative with the given name exists.

  */
  bool Relation::doesDerivativeExist(std::string derivativeName) {
    return findDerivative(derivativeName) != nullptr;
  }

  // Derivatives END

  /*
  Returns a short textual representation consisting of relation name and
  relation database, indented by two spaces per indentation level.

  */
  string Relation::str(int indentationLevel) const {
    string ind = std::string(2 * indentationLevel, ' ');
    string ind2 = ind + " ";

    stringstream ret;

    ret << ind << "{" << endl;
    ret << ind2 << "RelationName: " << getName() << ", " << endl;
    ret << ind2 << "RelationDatabase: " << getRelationDatabase() << endl;
    ret << ind << "}";

    return ret.str();
  }

  /*
  Returns the statement that creates the relation storing Relation records.

  NOTE(review): the string literal is continued with a backslash, so the
  indentation of the continuation line is part of the literal. The original
  amount of indentation was not visible -- confirm.

  */
  string Relation::createRelationStatement() {
    //TODO this neglects the existence of schema migrations -
    //  the evolution of database schemas over time.
    return "let " + getRelationName() + " = [const rel(tuple([Name: string, \
      Database: string, OriginalNodeId: int])) value ()]";
  }

  /*
  Returns the statement that inserts this Relation as a tuple of name,
  relation database and the id of the original Node.
  Throws a string literal if there is no original Node or if the original
  Node is still new, i.e. unsaved.

  NOTE(review): the second message is continued with a backslash, see the
  note on createRelationStatement.

  */
  string Relation::createStatement() const {
    const string quote = "\"";

    if(getOriginalNode() == nullptr)
      throw "Can't execute Relation::createStatement() without original Node!";

    if(getOriginalNode()->getIsNew() == true)
      throw "Can't execute Relation::createStatement() with an unsaved \
        original Node!";

    stringstream createStatement;

    createStatement << "query " << Relation::getRelationName();
    createStatement << " inserttuple" << "[";
    createStatement << quote << getName() << quote << ", ";
    createStatement << quote << getRelationDatabase() << quote << ", ";
    createStatement << originalNode->getId();
    createStatement << "] consume";

    return createStatement.str();
  }

  /*
  Returns the statement used for updates.

  */
  string Relation::updateStatement() const {
    // A neutral, non-mutating query as a Relation can't be updated.
    return "query 1";
  }

  /*
  Writes name, relation database, the original Node (if set), all replicas
  and all derivatives of the given Relation to the stream.

  */
  ostream& operator<<(ostream& os, Relation& relation) {
    int indentationLevel = 1;

    string ind = std::string(2 * indentationLevel, ' ');
    string ind2 = ind + " ";

    os << ind << "{" << endl;
    os << ind2 << "Relation Name: " << relation.getName() << endl;
    os << ind2 << "Relation Database: " << relation.getRelationDatabase();
    os << endl;

    if(relation.getOriginalNode() != nullptr) {
      os << ind2 << "Original Node: " << endl;
      os << relation.getOriginalNode()->str(indentationLevel + 1) << endl;
    }

    os << ind2 << "Relation Replicas (" << relation.getReplicaCount();
    os << "):" << endl;

    if(relation.getReplicaCount() > 0) {
      for(const auto& replica : relation.getReplicas())
        os << replica->str(indentationLevel + 1) << endl;
    }

    os << ind2 << "Relation Derivatives (";
    os << relation.getDerivativeCount() << "):" << endl;

    if(relation.getDerivativeCount() > 0) {
      for(auto& derivative : relation.getDerivatives()) {
        os << derivative->str(indentationLevel + 1) << endl;
      }
    }

    os << "}" << endl;

    // os << "\tOriginal node id: " << relation.getOriginalNode
    return os;
  }

  /*
  Returns a Query on the given database that starts with
  "query" followed by the name returned by getRelationName().

  */
  Query Relation::query(string database) {
    stringstream queryText;

    queryText << "query" << " " << getRelationName();

    return Query(database, queryText.str());
  }

  /*
  Returns a Query selecting the Relation records with the given relation
  database (compared in uppercase) and relation name.

  */
  Query Relation::queryByDatabaseAndName(std::string database,
    std::string relationDatabase, std::string relationName) {

    return DBService::Relation::query(database).feed().filter(
      ".Database = ?", boost::to_upper_copy(relationDatabase)).filter(
      ".Name = ?", relationName).addid().consume();
  }

  /*
  Returns the text of the query that retrieves all Relation records.

  */
  string Relation::findAllStatement(string database) {
    //TODO passing an empty query is dangerous as the resulting query is not
    //  executable.
    //  Maybe there should be a database parameter and the function should be
    //  renamed to findAll instead of findAllStatement!
    return Relation::query(database).feed().addid().consume().str();

    // This approach leads to a #relations * #replica results and is harder to
    // process in RecordAdapters. Caching is much simpler to implement.
    // return Relation::query(database).appendString("addid feed {dbsrel} "
    // "dbs_replicas feed {dbsrep} "
    // "symmjoin[tid2int(.TID_dbsrel) = ..RelationId_dbsrep]").consume().str();
  }

  /*
  Retrieves all Relation records of the given database.

  NOTE(review): explicit template arguments of retrieveVector, if there were
  any, were not visible -- restore them from the original file.

  */
  vector<shared_ptr<Relation> > Relation::findAll(string database) {
    Query query = Relation::query(database).feed().addid().consume();

    return query.retrieveVector();
  }

  //TODO Remove database field as it is redundant to
  //  the one contained in the query object.
  //TODO Establish findAll(...) and findOne(...) for all Record types.
  /*
  Retrieves a single Relation using the given query. The database argument
  is not used.

  NOTE(review): explicit template arguments of retrieveObject, if there were
  any, were not visible -- restore them from the original file.

  */
  shared_ptr<Relation> Relation::findOne(string database, Query findQuery) {
    return findQuery.retrieveObject();
  }

  /*
  Saves the original Node, if there is one, before the Relation is saved.

  */
  void Relation::beforeSave() {
    if(originalNode == nullptr)
      return;

    /*
      Ensure that the original Nodes' database is equal to the Relations'
      database which may get out of sync as setDatabase from the Record
      class template does not cascade setDatabase to the Relation's
      originalNode.
    */
    originalNode->setDatabase(getDatabase());
    originalNode->save();
  }

  /*
  Saves replicas and derivatives after the Relation has been saved.

  */
  void Relation::afterSave() {
    saveReplicas();
    saveDerivatives();
  }

  /*
  Saves every dirty replica after handing it the Relation's database and id.

  */
  void Relation::saveReplicas() {
    if(getReplicaCount() <= 0)
      return;

    for(auto& replica : getReplicas()) {
      if(replica->getIsDirty()) {
        replica->setDatabase(getDatabase());
        replica->setRelationId(getId());
        replica->save();
      }
    }
  }

  /*
  Saves every dirty derivative after handing it the Relation's database.

  */
  void Relation::saveDerivatives() {
    if(getDerivativeCount() <= 0)
      return;

    for(auto& derivative : getDerivatives()) {
      if(derivative->getIsDirty()) {
        derivative->setDatabase(getDatabase());
        derivative->save();
      }
    }
  }

  /*
  A Relation is empty if both its name and its relation database are empty.

  */
  bool Relation::empty() const {
    return (getName() == "" && getRelationDatabase() == "");
  }

  /*
  Destroys replicas and derivatives after the Relation has been destroyed.

  */
  void Relation::afterDestroy() {
    deleteReplicas();
    deleteDerivatives();
  }

  /*
  Calls destroy() on every derivative. The ~derivatives~ vector itself is
  left unchanged.

  */
  void Relation::deleteDerivatives() {
    for(auto& derivative : derivatives)
      derivative->destroy();
  }

  /*
  Calls destroy() on every replica. The ~replicas~ vector itself is left
  unchanged.

  */
  void Relation::deleteReplicas() {
    for(auto& replica : replicas)
      replica->destroy();
  }

  /*
  Destroys the derivative with the given name and removes it from the
  ~derivatives~ vector. Does nothing if there is no such derivative.

  */
  void Relation::deleteDerivative(std::string derivativeName) {
    auto derivative = findDerivative(derivativeName);

    if(derivative == nullptr)
      return;

    derivative->destroy();

    // Also remove derivative from the derivatives vector
    derivatives.erase(
      std::remove(derivatives.begin(), derivatives.end(), derivative),
      derivatives.end());
  }
}