/* ---- This file is part of SECONDO. Copyright (C) 2004, University in Hagen, Department of Computer Science, Database Systems for New Applications. SECONDO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. SECONDO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SECONDO; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ---- //paragraph [1] Title: [{\Large \bf \begin {center}] [\end {center}}] //[TOC] [\tableofcontents] [1] Header File of the class of operator ~setPregelFunction~ November 2018, J. Mende [TOC] 1 Overview This header file contains definitions of type mapping, vallue mapping and the operator specification. 2 Defines and includes */ #include #include #include #include "SetPregelFunction.h" #include "QueryProcessor.h" #include "../../Helpers/Commander.h" #include "../../typedefs.h" #include "../../../FText/FTextAlgebra.h" namespace pregel { ListExpr SetPregelFunction::typeMapping(ListExpr args) { if (!nl->HasLength(args, 2)) { return listutils::typeError("You must provide 2 arguments."); } const ListExpr function = nl->First(args); auto messageSlotAttribute = nl->Second(args); // Uses args in type mapping if (!nl->HasLength(function, 2) || !nl->HasLength(messageSlotAttribute, 2)) { return listutils::typeError("Internal Failure"); } auto functionType = nl->First(function); auto functionValue = nl->Second(function); if (!listutils::isMap<1>(functionType)) { return listutils::typeError( "The first argument must be a function"); } if (!listutils::isSymbol(nl->First(messageSlotAttribute))) { return listutils::typeError( "The second argument must be a symbol"); } auto argType = nl->Second(functionType); auto returnType = nl->Third(functionType); if (!listutils::isTupleStream(argType) || !listutils::isTupleStream(returnType)) { return listutils::typeError( "The function must take a tuple stream as parameter"); } if (!nl->Equal(argType, returnType)) { return listutils::typeError( "The function must produce a stream of the same type of tuple"); } auto tupleType = nl->Second(argType); PregelContext::get().setMessageType(tupleType); std::string attributeName = nl->SymbolValue(nl->First(messageSlotAttribute)); int index; try { index = findAttribute(attributeName, nl->Second(tupleType)); } catch (std::exception &e) { return listutils::typeError( "\"" + attributeName + "\" doesn't name an attribute in the tuple type, or isn't an int"); } std::string functionText = nl->ToString(functionValue); return nl->ThreeElemList( nl->SymbolAtom(Symbols::APPEND()), nl->TwoElemList(nl->TextAtom(functionText), nl->IntAtom(index - 1)), nl->SymbolAtom(CcBool::BasicType())); } int SetPregelFunction::valueMapping(Word *args, Word &result, int, Word &, Supplier s) { result = qp->ResultStorage(s); // ignore actual function at args[0] // ignore attribute name at args[1] auto functionText = (FText *) args[2].addr; auto addressIndexInt = (CcInt *) args[3].addr; PRECONDITION(PregelContext::get().isSetUp() || PregelContext::get().isReady(), "Please run \"query setupPregel(...) first.\"") std::string function = functionText->GetValue(); int addressIndex = addressIndexInt->GetValue(); PregelContext::get().setAddressIndex(addressIndex); PregelContext::get().setFunction(function); bool success = remoteQueryCall(function, addressIndex); PregelContext::get().ready(); ((CcBool *) result.addr)->Set(true, success); return 0; } OperatorSpec SetPregelFunction::operatorSpec( "map(stream(tuple), stream(tuple)) x symbol -> bool", "# (_,_)", "This operator defines the function type of the Pregel system." "The message type is set implicitly as it is derived from the type of the " "input argument of the function." "The argument of this operator must be of type map and the input and " "return type of the SECONDO function must be of the same tuple type." "This operator checks the function type and delegates the actual " "configuration of the Pregel system to the workers through the operator " "'setPregelFunction(...)'." "The additional (second) argument of the operator denotes an attribute of " "the message type. This attribute serves as the address of each message." "You must provide a symbol that matches one attribute name in the tuple type." "The operator returns TRUE as long as the workers are reachable." "NOTE: You must run the operator 'setupPregel(...) first.'", "query setPregelFunction(Function, Address);", "This operator belongs to the Pregel API." "It may require knowledge of the system to effectively understand and " "use all the operators that are provided." ); Operator SetPregelFunction::setPregelFunction( "setPregelFunction", SetPregelFunction::operatorSpec.getStr(), SetPregelFunction::valueMapping, Operator::SimpleSelect, SetPregelFunction::typeMapping ); bool SetPregelFunction::remoteQueryCall(std::string &function, int index) { const supplier workers = PregelContext::get().getWorkers(); std::string query = "query setPregelFunctionWorker(" + function + ", " + std::to_string(index) + ");"; for (auto worker = workers(); worker != nullptr; worker = workers()) { try { Commander::remoteQuery(worker->connection, query, Commander::throwWhenFalse); } catch (RemoteExecutionException &e) { BOOST_LOG_TRIVIAL(error) << "failed to set function: " << e.getMessage(); return false; } } return true; } int SetPregelFunction::findAttribute(const std::string &attributeName, const ListExpr tupleType) noexcept(false) { ListExpr attributeType; int index = listutils::findAttribute(tupleType, attributeName, attributeType); if (index == 0 || !nl->Equal(attributeType, nl->SymbolAtom(CcInt::BasicType()))) { throw std::exception(); } return index; } }