/* ---- This file is part of SECONDO. Copyright (C) 2004, University in Hagen, Department of Computer Science, Database Systems for New Applications. SECONDO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. SECONDO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SECONDO; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ---- //paragraph [1] Title: [{\Large \bf \begin{center}] [\end{center}}] //paragraph [10] Footnote: [{\footnote{] [}}] //[ue] [\"u] //[ae] [\"a] //[_] [\_] //[TOC] [\tableofcontents] [1] StreamAlgebra - Implementing Generalized (non-tuple) Streams of Objects December 2006, Initial version implemented by Christian D[ue]ntgen, Faculty for Mathematics and Informatics, LG Database Systems for New Applications Feruniversit[ae]t in Hagen. 
----

State Operator/Signatures

OK    use:  (stream X)            (map X Y) --> (stream Y)
OK          (stream X)   (map X (stream Y)) --> (stream Y)

OK    use2: (stream X) Y          (map X Y Z) --> (stream Z)
OK          (stream X) Y (map X Y (stream Z)) --> (stream Z)
OK          X (stream Y)          (map X Y Z) --> (stream Z)
OK          X (stream Y) (map X Y (stream Z)) --> (stream Z)
OK          (stream X) (stream Y)          (map X Y Z) --> (stream Z)
OK          (stream X) (stream Y) (map X Y (stream Z)) --> (stream Z)
            for X,Y,Z of kind DATA

OK    feed:                                   T --> (stream T)

OK    transformstream: stream(tuple((Id T))) --> (stream T)
OK                                (stream T) --> stream(tuple((Elem T)))

OK    aggregateS:  (stream T) x (T x T --> T) x T --> T
OK    count:                         (stream T) --> int
OK    filter:      ((stream T) (map T bool)) --> (stream T)
OK    printstream:                   (stream T) --> (stream T)

      projecttransformstream: stream(tuple((a1 t1) ..(an tn))) x ai -> stream(ti)

COMMENTS:

(*):  These operators have been implemented for
      T in {bool, int, real, point}
(**): These operators have been implemented for
      T in {bool, int, real, point, string, region}

Key to STATE of implementation:

   OK : Operator has been implemented and fully tested
  (OK): Operator has been implemented and partially tested
  Test: Operator has been implemented, but tests have not been done
  Pre : Operator has not been functionally implemented, but
        stubs (dummy code) exist
  n/a : Neither functional nor dummy code exists for this one

    + : Equivalent exists for according mType
    - : Does not exist for according mType
    ? : It is unclear, whether it exists or not

----

*/

/*
0. Bug-List

----

  (none known)

Key:
 (C): system crash
 (R): Wrong result

----

*/

/*

[TOC]

1 Overview

This file contains the implementation of the stream operators.

2 Defines, includes, and constants */ #include #include #include #include #include #include #include #include #include #include #include #include #include "CostEstimation.h" #include "NestedList.h" #include "QueryProcessor.h" #include "AlgebraManager.h" #include "Algebra.h" #include "StandardTypes.h" #include "Algebras/Relation-C++/RelationAlgebra.h" #include "SecondoSystem.h" #include "Symbols.h" #include "NList.h" #include "ListUtils.h" #include "Progress.h" #include "AlmostEqual.h" #include "Stream.h" #include "Algebras/Standard-C++/LongInt.h" extern NestedList* nl; extern QueryProcessor* qp; extern AlgebraManager* am; using namespace std; // #define GSA_DEBUG #define STRALG_DEBUG false #define DEBUGMESSAGE(MESSAGE) if(STRALG_DEBUG) cout << __PRETTY_FUNCTION__ \ << " (" << __FILE__ << ":" << __LINE__ << ") " << MESSAGE << endl /* 4 General Selection functions */ /* 5 Implementation of Algebra Operators */ /* 5.19 Operator ~feed~ The operator is used to cast a single value T to a (stream T) having a single element of type T. 
5.19.1 Type Mapping for ~feed~ ---- DATA -> stream(DATA) ---- */ ListExpr TypeMapStreamfeed( ListExpr args ) { // if it is a stream, just pass the incoming stream if(nl->HasLength(args,1) && listutils::isStream(nl->First(args))){ return nl->First(args); } string err = "one argument of kind DATA expected"; if(nl->ListLength(args)!=1){ return listutils::typeError(err); } ListExpr arg1 = nl->First(args); if(Tuple::checkType(arg1)){ return Stream::wrap(arg1); } if(!listutils::isDATA(arg1)){ return listutils::typeError(err); } if(nl->ListLength(arg1) == 2){ ListExpr outerType = nl->First(arg1); if (nl->IsAtom(outerType) && (nl->AtomType(outerType)==SymbolType)) { if ((nl->SymbolValue(outerType)=="arel2") || (nl->SymbolValue(outerType)=="nrel2")) { return listutils::typeError("arel2 and nrel2 are processed " "by NestedRelation2Algebra"); } } } return Stream::wrap(arg1); } /* 5.19.2 Value Mapping for ~feed~ T may be of type Attribute or Tuple */ struct SFeedLocalInfo { bool finished; bool sonIsObjectNode; bool progressinitialized; double* attrSize; double* attrSizeExt; int noAttributes; SFeedLocalInfo(Attribute* arg, const bool isObject): finished( false ), sonIsObjectNode( isObject ), progressinitialized( false ) { double coresize = arg->Sizeof(); double flobsize = 0.0; for(int i=0; i < arg->NumOfFLOBs(); i++){ flobsize += arg->GetFLOB(i)->getSize(); } attrSize = new double[1]; attrSize[0] = coresize + flobsize; attrSizeExt = new double[1]; attrSizeExt[0] = coresize; noAttributes = 1; } SFeedLocalInfo(Tuple* arg, const bool isObject): finished( false ), sonIsObjectNode( isObject ), progressinitialized( false ) { attrSize = new double[1]; attrSize[0] = arg->GetExtSize(); attrSizeExt = new double[1]; attrSizeExt[0] = arg->GetRootSize(); noAttributes = arg->GetNoAttributes(); } ~SFeedLocalInfo(){ if(attrSize) {delete[] attrSize; attrSize = 0;} if(attrSizeExt) {delete[] attrSizeExt; attrSizeExt = 0;} } }; template int MappingStreamFeed( Word* args, Word& result, int message, 
Word& local, Supplier s ) { SFeedLocalInfo *linfo; T* arg = (static_cast(args[0].addr)); switch( message ){ case OPEN:{ linfo = static_cast(local.addr); if(linfo){ delete linfo; } linfo = new SFeedLocalInfo(arg,qp->IsObjectNode(qp->GetSupplierSon(s,0))); local.setAddr(linfo); return 0; } case REQUEST:{ if ( local.addr == 0 ) return CANCEL; linfo = static_cast(local.addr); if ( linfo->finished ) return CANCEL; result.setAddr(arg->Clone()); linfo->finished = true; return YIELD; } case CLOSE:{ // localinfo is disposed by CLOSEPROGRESS return 0; } case CLOSEPROGRESS:{ if ( local.addr ) { linfo = static_cast(local.addr); delete linfo; local.setAddr(0); } return 0; } case REQUESTPROGRESS:{ linfo = static_cast(local.addr); if(!linfo){ return CANCEL; } ProgressInfo *pRes; pRes = (ProgressInfo*) result.addr; ProgressInfo p1; if( !linfo->sonIsObjectNode){ if(!qp->RequestProgress(qp->GetSupplierSon(s,0), &p1) ) { return CANCEL; } // the son is a computed result node // just copy everything pRes->CopyBlocking(p1); pRes->Time = p1.Time; } else { // the son is a database object pRes->BTime = 0.00001; // no blocking time pRes->BProgress = 1.0; // non-blocking pRes->Time = 0.00001; // (almost) zero runtime } if(linfo->progressinitialized){ pRes->sizesChanged = false; linfo->progressinitialized = true; } else { pRes->sizesChanged = true; } pRes->Card = 1; // cardinality pRes->Size = linfo->attrSize[0]; // total size pRes->SizeExt = linfo->attrSizeExt[0]; // size w/o FLOBS pRes->noAttrs = linfo->noAttributes; //no of attributes pRes->attrSize = linfo->attrSize; pRes->attrSizeExt = linfo->attrSizeExt; pRes->sizesChanged = true; //sizes have been recomputed if(linfo->finished){ pRes->Progress = 1.0; pRes->Time = 0.00001; } return YIELD; } } // switch return -1; // should not be reached } int MappingStreamFeedStream( Word* args, Word& result, int message, Word& local, Supplier s ) { switch(message){ case OPEN: qp->Open(args[0].addr); return 0; case REQUEST: qp->Request(args[0].addr, 
result); return qp->Received(args[0].addr)? YIELD:CANCEL; case CLOSE: qp->Close(args[0].addr); return 0; case CLOSEPROGRESS: break; case REQUESTPROGRESS: ProgressInfo* pRes; pRes = (ProgressInfo*) result.addr; if ( qp->RequestProgress(args[0].addr, pRes) ){ return YIELD; } else { result.addr = 0; return CANCEL; } break; } return 0; } /* 5.19.3 Specification for operator ~feed~ */ const string StreamSpecfeed= "( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) " "( For T in kind DATA:\n" "T -> (stream T)" "_ feed" "create a single-value stream from " "a single value." "query [const int value 5] feed count; ) )"; /* 5.19.4 Selection Function of operator ~feed~ */ ValueMapping streamfeedmap[] = { MappingStreamFeed, MappingStreamFeedStream, MappingStreamFeed }; int StreamfeedSelect( ListExpr args ) { if(listutils::isStream(nl->First(args))) return 1; return Attribute::checkType(nl->First(args))?0:2; } /* 5.19.5 Definition of operator ~feed~ */ Operator streamfeed( "feed", StreamSpecfeed, 3, streamfeedmap, StreamfeedSelect, TypeMapStreamfeed); /* 5.20 Operator ~use~ The ~use~ class of operators implements a set of functors, that derive stream-valued operators from operators taking scalar arguments and returning scalar values or streams of values: ---- use: (stream X) (map X Y) -> (stream Y) (stream X) (map X (stream Y)) -> (stream Y) (stream X) Y (map X Y Z) -> (stream Z) (stream X) Y (map X Y stream(Z)) -> (stream Z) X (stream Y) (map X y Z) -> (stream Z) X (stream Y) (map X y (stream Z)) -> (stream Z) (stream X) (stream Y) (map X Y Z) -> (stream Z) (stream X) (stream Y) (map X Y (stream Z)) -> (stream Z) for X,Y,Z of kind DATA ---- 5.20.1 Type Mapping for ~use~ */ ListExpr TypeMapUse( ListExpr args ) { string err="stream(S) x ( S -> T) , S,T in DATA expected"; if(!nl->HasLength(args,2)){ return listutils::typeError(err); } if(!Stream::checkType(nl->First(args)) && !Stream::checkType(nl->First(args))){ return listutils::typeError(err); } 
if(!listutils::isMap<1>(nl->Second(args))){ return listutils::typeError(err); } ListExpr streamType = nl->Second(nl->First(args)); ListExpr funArg = nl->Second(nl->Second(args)); ListExpr funRes = nl->Third(nl->Second(args)); if(!nl->Equal(streamType,funArg)){ return listutils::typeError(err + " (stream type different " "to function argument)"); } if(Stream::checkType(funRes)){ return funRes; } if(!listutils::isDATA(funRes)){ return listutils::typeError(err + " (result of function " "not in kind DATA)"); } return nl->TwoElemList( nl->SymbolAtom(Stream::BasicType()), funRes); } ListExpr TypeMapUse2( ListExpr args ) { string outstr1, outstr2; // output strings ListExpr sarg1, sarg2, map; // arguments to use ListExpr marg1, marg2, mres; // argument to mapping ListExpr sarg1Type, sarg2Type, sresType; // 'flat' arg type ListExpr argConfDescriptor; bool sarg1isstream = false, sarg2isstream = false, resisstream = false; int argConfCode = 0; // 0. Check number of arguments if ( (nl->ListLength( args ) != 3) ) { ErrorReporter::ReportError("Operator use2 expects a list of " "length three "); return nl->SymbolAtom( Symbol::TYPEERROR() ); } // 1. get use arguments sarg1 = nl->First( args ); sarg2 = nl->Second( args ); map = nl->Third( args ); // check basics if(!Stream::checkType(sarg1) && !Stream::checkType(sarg1) && !Attribute::checkType(sarg1)){ return listutils::typeError("first argument has to be a tuple stream, " "a data stream or an attribute"); } if(!Stream::checkType(sarg2) && !Stream::checkType(sarg2) && !Attribute::checkType(sarg2)){ return listutils::typeError("second argument has to be a tuple stream, " "a data stream or an attribute"); } if(!listutils::isMap<2>(map)){ return listutils::typeError("third argument is not a map having 2" " arguments"); } // 2. 
First argument // check sarg1 for being a stream if( Attribute::checkType(sarg1)) { // attribute sarg1Type = sarg1; sarg1isstream = false; } else { // stream sarg1Type = nl->Second(sarg1); sarg1isstream = true; } // 3. Second Argument // check sarg2 for being a stream if( Attribute::checkType(sarg2)) { // attribute sarg2Type = sarg2; sarg2isstream = false; } else { // stream sarg2Type = nl->Second(sarg2); sarg2isstream = true; } // 4. First and Second argument // check whether at least one stream argument is present if ( !sarg1isstream && !sarg2isstream ) { return listutils::typeError("at leat one of the first two args " " must be a stream(T), with T in" " {DATA, tuple}"); } // 5. Third argument // get map arguments marg1 = nl->Second(map); marg2 = nl->Third(map); mres = nl->Fourth(map); // check marg1 if ( !( nl->Equal(marg1, sarg1Type) ) ) { nl->WriteToString(outstr1, sarg1Type); nl->WriteToString(outstr2, marg1); ErrorReporter::ReportError("Operator use2: 1st argument's stream" "type does not match the type of the " "mapping's 1st argument. If e.g. the first " "is 'stream X', then the latter must be 'X'." "The types passed are '" + outstr1 + "' and '" + outstr2 + "'."); return nl->SymbolAtom( Symbol::TYPEERROR() ); } // check marg2 if ( !( nl->Equal(marg2, sarg2Type) ) ) { nl->WriteToString(outstr1, sarg2Type); nl->WriteToString(outstr2, marg2); ErrorReporter::ReportError("Operator use2: 2nd argument's stream" "type does not match the type of the " "mapping's 2nd argument. If e.g. the second" " is 'stream X', then the latter must be 'X'." "The types passed are '" + outstr1 + "' and '" + outstr2 + "'."); return nl->SymbolAtom( Symbol::TYPEERROR() ); } // 6. 
Determine result type // get map result type 'sresType' // may be a stream of T, T in {DATA,TUPLE} if( Stream::checkType(mres) || Stream::checkType(mres)){ resisstream = true; sresType = mres; // map result type is already a stream } else if( Attribute::checkType(mres) || Tuple::checkType(mres)){ resisstream = false; sresType = nl->TwoElemList(nl->SymbolAtom(Symbol::STREAM()), mres); } else { return listutils::typeError("result of the map must be T or " "stream(T), with T in {DATA,TUPLE}"); } // 7. This check can be removed when operators working on tuplestreams have // been implemented: if ( Tuple::checkType(sarg1Type) || Stream::checkType(sarg1Type)){ return listutils::typeError("use2: support for tuple not implemented"); } // 8. Append flags describing argument configuration for value mapping: // 0: no stream // 1: sarg1 is a stream // 2: sarg2 is a stream // 4: map result is a stream // // e.g. 7=4+2+1: both arguments are streams and the // map result is a stream if(sarg1isstream) argConfCode += 1; if(sarg2isstream) argConfCode += 2; if(resisstream) argConfCode += 4; argConfDescriptor = nl->OneElemList(nl->IntAtom(argConfCode)); return nl->ThreeElemList(nl->SymbolAtom(Symbol::APPEND()), argConfDescriptor, sresType); } /* 5.20.2 Value Mapping for ~use~ */ struct UseLocalInfo{ bool Xfinished, Yfinished, funfinished; // whether we have finished Word X, Y, fun; // pointers to the argument nodes Word XVal, YVal, funVal; // the last arg values int argConfDescriptor; // type of argument configuration }; // (stream X) (map X Y) -> (stream Y) int Use_SN( Word* args, Word& result, int message, Word& local, Supplier s ) { UseLocalInfo *sli; Word instream = args[0], fun = args[1]; Word funResult, argValue; ArgVectorPointer funArgs; switch (message) { case OPEN : #ifdef GSA_DEBUG cout << "Use_SN received OPEN" << endl; #endif sli = new UseLocalInfo; sli->Xfinished = true; qp->Open(instream.addr); sli->Xfinished = false; local.setAddr(sli); #ifdef GSA_DEBUG cout << "Use_SN 
finished OPEN" << endl; #endif return 0; case REQUEST : // For each REQUEST, we get one value from the stream, // pass it to the parameter function and evalute the latter. // The result is simply passed on. #ifdef GSA_DEBUG cout << "Use_SN received REQUEST" << endl; #endif if( local.addr == 0 ) { #ifdef GSA_DEBUG cout << "Use_SN finished REQUEST: CANCEL (1)" << endl; #endif return CANCEL; } sli = (UseLocalInfo*)local.addr; if (sli->Xfinished) { #ifdef GSA_DEBUG cout << "Use_SN finished REQUEST: CANCEL (2)" << endl; #endif return CANCEL; } funResult.addr = 0; argValue.addr = 0; qp->Request(instream.addr, argValue); // get one arg value from stream if(qp->Received(instream.addr)) { funArgs = qp->Argument(fun.addr); // set argument for the (*funArgs)[0] = argValue; // parameter function qp->Request(fun.addr, funResult); // call parameter function // copy result: result.setAddr(((Attribute*) (funResult.addr))->Clone()); ((Attribute*) (argValue.addr))->DeleteIfAllowed(); // delete argument #ifdef GSA_DEBUG cout << " result.addr =" << result.addr << endl; #endif argValue.addr = 0; #ifdef GSA_DEBUG cout << "Use_SN finished REQUEST: YIELD" << endl; #endif return YIELD; } else // (input stream consumed completely) { qp->Close(instream.addr); sli->Xfinished = true; result.addr = 0; #ifdef GSA_DEBUG cout << "Use_SN finished REQUEST: CANCEL (3)" << endl; #endif return CANCEL; } case CLOSE : #ifdef GSA_DEBUG cout << "Use_SN received CLOSE" << endl; #endif if( local.addr != 0 ) { sli = (UseLocalInfo*)local.addr; if ( !sli->Xfinished ) qp->Close( instream.addr ); delete sli; local.setAddr(0); } #ifdef GSA_DEBUG cout << "Use_SN finished CLOSE" << endl; #endif return 0; } // end switch cout << "Use_SN received UNKNOWN COMMAND" << endl; return -1; // should not be reached } // (stream X) (map X (stream Y)) -> (stream Y) int Use_SS( Word* args, Word& result, int message, Word& local, Supplier s ) { UseLocalInfo *sli; Word funResult; ArgVectorPointer funargs; switch (message) { case 
OPEN : sli = new UseLocalInfo; sli->X.setAddr( args[0].addr ); sli->fun.setAddr( args[1].addr ); sli->Xfinished = true; sli->funfinished = true; sli->XVal.addr = 0; // open the ("outer") input stream and qp->Open( sli->X.addr ); sli->Xfinished = false; // save the local information local.setAddr(sli); return 0; case REQUEST : // For each value from the 'outer' stream, an 'inner' stream // of values is generated by the parameter function. // For each REQUEST, we pass one value from the 'inner' stream // as the result value. // If the inner stream is consumed, we try to get a new value // from the 'outer' stream and re-open the inner stream #ifdef GSA_DEBUG cout << "\nUse_SS: Received REQUEST"; #endif //1. recover local information if( local.addr == 0 ) return CANCEL; sli = (UseLocalInfo*)local.addr; // create the next result while( !sli->Xfinished ) { if( sli->funfinished ) {// end of map result stream reached -> get next X qp->Request( sli->X.addr, sli->XVal); if (!qp->Received( sli->X.addr )) { // Stream X is exhaused qp->Close( sli->X.addr ); sli->Xfinished = true; result.addr = 0; return CANCEL; } // got an X-elem funargs = qp->Argument( sli->fun.addr ); (*funargs)[0] = sli->XVal; qp->Open( sli->fun.addr ); sli->funfinished = false; } // Now, we have an open map result stream qp->Request( sli->fun.addr, funResult ); if(qp->Received( sli->fun.addr )) { // cloning and passing the result result.setAddr(((Attribute*) (funResult.addr))->Clone()); ((Attribute*) (funResult.addr))->DeleteIfAllowed(); #ifdef GSA_DEBUG cout << " result.addr=" << result.addr << endl; #endif return YIELD; } else { // end of map result stream reached qp->Close( sli->fun.addr ); sli->funfinished = true; ((Attribute*) (sli->XVal.addr))->DeleteIfAllowed(); } } // end while case CLOSE : if( local.addr != 0 ) { sli = (UseLocalInfo*)local.addr; if( !sli->funfinished ) { qp->Close( sli->fun.addr ); ((Attribute*)(sli->X.addr))->DeleteIfAllowed(); } if ( !sli->Xfinished ) qp->Close( sli->X.addr ); 
delete sli; local.setAddr(0); } return 0; } // end switch cout << "\nUse_SS received UNKNOWN COMMAND" << endl; return -1; // should not be reached } // case stream(Tuple) x (tuple -> DATA) int Use_TsN( Word* args, Word& result, int message, Word& local, Supplier s ) { switch(message){ case OPEN: { qp->Open(args[0].addr); return 0; } case REQUEST: { Word tuple; qp->Request(args[0].addr, tuple); if(qp->Received(args[0].addr)){ ArgVectorPointer funarg = qp->Argument(args[1].addr); (*funarg)[0] = tuple; Word funRes; qp->Request(args[1].addr, funRes); Attribute* res = (Attribute*) funRes.addr; result.addr = res->Clone(); ((Tuple*) tuple.addr)->DeleteIfAllowed(); return YIELD; } else { return CANCEL; } } case CLOSE: { qp->Close(args[0].addr); return 0; } default: assert(false); // unknonwn message } return -1; } struct UseTsSLocal{ UseTsSLocal(): funOpened(false), currentTuple(0){} bool funOpened; Tuple* currentTuple; }; // case stream(Tuple) x (tuple -> stream(DATA) ) int Use_TsS( Word* args, Word& result, int message, Word& local, Supplier s ) { UseTsSLocal* li = (UseTsSLocal*) local.addr; switch (message){ case OPEN: { if(li){ if(li->funOpened){ qp->Close(args[1].addr); li->funOpened = false; } if(li->currentTuple){ li->currentTuple->DeleteIfAllowed(); li->currentTuple = 0; } } else { local.addr = new UseTsSLocal(); } qp->Open(args[0].addr); return 0; } case REQUEST: { if(!li){ return CANCEL; } result.addr = 0; while(!result.addr){ if(!li->funOpened){ // next elem from input stream required Word t; qp->Request(args[0].addr,t); if(!qp->Received(args[0].addr)){ return CANCEL; } if(li->currentTuple){ li->currentTuple->DeleteIfAllowed(); } li->currentTuple = (Tuple*) t.addr; ArgVectorPointer funargs = qp->Argument(args[1].addr); (*funargs)[0] = t; qp->Open(args[1].addr); li->funOpened = true; } // evaluate function Word funRes; qp->Request(args[1].addr, funRes); if(qp->Received(args[1].addr)){ result.addr = funRes.addr; } else { qp->Close(args[1].addr); li->funOpened = 
false; } } return YIELD; } case CLOSE: { if(li){ if(li->funOpened){ qp->Close(args[1].addr); } if(li->currentTuple){ li->currentTuple->DeleteIfAllowed(); } delete li; local.addr = 0; } return 0; } default : return -1; } } // (stream X) Y (map X Y Z) -> (stream Z) // X (stream Y) (map X y Z) -> (stream Z) int Use_SNN( Word* args, Word& result, int message, Word& local, Supplier s ) { UseLocalInfo *sli; Word xval, funresult; ArgVectorPointer funargs; switch (message) { case OPEN : #ifdef GSA_DEBUG cout << "\nUse_SNN received OPEN" << endl; #endif sli = new UseLocalInfo ; sli->Xfinished = true; sli->X.addr = 0; sli->Y.addr = 0; sli->fun.setAddr(args[2].addr); // get argument configuration info sli->argConfDescriptor = ((CcInt*)args[3].addr)->GetIntval(); if(sli->argConfDescriptor & 4) { delete( sli ); local.addr = 0; #ifdef GSA_DEBUG cout << "\nUse_SNN was called with stream result mapping!" << endl; #endif return 0; } if(sli->argConfDescriptor & 1) { // the first arg is the stream sli->X.setAddr(args[0].addr); // X is the stream sli->Y.setAddr(args[1].addr); // Y is the constant value } else { // the second arg is the stream sli->X.setAddr(args[1].addr); // X is the stream sli->Y.setAddr(args[0].addr); // Y is the constant value } qp->Open(sli->X.addr); // open outer stream argument sli->Xfinished = false; local.setAddr(sli); #ifdef GSA_DEBUG cout << "Use_SNN finished OPEN" << endl; #endif return 0; case REQUEST : // For each REQUEST, we get one value from the stream, // pass it (and the remaining constant argument) to the parameter // function and evalute the latter. The result is simply passed on. // sli->X is the stream, sli->Y the constant argument. #ifdef GSA_DEBUG cout << "Use_SNN received REQUEST" << endl; #endif // 1. 
get local data object if (local.addr == 0) { result.addr = 0; #ifdef GSA_DEBUG cout << "Use_SNN finished REQUEST: CANCEL (1)" << endl; #endif return CANCEL; } sli = (UseLocalInfo*) local.addr; if (sli->Xfinished) { // stream already exhausted earlier result.addr = 0; #ifdef GSA_DEBUG cout << "Use_SNN finished REQUEST: CANCEL (2)" << endl; #endif return CANCEL; } // 2. request value from outer stream qp->Request( sli->X.addr, xval ); if(!qp->Received( sli->X.addr )) { // stream exhausted now qp->Close( sli->X.addr ); sli->Xfinished = true; #ifdef GSA_DEBUG cout << "Use_SNN finished REQUEST: CANCEL (3)" << endl; #endif return CANCEL; } // 3. call parameter function, delete args and return result funargs = qp->Argument( sli->fun.addr ); if (sli->argConfDescriptor & 1) { (*funargs)[0] = xval; (*funargs)[1] = sli->Y; } else { (*funargs)[0] = sli->Y; (*funargs)[1] = xval; } qp->Request( sli->fun.addr, funresult ); result.setAddr(((Attribute*) (funresult.addr))->Clone()); #ifdef GSA_DEBUG cout << " result.addr=" << result.addr << endl; #endif ((Attribute*) (xval.addr))->DeleteIfAllowed(); #ifdef GSA_DEBUG cout << "Use_SNN finished REQUEST: YIELD" << endl; #endif return YIELD; case CLOSE : #ifdef GSA_DEBUG cout << "Use_SNN received CLOSE" << endl; #endif if( local.addr != 0 ) { sli = (UseLocalInfo*)local.addr; if (!sli->Xfinished) qp->Close( sli->X.addr ); // close input delete sli; local.setAddr(0); } #ifdef GSA_DEBUG cout << "Use_SNN finished CLOSE" << endl; #endif return 0; } // end switch cout << "\nUse_SNN received UNKNOWN COMMAND" << endl; return -1; // should not be reached } // (stream X) Y (map X Y (stream Z)) -> (stream Z) // X (stream Y) (map X y (stream Z)) -> (stream Z) int Use_SNS( Word* args, Word& result, int message, Word& local, Supplier s ) { UseLocalInfo *sli; Word funresult; ArgVectorPointer funargs; switch (message) { case OPEN : #ifdef GSA_DEBUG cout << "\nUse_SNS received OPEN" << endl; #endif sli = new UseLocalInfo ; sli->Xfinished = true; 
sli->funfinished = true; sli->X.addr = 0; sli->Y.addr = 0; sli->fun.addr = 0; sli->XVal.addr = 0; sli->YVal.addr = 0; // get argument configuration info sli->argConfDescriptor = ((CcInt*)args[3].addr)->GetIntval(); if(! (sli->argConfDescriptor & 4)) { delete( sli ); local.addr = 0; cout << "\nUse_SNS was called with non-stream result mapping!" << endl; return 0; } if(sli->argConfDescriptor & 1) { // the first arg is the stream sli->X.setAddr(args[0].addr); // X is the stream sli->Y.setAddr(args[1].addr); // Y is the constant value } else { // the second arg is the stream sli->X.setAddr(args[1].addr); // X is the stream sli->Y.setAddr(args[0].addr); // Y is the constant value } sli->YVal = sli->Y; // save value of constant argument qp->Open(sli->X.addr); // open the ("outer") input stream sli->Xfinished = false; sli->fun.setAddr(args[2].addr); local.setAddr(sli); #ifdef GSA_DEBUG cout << "Use_SNN finished OPEN" << endl; #endif return 0; case REQUEST : // First, we check whether an inner stream is finished // (sli->funfinished). If so, we try to get a value from // the outer stream and try to re-open the inner stream. // sli->X is a pointer to the OUTER stream, // sli->Y is a pointer to the constant argument. #ifdef GSA_DEBUG cout << "Use_SNN received REQUEST" << endl; #endif // 1. get local data object if (local.addr == 0) { result.addr = 0; #ifdef GSA_DEBUG cout << "Use_SNN finished REQUEST: CANCEL (1)" << endl; #endif return CANCEL; } sli = (UseLocalInfo*) local.addr; // 2. request values from inner stream while (!sli->Xfinished) { while (sli->funfinished) { // the inner stream is closed, try to (re-)open it // try to get the next X-value from outer stream qp->Request(sli->X.addr, sli->XVal); if (!qp->Received(sli->X.addr)) { // stream X exhaused. 
CANCEL sli->Xfinished = true; qp->Close(sli->X.addr); #ifdef GSA_DEBUG cout << "Use_SNN finished REQUEST: CANCEL (3)" << endl; #endif return CANCEL; } funargs = qp->Argument( sli->fun.addr ); if (sli->argConfDescriptor & 1) { (*funargs)[0] = sli->XVal; (*funargs)[1] = sli->YVal; } else { (*funargs)[0] = sli->YVal; (*funargs)[1] = sli->XVal; } qp->Open( sli->fun.addr ); sli->funfinished = false; } // end while - Now, the inner stream is open again qp->Request(sli->fun.addr, funresult); if (qp->Received(sli->fun.addr)) { // inner stream returned a result result.setAddr(((Attribute*) (funresult.addr))->Clone()); ((Attribute*) (funresult.addr))->DeleteIfAllowed(); #ifdef GSA_DEBUG cout << " result.addr=" << result.addr << endl; cout << "Use_SNN finished REQUEST: YIELD" << endl; #endif return YIELD; } else{ // inner stream exhausted qp->Close(sli->fun.addr); sli->funfinished = true; ((Attribute*)(sli->XVal.addr))->DeleteIfAllowed(); sli->XVal.addr = 0; } } // end while result.addr = 0; #ifdef GSA_DEBUG cout << "Use_SNN finished REQUEST: CANCEL (4)" << endl; #endif return CANCEL; case CLOSE : #ifdef GSA_DEBUG cout << "Use_SNN received CLOSE" << endl; #endif if( local.addr != 0 ) { sli = (UseLocalInfo*)local.addr; if (!sli->funfinished) qp->Close( sli->fun.addr ); // close map result stream if (!sli->Xfinished) qp->Close( sli->X.addr ); // close outer stream delete sli; local.setAddr(Address(0)); } #ifdef GSA_DEBUG cout << "Use_SNN finished CLOSE" << endl; #endif return 0; } // end switch cout << "Use_SNN received UNKNOWN COMMAND" << endl; return -1; // should not be reached } // (stream X) (stream Y) (map X Y Z) -> (stream Z) int Use_SSN( Word* args, Word& result, int message, Word& local, Supplier s ) { UseLocalInfo *sli; Word funresult; ArgVectorPointer funargs; switch (message) { case OPEN : #ifdef GSA_DEBUG cout << "\nUse_SSN received OPEN" << endl; #endif sli = new UseLocalInfo ; sli->Xfinished = true; sli->Yfinished = true; // get argument configuration info 
sli->argConfDescriptor = ((CcInt*)args[3].addr)->GetIntval();
      // Bit 4 set: per the diagnostic below, the parameter function yields
      // a stream, which this variant (single-valued result) rejects.
      if(sli->argConfDescriptor & 4)
      {
        delete( sli );
        local.addr = 0;
        cout << "\nUse_SSN was called with stream result mapping!"
             << endl;
        return 0;
      }
      // Neither of the two low bits set: reported as "non-stream arguments".
      if(!(sli->argConfDescriptor & 3))
      {
        delete( sli );
        local.addr = 0;
        cout << "\nUse_SSN was called with non-stream arguments!"
             << endl;
        return 0;
      }
      sli->X.setAddr(args[0].addr);   // X is the outer stream
      sli->Y.setAddr(args[1].addr);   // Y is the inner stream (opened in REQUEST)
      sli->fun.setAddr(args[2].addr); // fun is the mapping function
      qp->Open(sli->X.addr);          // open outer stream argument
      sli->Xfinished = false;
      local.setAddr(sli);
#ifdef GSA_DEBUG
      cout << "Use_SSN finished OPEN" << endl;
#endif
      return 0;

    case REQUEST :

      // We do a nested loop to join the elements of the outer (sli->X)
      // and inner (sli->Y) stream. For each pairing, we evaluate the
      // parameter function (sli->fun), which return a single result.
      // A clone of the result is passed as the result.
      // We also need to delete each element, when it is not required
      // anymore.

#ifdef GSA_DEBUG
      cout << "Use_SSN received REQUEST" << endl;
#endif

      // get local data object
      // (local.addr is 0 when OPEN rejected the argument configuration)
      if (local.addr == 0)
      {
        result.addr = 0;
#ifdef GSA_DEBUG
        cout << "Use_SSN finished REQUEST: CANCEL (1)" << endl;
#endif
        return CANCEL;
      }
      sli = (UseLocalInfo*) local.addr;

      while(!sli->Xfinished)
      {
        if (sli->Yfinished)
        {
          // try to (re-) start outer instream
          qp->Request(sli->X.addr, sli->XVal);
          if (!qp->Received(sli->X.addr))
          { // outer instream exhausted
            qp->Close(sli->X.addr);
            sli->Xfinished = true;
            result.addr = 0;
#ifdef GSA_DEBUG
            cout << "Use_SSN finished REQUEST: CANCEL (2)" << endl;
#endif
            return CANCEL;
          }
          // Got next X-elem. (Re-)Start inner instream:
          qp->Open(sli->Y.addr);
          sli->Yfinished = false;
        }
        // Now, we have open inner and outer streams
        qp->Request(sli->Y.addr, sli->YVal);
        if (!qp->Received(sli->Y.addr))
        { // inner stream is exhausted
          qp->Close(sli->Y.addr);
          // Delete current X-elem:
          ((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
          sli->Yfinished = true;
        }
        // got next Y-elem
        if (!sli->Xfinished && !sli->Yfinished)
        { // pass parameters and call mapping, clone result
          funargs = qp->Argument( sli->fun.addr );
          (*funargs)[0] = sli->XVal;
          (*funargs)[1] = sli->YVal;
          qp->Request( sli->fun.addr, funresult );
          result.setAddr(((Attribute*) (funresult.addr))->Clone());
          // the Y-element is released after each pairing; the X-element
          // lives until the inner stream is exhausted or CLOSE arrives
          ((Attribute*) (sli->YVal.addr))->DeleteIfAllowed();
#ifdef GSA_DEBUG
          cout << "Use_SSN finished REQUEST: YIELD" << endl;
#endif
          return YIELD;
        }
      } // end while
#ifdef GSA_DEBUG
      cout << "Use_SSN finished REQUEST: CANCEL (3)" << endl;
#endif
      return CANCEL;

    case CLOSE :

#ifdef GSA_DEBUG
      cout << "Use_SSN received CLOSE" << endl;
#endif
      if( local.addr != 0 )
      {
        sli = (UseLocalInfo*)local.addr;
        if (!sli->Yfinished)
        {
          qp->Close( sli->Y.addr ); // close inner instream
          // Delete current X-elem:
          ((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
        }
        if (!sli->Xfinished)
          qp->Close( sli->X.addr ); // close outer instream
        delete sli;
        local.setAddr(0);
      }
      result.addr = 0;
#ifdef GSA_DEBUG
      cout << "Use_SSN finished CLOSE" << endl;
#endif
      return 0;

  }  // end switch
  result.addr = 0;
  cout << "\nUse_SSN received UNKNOWN COMMAND" << endl;
  return -1; // should not be reached
}


// (stream X) (stream Y) (map X Y (stream Z)) -> (stream Z)
//
// Value mapping for use2 with two stream arguments and a stream-valued
// parameter function.
//   args[0] : outer stream X
//   args[1] : inner stream Y (re-opened for every X element)
//   args[2] : parameter function
//   args[3] : integer argument-configuration descriptor (read in OPEN)
// The state between calls is kept in a UseLocalInfo stored in local.addr.
// The three flags Xfinished / Yfinished / funfinished record which of the
// outer stream, inner stream and function result stream currently has to be
// (re)started; CLOSE uses them to decide what still needs closing/releasing.
// Returns 0 for OPEN/CLOSE, YIELD/CANCEL for REQUEST, -1 for other messages.
int Use_SSS( Word* args, Word& result, int message,
             Word& local, Supplier s )
{
  UseLocalInfo     *sli;
  Word              funresult;
  ArgVectorPointer  funargs;

  switch (message)
  {
    case OPEN :

#ifdef GSA_DEBUG
      cout << "\nUse_SSS received OPEN" << endl;
#endif
      sli = new UseLocalInfo ;
      // everything counts as "finished" until it has actually been opened
      sli->Xfinished   = true;
      sli->Yfinished   = true;
      sli->funfinished = true;
      // get argument configuration info
      sli->argConfDescriptor = ((CcInt*)args[3].addr)->GetIntval();
      // Bit 4 must be set here: the parameter function has to yield a stream.
      if(!(sli->argConfDescriptor & 4) )
      {
        delete( sli );
        local.addr = 0;
        cout << "\nUse_SSS was called with non-stream result mapping!"
             << endl;
        return 0;
      }
      // NOTE(review): this only rejects the case where neither low bit is
      // set; whether both bits are required for the stream/stream variant
      // cannot be told from here - confirm against the type mapping.
      if(!(sli->argConfDescriptor & 3))
      {
        delete( sli );
        local.addr = 0;
        cout << "\nUse_SSS was called with non-stream arguments!"
             << endl;
        return 0;
      }
      sli->X   = args[0]; // X is the outer stream
      sli->Y   = args[1]; // Y is the inner stream (opened in REQUEST)
      sli->fun = args[2]; // fun is the mapping function
      qp->Open(sli->X.addr);   // open X stream argument
      sli->Xfinished = false;
      local.setAddr(sli);
#ifdef GSA_DEBUG
      cout << "Use_SSS finished OPEN" << endl;
#endif
      return 0;

    case REQUEST :

      // We do a nested loop to join the elements of the outer (sli->X)
      // and inner (sli->Y) stream. For each pairing, we open the
      // parameter function (sli->fun), which returns a stream result.
      // We consume this map result stream one-by-one.
      // When it is finally consumed, we try to restart it with the next
      // X/Y value pair.
      // A clone of the result is passed as the result.
      // We also need to delete each X/Y element, when it is not required
      // any more.

#ifdef GSA_DEBUG
      cout << "Use_SSS received REQUEST" << endl;
#endif

      // get local data object
      // (local.addr is 0 when OPEN rejected the argument configuration)
      if (local.addr == 0)
      {
        result.addr = 0;
#ifdef GSA_DEBUG
        cout << "Use_SSS finished REQUEST: CANCEL (1)" << endl;
#endif
        return CANCEL;
      }
      sli = (UseLocalInfo*) local.addr;

      while(!sli->Xfinished)
      {
        if (sli->Yfinished)
        {
          // get next X-value from outer instream
          // and restart inner (Y-) instream
          qp->Request(sli->X.addr, sli->XVal);
          if (!qp->Received(sli->X.addr))
          { // X-instream exhausted
            qp->Close(sli->X.addr);
            sli->Xfinished = true;
#ifdef GSA_DEBUG
            cout << "Use_SSS finished REQUEST: CANCEL (2)" << endl;
#endif
            result.addr = 0;
            return CANCEL;
          }
          // Got next X-elem. (Re-)Start inner Y-instream:
          qp->Open(sli->Y.addr);
          sli->Yfinished = false;
        }
        // Now, we have open X- and Y- streams
        if (sli->funfinished)
        {
          // get next Y-value from inner instream
          // and open new map result stream
          qp->Request(sli->Y.addr, sli->YVal);
          if (!qp->Received(sli->Y.addr))
          {
            // inner stream exhausted: release the current X element; the
            // next loop iteration fetches a new one
            qp->Close(sli->Y.addr);
            ((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
            sli->Yfinished = true;
          }
          else
          {
            funargs = qp->Argument( sli->fun.addr );
            (*funargs)[0] = sli->XVal;
            (*funargs)[1] = sli->YVal;
            qp->Open( sli->fun.addr );
            sli->funfinished = false;
          }
        }
        // Now, we have an open map result stream
        if (!sli->Xfinished && !sli->Yfinished && !sli->funfinished)
        { // pass parameters and call mapping, clone result
          funargs = qp->Argument( sli->fun.addr );
          (*funargs)[0] = sli->XVal;
          (*funargs)[1] = sli->YVal;
          qp->Request( sli->fun.addr, funresult );
          if ( qp->Received(sli->fun.addr) )
          { // got a value from map result stream
            result.setAddr(((Attribute*)(funresult.addr))->Clone());
            ((Attribute*) (funresult.addr))->DeleteIfAllowed();
#ifdef GSA_DEBUG
            cout << "Use_SSS finished REQUEST: YIELD" << endl;
#endif
            return YIELD;
          }
          else
          { // map result stream exhausted
            qp->Close( sli->fun.addr) ;
            ((Attribute*) (sli->YVal.addr))->DeleteIfAllowed();
            sli->funfinished = true;
          } // try to restart with new X/Y pairing
        }
      } // end while
      result.addr = 0;
#ifdef GSA_DEBUG
      cout << "Use_SSS finished REQUEST: CANCEL (3)" << endl;
#endif
      return CANCEL;

    case CLOSE :

#ifdef GSA_DEBUG
      cout << "Use_SSS received CLOSE" << endl;
#endif
      if( local.addr != 0 )
      {
        sli = (UseLocalInfo*)local.addr;
        // close whatever is still open, innermost first, and release the
        // element each open level is still holding
        if (!sli->funfinished)
        {
          qp->Close( sli->fun.addr ); // close map result stream
          // Delete current Y-elem:
          ((Attribute*) (sli->YVal.addr))->DeleteIfAllowed();
        }
        if (!sli->Yfinished)
        {
          qp->Close( sli->Y.addr ); // close inner instream
          // Delete current X-elem:
          ((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
        }
        if (!sli->Xfinished)
          qp->Close( sli->X.addr ); // close outer instream
        delete sli;
        local.setAddr(0);
      }
#ifdef GSA_DEBUG
      cout << "Use_SSS finished CLOSE" << endl;
#endif
      return 0;

  }  // end switch
  cout << "\nUse_SSS received UNKNOWN COMMAND" << endl;
  return -1; // should not be reached
}


/*
5.20.3 Specification for operator ~use~

*/
// NOTE(review): the four spec fields (signature, syntax, meaning, example)
// appear here without any text delimiters between them - confirm that this
// matches what the operator-spec parser expects.
const string
StreamSpecUse=
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( For X in kind DATA or X = tuple(Z)*, Y in kind DATA:\n"
"(*: not yet implemented)\n"
"((stream X) (map X Y) ) -> (stream Y) \n"
"((stream X) (map X (stream Y))) -> (stream Y)"
"_ use [ _ ]"
"The use class of operators implements "
"a set of functors, that derive stream-valued "
"operators from operators taking scalar "
"arguments and returning scalar values or "
"streams of values."
"query intstream(1,5) use[ fun(i:int) i*i ] printstream count;\n"
"query intstream(1,5) use[ fun(i:int) intstream(i,5) ] printstream count;"
" ) )";

const string
StreamSpecUse2=
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( For X in kind DATA or X = tuple(W)*, Y,Z in kind DATA:\n"
"(*: not yet implemented)\n"
"((stream X) Y (map X Y Z) ) -> (stream Z) \n"
"((stream X) Y (map X Y stream(Z)) ) -> (stream Z) \n"
"(X (stream Y) (map X Y Z) ) -> (stream Z) \n"
"(X (stream Y) (map X Y (stream Z))) -> (stream Z) \n"
"((stream X) (stream Y) (map X Y Z) ) -> (stream Z) \n"
"((stream X) (stream Y) (map X Y (stream Z))) -> (stream Z)"
"_ _ use2 [ _ ]"
"The use2 class of operators implements "
"a set of functors, that derive stream-valued "
"operators from operators taking scalar "
"arguments and returning scalar values or "
"streams of values. use2 performs a product "
"between the two first of its arguments, passing each "
"combination to the mapped function once."
"query intstream(1,5) [const int value 5] use2[ fun(i:int, j:int) "
"intstream(i,j) ] printstream count;\n"
"query [const int value 3] intstream(1,5) use2[ fun(i:int, j:int) i+j ] "
"printstream count;\n"
"query intstream(1,5) [const int value 3] use2[ fun(i:int, j:int) i+j ] "
"printstream count;\n"
"query [const int value 2] intstream(1,5) use2[ fun(i:int, j:int) "
"intstream(i,j) ] printstream count;\n"
"query [const int value 3] intstream(1,5) use2[ fun(i:int, j:int) "
"intstream(i,j) ] printstream count;\n"
"query intstream(1,2) intstream(1,3) use2[ fun(i:int, j:int) "
"intstream(i,j) ] printstream count; ) )";

/*
5.20.4 Selection Function of operator ~use~

*/

// Value mappings of ~use~; the array position must match the index
// returned by streamUseSelect (0 = SN, 1 = SS, 2 = TsN, 3 = TsS).
ValueMapping streamusemap[] =
  { Use_SN,
    Use_SS,
    Use_TsN,
    Use_TsS
  };

// Selects the ~use~ value mapping from the argument types:
// `ts` is true when the first argument fails the stream check below,
// `streamRes` is true when the function result (third element of the map
// type) is a stream.
// NOTE(review): `Stream::checkType` appears without template arguments in
// this text; which element type is tested cannot be told from here.
int
streamUseSelect( ListExpr args )
{
  ListExpr stream = nl->First(args);
  ListExpr funRes = nl->Third(nl->Second(args));
  bool ts = !Stream::checkType(stream);
  bool streamRes = listutils::isStream(funRes);

  if(!ts && !streamRes) return 0; // SN
  if(!ts && streamRes) return 1;  // SS
  if(ts && !streamRes) return 2;  // TsN
  if(ts && streamRes) return 3;   // TsS
  return -1;
}

// Value mappings of ~use2~ (4 entries; the tuple variants are commented out).
ValueMapping streamuse2map[] =
  { Use_SNN,
    Use_SNS,
    Use_SSN,
    Use_SSS
    //     ,
    //     Use_TsNN,
    //     Use_TsNS,
    //     Use_TsTsN,
    //     Use_TsTsS
  };

// Selects the ~use2~ value mapping.
// The index is built additively: +4 if either argument is (a stream of)
// tuples, +2 if both arguments are streams, +1 if the function result is a
// stream.
// NOTE(review): an index above 3 only triggers a warning and is still
// returned, although streamuse2map has just 4 entries; presumably the type
// mapping rejects tuple arguments before selection - confirm.
// NOTE(review): `Stream::checkType` appears without template arguments here.
int
streamUse2Select( ListExpr args )
{
  ListExpr
    X = nl->First(args),
    Y = nl->Second(args),
    M = nl->Third(args);
  bool
    xIsStream = false,
    yIsStream = false,
    resIsStream = false;
  bool
    xIsTuple = false,
    yIsTuple = false;
  int index = 0;

  // examine first arg
  // check type of sarg1
  if( Tuple::checkType(X) ) {
    xIsTuple = true;
    xIsStream = false;
  } else if( Stream::checkType(X)) {
    xIsStream = true;
    if(Tuple::checkType(nl->Second(X))){
      xIsTuple = true;
    } else {
      xIsTuple = false;
    }
  } else {
    xIsTuple = false;
    xIsStream = false;
  }

  // examine second arg in the same way
  if( Tuple::checkType(Y) ) {
    yIsTuple = true;
    yIsStream = false;
  } else if( Stream::checkType(Y)) {
    yIsStream = true;
    if(Tuple::checkType(nl->Second(Y))){
      yIsTuple = true;
    } else {
      yIsTuple = false;
    }
  } else {
    yIsTuple = false;
    yIsStream = false;
  }

  // examine mapping result type
  ListExpr mres = nl->Fourth(M);
  if(Stream::checkType(mres)){
    resIsStream = true;
  } else {
    resIsStream = false;
  }

  // calculate appropriate index value

  // tuple variants offset    : +4
  // both args streams        : +2
  // mapping result is stream : +1
  index = 0;
  if ( xIsTuple || yIsTuple )   index += 4;
  if ( xIsStream && yIsStream ) index += 2;
  if ( resIsStream )            index += 1;

  if (index > 3)
    cout << "\nWARNING: index =" << index
         << ">3 in streamUse2Select!" << endl;

  return index;
}

/*
5.20.5  Definition of operator ~use~

*/

Operator streamuse( "use",
                    StreamSpecUse,
                    4,
                    streamusemap,
                    streamUseSelect,
                    TypeMapUse);


Operator streamuse2( "use2",
                     StreamSpecUse2,
                     4,
                     streamuse2map,
                     streamUse2Select,
                     TypeMapUse2);


/*
5.24 Operator ~aggregateS~

Stream aggregation operator

This operator applies an aggregation function (which must be binary,
associative and commutative) to a stream of data using a given neutral
(initial) value (which is also returned if the stream is empty). If the
stream contains only one single element, this element is returned as the
result. The result is a single value of the same kind.

----
     For T in kind DATA:
     aggregateS: ((stream T) x (T x T --> T) x T) --> T

----

The first argument is the input stream.
The second argument is the function used in the aggregation.
The third value is used to initialize the mapping (for the first elem)
and will also be returned if the input stream is empty.
5.24.1 Type mapping function for ~aggregateS~ */ ListExpr StreamaggregateTypeMap( ListExpr args ) { string outstr1, outstr2; ListExpr TypeT; // check for correct length if (nl->ListLength(args) != 3) { return listutils::typeError("Operator aggregateS expects a " "list of length three."); } // get single arguments ListExpr instream = nl->First(args), map = nl->Second(args), zerovalue = nl->Third(args); if(!Stream::checkType(instream)){ return listutils::typeError("first element must be a stream of DATA"); } TypeT = nl->Second(instream); // check for second to be of length 4, (map T T T) // T of same type as first if ( nl->IsAtom(map) || !(nl->ListLength(map) == 4) || !( nl->IsEqual(nl->First(map), Symbol::MAP()) ) || !( nl->Equal(nl->Fourth(map), nl->Second(map)) ) || !( nl->Equal(nl->Third(map), nl->Second(map)) ) || !( nl->Equal(nl->Third(map), TypeT) ) ) { ErrorReporter::ReportError("Operator aggregateS expects a list of length " "four as second argument, having structure " "'(map T T T)', where T has the base type of " "the first argument."); return nl->SymbolAtom( Symbol::TYPEERROR() ); } // check for third to be atomic and of the same type T if ( !listutils::isDATA(zerovalue) || !nl->Equal(TypeT, zerovalue) ) { ErrorReporter::ReportError("Operator aggregateS expects a list of length" "one as third argument (neutral elem), having " "structure 'T', where T is also the type of " "the mapping's arguments and result. Also, " "T must be of kind DATA."); return nl->SymbolAtom( Symbol::TYPEERROR() ); } // return T as the result type. return TypeT; } /* 5.24.2 Value mapping function of operator ~aggregateS~ The ~aggregateS~ operator uses a stack to compute the aggregation balanced. This will have advantages in geometric aggregation. It may also help to reduce numeric errors in aggregation using double values. 5.24.2.1 ~StackEntry~ A stack entry consist of the level within the (simulated) balanced tree and the corresponding value. 
Note: The attributes at level 0 come directly from the input stream. We have to free them using the deleteIfAllowed function. On all other levels, the attribute is computes using the parameter function. Because this is outside of stream and tuples, no reference counting is available and we have to delete them using the usual delete function. */ struct AggrStackEntry { inline AggrStackEntry(): level(-1),value(0) { } inline AggrStackEntry( long level, Attribute* value): level( level ) { this->value = value;} inline AggrStackEntry( const AggrStackEntry& a ): level( a.level ) { this->value = a.value;} inline AggrStackEntry& operator=( const AggrStackEntry& a ) { level = a.level; value = a.value; return *this; } inline ~AggrStackEntry(){ } // use destroy !! inline void destroy(){ if(level<0){ return; } if(level==0){ // original from tuple value->DeleteIfAllowed(); } else { delete value; } value = 0; level = -1; } long level; Attribute* value; }; int Streamaggregate(Word* args, Word& result, int message, Word& local, Supplier s) { // The argument vector contains the following values: // args[0] = stream of tuples // args[1] = mapping function // args[2] = zero value Word resultWord; ArgVectorPointer vector = qp->Argument(args[1].addr); Stream stream(args[0]); stream.open(); result = qp->ResultStorage(s); // read the first tuple Attribute* attr = stream.request(); if(attr ==0){ // stream was empty, copy zero element to result ((Attribute*)result.addr)-> CopyFrom( (const Attribute*)args[2].addr ); } else { // ok, there is at least one element in the stream // nonempty stream, consume it stack theStack; while( attr!=0 ){ // put the attribute on the stack merging with existing entries // while possible int level = 0; while(!theStack.empty() && level==theStack.top().level){ // merging is possible AggrStackEntry top = theStack.top(); theStack.pop(); // call the parameter function ((*vector)[0]).setAddr(top.value); ((*vector)[1]).setAddr(attr); qp->Request(args[1].addr, 
resultWord); qp->ReInitResultStorage(args[1].addr); top.destroy(); // remove stack content if(level==0){ // delete attr; attr->DeleteIfAllowed(); } else { delete attr; } attr = (Attribute*) resultWord.addr; level++; } AggrStackEntry entry(level,attr); theStack.push(entry); attr = stream.request(); } // stream ends, merge stack elements regardless of their level assert(!theStack.empty()); // at least one element must be exist AggrStackEntry tmpResult = theStack.top(); theStack.pop(); while(!theStack.empty()){ AggrStackEntry top = theStack.top(); theStack.pop(); ((*vector)[0]).setAddr(top.value); ((*vector)[1]).setAddr(tmpResult.value); qp->Request(args[1].addr, resultWord); qp->ReInitResultStorage(args[1].addr); tmpResult.destroy(); // destroy temporarly result tmpResult.level = 1; // mark as computed tmpResult.value = (Attribute*) resultWord.addr; top.destroy(); } ((Attribute*)result.addr)-> CopyFrom((Attribute*)tmpResult.value); tmpResult.destroy(); } // close input stream stream.close(); return 0; } /* 5.24.3 Specification for operator ~aggregate~ */ const string StreamaggregateSpec = "( ( \"Signature\" \"Syntax\" \"Meaning\" " "\"Example\" ) " "(" "For T in kind DATA:\n" "((stream T) ((T T) -> T) T ) -> T\n" "_ aggregateS [ fun ; _ ]" "Aggregates the values from the stream (1st arg) " "using a binary associative and commutative " "aggregation function (2nd arg), " "and a 'neutral value' (3rd arg, also passed as the " "result if the stream is empty). If the stream contains" "only one single element, that element will be returned" "as the result." 
"query intstream(1,5) aggregateS[ " "fun(i1:int, i2:int) i1+i2 ; 0]\n" "query intstream(1,5) aggregateS[ " "fun(i1:STREAMELEM, i2:STREAMELEM) ifthenelse(i1>i2,i1,i2) ; 0]" ") )"; /* 5.24.4 Selection Function of operator ~aggregate~ */ ValueMapping streamaggregatemap[] = { Streamaggregate }; int streamaggregateSelect( ListExpr args ) { return 0; } /* 5.24.5 Definition of operator ~aggregate~ */ Operator streamaggregateS( "aggregateS", StreamaggregateSpec, 1, streamaggregatemap, streamaggregateSelect, StreamaggregateTypeMap); /* 5.27 Operator ~transformstream~ ---- transformstream: (stream T) -> stream(tuple((Elem T))) stream(tuple((Id T))) -> (stream T) for T in kind DATA, id some arbitrary identifier ---- Operator ~transformstream~ transforms a (stream DATA) into a (stream(tuple((element DATA)))) and vice versa. ~element~ is the name for the attribute created. The result of the first variant can e.g. be consumed to form a relation or be processed using ordinary tuplestream operators. */ /* 5.27.1 Type mapping function for ~transformstream~ ---- stream(DATA) --> stream(tuple((elem DATA))) stream(tuple((attrname DATA))) --> stream(DATA) ---- */ ListExpr StreamTransformstreamTypeMap(ListExpr args) { if(!nl->HasLength(args,1)){ return listutils::typeError("one argument expected"); } ListExpr arg = nl->First(args); // variant 1: stream -> stream if(Stream::checkType(arg)){ ListExpr res = nl->TwoElemList( nl->SymbolAtom(Stream::BasicType()), nl->TwoElemList( nl->SymbolAtom(Tuple::BasicType()) , nl->OneElemList( nl->TwoElemList( nl->SymbolAtom("Elem"), nl->Second(arg))))); return res; } // variant 2: stream(tuple( a: b)) -> stream(b) if(Stream::checkType(arg)){ ListExpr attrList = nl->Second(nl->Second(arg)); if(!nl->HasLength(attrList,1)){ return listutils::typeError("Only one attribute within the " "tuple allowed"); } ListExpr res = nl->TwoElemList( nl->SymbolAtom(Stream::BasicType()), nl->Second(nl->First(attrList))); return res; } return 
listutils::typeError("stream(DATA) or stream(tuple([a : X]))" " expected"); } /* 5.27.3 Type Mapping for the ~namedtransformstream~ operator This operator works as the transformstream operator. It takes a stream of elements with kind data and produces a tuple stream, from it. So, the value mapping is Transformstream[_]S[_]TS which is alos used by the transformstream operator. The only difference is, additional to the stream argument, this operator receives also a name for the attribute instead using the defaul name 'elem'. ---- stream(DATA) x ident --> stream(tuple((ident DATA))) ---- */ ListExpr NamedtransformstreamTypemap(ListExpr args){ if(nl->ListLength(args)!=2){ return listutils::typeError("two arguments required"); } ListExpr stream = nl->First(args); if(!Stream::checkType(stream)){ return listutils::typeError("First argument must be a stream(DATA)."); } ListExpr nameList = nl->Second(args); if(!listutils::isSymbol(nameList)){ return listutils::typeError("Second argument muts be an attribute name"); } string name = nl->SymbolValue(nameList); string symcheckmsg = ""; if(!SecondoSystem::GetCatalog()->IsValidIdentifier(name,symcheckmsg)){ return listutils::typeError("Symbol unusable: "+symcheckmsg+"."); } char f = name[0]; if(f<'A' || f>'Z'){ return listutils::typeError("An attribute name has to " "start with an upper case"); } return nl->TwoElemList(nl->SymbolAtom(Stream::BasicType()), nl->TwoElemList( nl->SymbolAtom(Tuple::BasicType()), nl->OneElemList( nl->TwoElemList( nl->SymbolAtom(name), nl->Second(stream) ) ))); } /* 5.27.2 Value mapping for operator ~transformstream~ */ template struct TransformstreamLocalInfo { TransformstreamLocalInfo( Word arg) : finished(false), resultTupleType(0), progressinitialized(false), stream(arg) {} ~TransformstreamLocalInfo() { if(resultTupleType) { resultTupleType->DeleteIfAllowed(); } } bool finished; TupleType* resultTupleType; bool progressinitialized; Stream stream; }; // The first variant creates a tuplestream from a 
stream: int Transformstream_S_TS(Word* args, Word& result, int message, Word& local, Supplier s) { TransformstreamLocalInfo *sli; Word value; Tuple *newTuple; switch ( message ) { case OPEN:{ sli = (TransformstreamLocalInfo*) local.addr; if(sli){ delete sli; local.addr = 0; } sli = new TransformstreamLocalInfo(args[0]); local.setAddr(sli); ListExpr resultType = GetTupleResultType( s ); sli->resultTupleType = new TupleType( nl->Second( resultType ) ); sli->finished = false; sli->progressinitialized = false; sli->stream.open(); return 0; } case REQUEST:{ if (local.addr == 0) return CANCEL; sli = (TransformstreamLocalInfo*) (local.addr); if (sli->finished){ return CANCEL; } Attribute* attr = sli->stream.request(); if (attr==0) { // input stream consumed sli->stream.close(); sli->finished = true; result.addr = 0; return CANCEL; } // create tuple, copy and pass result, delete value newTuple = new Tuple( sli->resultTupleType ); newTuple->PutAttribute( 0, attr ); result.setAddr(newTuple); return YIELD; } case CLOSE:{ if (local.addr != 0) { sli = (TransformstreamLocalInfo*) (local.addr); if (!sli->finished){ sli->stream.close(); sli->finished = true; } } return 0; } case CLOSEPROGRESS:{ if (local.addr != 0) { sli = (TransformstreamLocalInfo*) (local.addr); if(!sli->finished){ sli->stream.close(); } delete sli; local.setAddr(0); } return 0; } case REQUESTPROGRESS:{ sli = (TransformstreamLocalInfo*) (local.addr); if(!sli){ return CANCEL; } ProgressInfo p1; ProgressInfo* pRes = (ProgressInfo*) result.addr; if( !sli->stream.requestProgress( &p1) ){ return CANCEL; }; const double uProject = 0.00073; //millisecs per tuple const double vProject = 0.0004; //millisecs per tuple and attribute pRes->Copy(p1); pRes->Time = p1.Time + pRes->Card * (uProject + vProject); pRes->Progress = p1.Progress; //a number between 0 and 1 if( !sli->progressinitialized || p1.sizesChanged ) { pRes->sizesChanged = true; sli->progressinitialized = true; } else { pRes->sizesChanged = false; } return 
YIELD; } } // switch cout << "Transformstream_S_TS: UNKNOWN MESSAGE!" << endl; return 0; } // The second variant creates a stream from a tuplestream: int Transformstream_TS_S(Word* args, Word& result, int message, Word& local, Supplier s) { TransformstreamLocalInfo *sli; switch ( message ){ case OPEN:{ sli = (TransformstreamLocalInfo*) local.addr; if(sli){ delete sli; } sli = new TransformstreamLocalInfo(args[0]); sli->finished = false; sli->progressinitialized = false; sli->stream.open(); local.setAddr(sli); return 0; } case REQUEST:{ if (local.addr == 0) { return CANCEL; } sli = (TransformstreamLocalInfo*) (local.addr); if (sli->finished) { return CANCEL; } Tuple* tuple = sli->stream.request(); if (tuple==0) { // input stream consumed sli->stream.close(); sli->finished = true; result.addr = 0; return CANCEL; } // extract, copy and pass value, delete tuple result.addr = tuple->GetAttribute(0)->Copy(); tuple->DeleteIfAllowed(); return YIELD; } case CLOSE:{ if (local.addr != 0){ sli = (TransformstreamLocalInfo*) (local.addr); if (!sli->finished){ sli->stream.close(); sli->finished = true; // disposal of localinfo done in CLOSEPROGRESS } } return 0; } case CLOSEPROGRESS:{ if (local.addr != 0) { sli = (TransformstreamLocalInfo*) (local.addr); if (!sli->finished){ sli->stream.close(); } delete sli; local.setAddr(0); } return 0; } case REQUESTPROGRESS:{ sli = (TransformstreamLocalInfo*) (local.addr); if( !sli ){ return CANCEL; } ProgressInfo p1; ProgressInfo* pRes = (ProgressInfo*) result.addr; if( !sli->stream.requestProgress( &p1) ){ return CANCEL; }; const double uProject = 0.00073; //millisecs per tuple const double vProject = 0.0004; //millisecs per tuple and attribute pRes->Copy(p1); pRes->Time = p1.Time + pRes->Card * (uProject + vProject); pRes->Progress = p1.Progress; //a number between 0 and 1 if( !sli->progressinitialized || p1.sizesChanged ) { pRes->sizesChanged = true; sli->progressinitialized = true; } else { pRes->sizesChanged = false; } return YIELD; } } 
cout << __PRETTY_FUNCTION__ <<": UNKNOWN MESSAGE!" << endl; return -1; } /* 5.27.3 Specification for operator ~transformstream~ */ const string StreamTransformstreamSpec = "( ( \"Signature\" \"Syntax\" \"Meaning\" " "\"Example\" ) " "(" "For T in kind DATA:\n" "(stream T) -> stream(tuple((elem T)))\n" "stream(tuple(attrname T)) -> (stream T)" "_ transformstream" "Transforms a 'stream T' into a tuplestream " "with a single attribute 'elem' containing the " "values coming from the input stream and vice " "versa. The identifier 'elem' is fixed, the " "attribute name 'attrname' may be arbitrary " "chosen, but the tuplestream's tupletype may " "have only a single attribute." "query intstream(1,5) transformstream consume\n " "query ten feed transformstream printstream count" ") )"; const string NamedtransformstreamSpec = "( ( \"Signature\" \"Syntax\" \"Meaning\" " "\"Example\" ) " "(" "stream(t) x name -> stream (tuple(t name)))" " _ namedtransformstream [ _ ] " " Converts a stream to a tuple stream with" " given attribute name " "query intsteam(0,100)" " namedtransformstream [Number] consume" ") )"; /* 5.27.4 Selection Function of operator ~transformstream~ */ ValueMapping streamtransformstreammap[] = { Transformstream_S_TS, Transformstream_TS_S }; int streamTransformstreamSelect( ListExpr args ) { ListExpr first = nl->First( args); if(Stream::checkType(first)){ return 1; } else { return 0; } } /* 5.27.5 Definition of operator ~transformstream~ */ Operator streamtransformstream( "transformstream", StreamTransformstreamSpec, 2, streamtransformstreammap, streamTransformstreamSelect, StreamTransformstreamTypeMap); /* 5.28 Operator ~projecttransformstream~ 5.28.1 Type Mapping ---- stream(tuple((a1 t1) (a2 t2)...(an tn))) x ai --> stream(ti) ---- */ ListExpr ProjecttransformstreamTM(ListExpr args){ if(!nl->HasLength(args,2)){ return listutils::typeError("Two arguments expected"); } ListExpr stream = nl->First(args); if(!Stream::checkType(stream)){ return 
listutils::typeError("first argument must be a tuple stream"); } ListExpr nameList = nl->Second(args); if(!listutils::isSymbol(nameList)){ return listutils::typeError("Second argument is not a " "valid attribute name"); } string name = nl->SymbolValue(nameList); ListExpr attrType; ListExpr attrList = nl->Second(nl->Second(stream)); int pos = listutils::findAttribute(attrList, name, attrType); if(pos<=0){ return listutils::typeError("Attribute " + name + " not found in tuple"); } pos--; return nl->ThreeElemList( nl->SymbolAtom(Symbol::APPEND()), nl->OneElemList(nl->IntAtom(pos)), nl->TwoElemList(nl->SymbolAtom(Stream::BasicType()), attrType)); } /* 5.28.2 Value Mapping */ class ProjectTransformLI{ public: ProjectTransformLI(Word& s, CcInt* p):stream(s), pos(p->GetIntval()){ stream.open(); } ~ProjectTransformLI(){ stream.close(); } Attribute* next(){ Tuple* t = stream.request(); if(t==0){ return 0; } else { Attribute* a = t->GetAttribute(pos)->Copy(); t->DeleteIfAllowed(); return a; } } private: Stream stream; int pos; }; int Projecttransformstream(Word* args, Word& result, int message, Word& local, Supplier s) { switch ( message ) { case OPEN:{ if(local.addr){ delete (ProjectTransformLI*)local.addr; } local.addr = new ProjectTransformLI(args[0], (CcInt*)(args[2].addr)); return 0; } case REQUEST:{ if(!local.addr){ return CANCEL; } result.addr = ((ProjectTransformLI*)local.addr)->next(); return result.addr?YIELD:CANCEL; } case CLOSE: if(local.addr){ delete (ProjectTransformLI*)local.addr; local.addr=0; } return 0; } cerr << "Projecttransformstream: UNKNOWN MESSAGE!" 
<< endl; return -1; } /* 5.28.3 Specification */ const string ProjecttransformstreamSpec = "( ( \"Signature\" \"Syntax\" \"Meaning\" " "\"Example\" ) " "(" "stream(tuple((a1 t1)...(an tn))) x an -> (stream tn)" "_ project transformstream [ _ ] " " extracts an attribute from a tuple stream " "query Staedte feed projecttransformstream" " [PLZ] printintstream count" ") )"; /* 5.28.4 Definition of the operator instance */ Operator projecttransformstream ( "projecttransformstream", //name ProjecttransformstreamSpec, //specification Projecttransformstream, //value mapping Operator::SimpleSelect, //trivial selection function ProjecttransformstreamTM //type mapping ); Operator namedtransformstream ( "namedtransformstream", //name NamedtransformstreamSpec, //specification Transformstream_S_TS, //value mapping Operator::SimpleSelect, //trivial selection function NamedtransformstreamTypemap //type mapping ); /* 5.29 The ~echo~ operator stream(X) x bool x DATA -> STREAM(X) X x DATA -> X (X can be all but stream) */ ListExpr EchoTypeMap(ListExpr args){ int len = nl->ListLength(args); if(len!=2 && len!=3){ ErrorReporter::ReportError("Wrong number of parameters"); return nl->TypeError(); } ListExpr errorInfo = nl->OneElemList(nl->SymbolAtom("ERROR")); if(len==2){ // T x S -> T , T # stream(...) // check for kind DATA ListExpr typeToPrint = nl->Second(args); if(! 
SecondoSystem::GetAlgebraManager() ->CheckKind(Kind::DATA(),typeToPrint,errorInfo)){ ErrorReporter::ReportError("last arg has to be in kind DATA"); return nl->TypeError(); } // check for T# stream if(listutils::isStream(nl->First(args))){ ErrorReporter::ReportError("If the first argument is a stream, two " "further parameters are required"); return nl->TypeError(); } return nl->First(args); } else { // len==3 // first argument has to be a stream if(!listutils::isStream(nl->First(args))){ return listutils::typeError("When 3 parameters are given, the" " first of them must be a stream"); } if(!nl->IsEqual(nl->Second(args),CcBool::BasicType())){ ErrorReporter::ReportError("bool expected as second argument."); return nl->TypeError(); } ListExpr typeToPrint = nl->Third(args); if(! SecondoSystem::GetAlgebraManager() ->CheckKind(Kind::DATA(),typeToPrint,errorInfo)){ ErrorReporter::ReportError("last arg has to be in kind DATA"); return nl->TypeError(); } return nl->First(args); } } /* 5.28.2 Value Mapping for the echo operator */ int Echo_Stream(Word* args, Word& result, int message, Word& local, Supplier s) { bool each = ((CcBool*) args[1].addr)->GetBoolval(); Attribute* s1 = (Attribute*) args[2].addr; Word elem; switch(message){ case OPEN: cout << "OPEN: "; s1->Print(cout) << endl; qp->Open(args[0].addr); return 0; case REQUEST: if(each){ cout << "REQUEST: "; s1->Print(cout) << endl; } qp->Request(args[0].addr,elem); if(qp->Received(args[0].addr)){ result.setAddr(elem.addr); return YIELD; } else{ return CANCEL; } case CLOSE: cout << "CLOSE: "; s1->Print(cout) << endl; qp->Close(args[0].addr); return 0; } return 0; } int Echo_Other(Word* args, Word& result, int message, Word& local, Supplier s) { Attribute* s1 = (Attribute*) args[1].addr; result.setAddr(args[0].addr); s1->Print(cout) << endl; return 0; } /* 5.29.3 Selection function and VM array */ ValueMapping echovm[] = {Echo_Stream, Echo_Other}; int EchoSelect(ListExpr args){ if(nl->ListLength(args)==2){ return 1; } 
else { return 0; } } /* 5.29.4 Specification */ const string EchoSpec = "( ( \"Signature\" \"Syntax\" \"Meaning\" " "\"Example\" ) " "(" "stream(T) x bool x string -> stream(T) \n" " T x string -> T , T # stream" "_ echo [ _ ] " " prints the given string if operator mapping is called " "query Staedte feed echo[TRUE, \"called\"] count" ") )"; /* 5.29.5 Creatinmg the operator instance */ Operator echo( "echo", EchoSpec, 2, echovm, EchoSelect, EchoTypeMap); /* 5.28 Operator ~count~ Signature: ---- For T in kind DATA: (stream T) -> int ---- The operator counts the number of stream elements. */ /* 5.28.1 Type mapping function for ~count~ */ ListExpr streamCountType( ListExpr args ) { if ( nl->ListLength(args) != 1 ){ return listutils::typeError("one argument expected"); } ListExpr arg1 = nl->First(args); if(!Stream::checkType(arg1)){ return listutils::typeError("stream(DATA) expected"); } return nl->SymbolAtom(CcInt::BasicType()); } /* 5.28.2 Value mapping for operator ~count~ */ int streamCountFun (Word* args, Word& result, int message, Word& local, Supplier s) /* Count the number of elements in a stream. An example for consuming a stream. 
*/ { struct streamCountFunLocalInfo { bool initializedprogress; double *attrSize; double *attrSizeExt; streamCountFunLocalInfo(): initializedprogress( false ) { attrSize = new double[1]; attrSize[0] = sizeof(CcInt); attrSizeExt = new double[1]; attrSizeExt[0] = sizeof(CcInt); } ~streamCountFunLocalInfo() { delete[] attrSize; delete[] attrSizeExt; } }; Word elem; int count = 0; streamCountFunLocalInfo* li; li = static_cast(local.addr); switch(message){ case OPEN: case CLOSE: case REQUEST: { if(!li){ li = new streamCountFunLocalInfo(); local.addr = li; } qp->Open(args[0].addr); qp->Request(args[0].addr, elem); while ( qp->Received(args[0].addr) ){ count++; Attribute* attr = static_cast( elem.addr ); attr->DeleteIfAllowed(); // consume the stream object qp->Request(args[0].addr, elem); } result = qp->ResultStorage(s); static_cast(result.addr)->Set(true, count); qp->Close(args[0].addr); return 0; } case REQUESTPROGRESS:{ if(!local.addr){ return CANCEL; } ProgressInfo* pRes; pRes = (ProgressInfo*) result.addr; pRes->Card = 1 ; //expected cardinality pRes->Size = sizeof(CcInt); //expected total size pRes->SizeExt = sizeof(CcInt); //expected root+ext size (no FLOBs) pRes->noAttrs = 1; //no of attributes pRes->attrSize = li->attrSize; // the complete size pRes->attrSizeExt = li->attrSizeExt; //the root and extension size pRes->sizesChanged = true; //sizes have been recomputed li->initializedprogress = true; ProgressInfo p1; if ( qp->RequestProgress(args[0].addr, &p1) ){ pRes->BTime = p1.Time; // this is a blocking operator! pRes->BProgress = p1.Progress; // this is a blocking operator! 
pRes->Progress = p1.Progress; pRes->Time = p1.Time; return YIELD; } else { result.addr = 0; return CANCEL; } break; } case CLOSEPROGRESS:{ if(li){ delete li; local.addr = 0; } return 0; } default: { return -1; } } return 0; } /* 5.28.3 Specification for operator ~count~ */ const string streamCountSpec = "( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) " "( For T in kind DATA:\n" "((stream T)) -> int" "_ count" "Counts the number of elements of a stream." "query intstream (1,10) count" ") )"; /* 5.28.4 Selection Function of operator ~count~ */ int streamCountSelect (ListExpr args ) { return 0; } /* 5.28.5 Definition of operator ~count~ */ Operator streamcount ( "count", //name streamCountSpec, //specification streamCountFun, //value mapping streamCountSelect, //trivial selection function streamCountType //type mapping ); /* 5.29 Operator ~printstream~ ---- For T in kind DATA: (stream T) -> (stream T) ---- For every stream element, the operator calls the ~print~ function and passes on the element. 
*/ /* 5.29.1 Type mapping function for ~printstream~ */ ListExpr streamPrintstreamType( ListExpr args ) { if(!nl->HasLength(args,1)){ return listutils::typeError("One argument expected."); } ListExpr stream = nl->First(args); // case: stream if(Stream::checkType(stream)){ return stream; } if(!Stream::checkType(stream)){ return listutils::typeError("stream or stream expected"); } // case : stream // collect and append the attribute names ListExpr attrList = nl->Second(nl->Second(stream)); bool firstcall = true; ListExpr attrNames = nl->TheEmptyList(); ListExpr last = nl->TheEmptyList(); while( !nl->IsEmpty(attrList) ) { ListExpr attr = nl->First(attrList); attrList = nl->Rest(attrList); ListExpr name = nl->StringAtom(nl->SymbolValue(nl->First(attr))); if(firstcall){ attrNames = nl->OneElemList(name); last = attrNames; firstcall = false; } else { last = nl->Append(last, name); } } // return stream@(noAttrs,attrList) ListExpr res = nl->ThreeElemList( nl->SymbolAtom(Symbol::APPEND()), attrNames, stream); return res; } /* 5.29.2 Value mapping for operator ~printstream~ */ int streamPrintstreamFun (Word* args, Word& result, int message, Word& local, Supplier s) /* Print the elements of an Attribute-type stream. An example for a pure stream operator (input and output are streams). */ { Word elem; switch( message ) { case OPEN: qp->Open(args[0].addr); return 0; case REQUEST: qp->Request(args[0].addr, elem); if ( qp->Received(args[0].addr) ) { ((Attribute*) elem.addr)->Print(cout); cout << endl; result = elem; return YIELD; } else return CANCEL; case CLOSE: qp->Close(args[0].addr); return 0; } /* should not happen */ return -1; } int streamPrintTupleStreamFun (Word* args, Word& result, int message, Word& local, Supplier s) /* Print the elements of a Tuple-type stream. 
*/ { Word tupleWord, elem; string attrName; switch(message) { case OPEN: qp->Open(args[0].addr); return 0; case REQUEST: qp->Request(args[0].addr, tupleWord); if(qp->Received(args[0].addr)) { cout << "Tuple: (" << endl; Tuple* tuple = (Tuple*) (tupleWord.addr); for(int i=0; iGetNoAttributes(); i++){ string attrName = (static_cast (args[i+1].addr))->GetValue(); cout << attrName << ": "; Attribute* attr = (Attribute*) (tuple->GetAttribute(i)); if(attr){ attr->Print(cout); } else { cout << "Invalid attribute: NULL"; } cout << endl; } cout << " )" << endl; result = tupleWord; return YIELD; } else { return CANCEL; } case CLOSE: qp->Close(args[0].addr); return 0; } return 0; } /* 5.29.3 Specification for operator ~printstream~ */ const string streamPrintstreamSpec = "( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) " "( For T in kind DATA:\n" "((stream T)) -> (stream T)\n" "((stream tuple(X))) -> (stream tuple(X))" "_ printstream" "Prints the elements of an arbitrary stream or tuplestream." "query intstream (1,10) printstream count" ") )"; /* 5.29.4 Selection Function of operator ~printstream~ Uses the same function as for ~count~. 
*/ int streamPrintstreamSelect (ListExpr args ) { ListExpr streamType = nl->Second(nl->First(args)); if( (nl->ListLength(streamType) == 2) && (nl->IsEqual(nl->First(streamType),Tuple::BasicType()))) return 0; else return 1; } ValueMapping streamprintstreammap[] = { streamPrintTupleStreamFun, streamPrintstreamFun }; /* 5.29.5 Definition of operator ~printstream~ */ Operator streamprintstream ( "printstream", //name streamPrintstreamSpec, //specification 2, streamprintstreammap, //value mapping streamPrintstreamSelect, //own selection function streamPrintstreamType //type mapping ); /* 5.30 printstream2 */ ListExpr printstream2TM(ListExpr args){ if(!nl->HasLength(args,3)){ return listutils::typeError("3 arguments expected"); } ListExpr stream = nl->First(args); if(!Stream::checkType(stream) && !Stream::checkType(stream)){ return listutils::typeError("first argument must be a stream of " "tuple or a stream of attribute"); } if(!CcString::checkType(nl->Second(args))){ return listutils::typeError("second argument is not a string"); } if(!CcString::checkType(nl->Third(args))){ return listutils::typeError("third argument is not a string"); } return nl->First(args); } template class printstream2Info{ public: printstream2Info(Word& _stream, CcString* _pre, CcString* _after): stream(_stream){ stream.open(); pre = _pre->IsDefined()?_pre->GetValue():""; after = _after->IsDefined()?_after->GetValue():""; elem = 1; } ~printstream2Info(){ stream.close(); } StreamType* next(){ StreamType* res = stream.request(); if(res){ print(res); } return res; } private: Stream stream; string pre; string after; int elem; void print(StreamType* elem){ cout << pre << " " << this->elem << endl; this->elem++; elem->Print(cout); cout << endl << after << endl; } }; template int printstream2VMT (Word* args, Word& result, int message, Word& local, Supplier s){ printstream2Info* li = (printstream2Info*) local.addr; switch(message){ case OPEN: if(li){ delete li; } local.addr = new 
printstream2Info(args[0], (CcString*) args[1].addr, (CcString*) args[2].addr); return 0; case REQUEST : { result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; } case CLOSE: { if(li){ delete li; local.addr = 0; } return 0; } } return -1; } ValueMapping printstream2VM[] = { printstream2VMT, printstream2VMT }; OperatorSpec printstream2Spec( "stream(X) x string x string -> stream(X)", "_ printstream2[_,_]", "Outputs elements in the stream in a stream to standard out.", "Each element in enclosed in the strings given as second and third" "argument. The header is extended by the number of the element." "query plz feed printstream2[\"Elem : \", \"-----\"] count" ); int printstream2Select(ListExpr args){ return Stream::checkType(nl->First(args))?0:1; } Operator printstream2Op( "printstream2", printstream2Spec.getStr(), 2, printstream2VM, printstream2Select, printstream2TM ); /* 5.30 Operator ~filter~ ---- For T in kind DATA: ((stream T) (map T bool)) -> (stream T) ---- The operator filters the elements of an arbitrary stream by a predicate. */ /* 5.30.1 Type mapping function for ~filter~ */ ListExpr streamFilterType( ListExpr args ) { ListExpr stream, map; string out, out2; if ( nl->ListLength(args) == 2 ) { stream = nl->First(args); map = nl->Second(args); // test first argument for stream(T), T in kind DATA if(!Stream::checkType(stream)){ return listutils::typeError("Operator filter expects a (stream T), " "T in kind DATA as its first argument. " "The argument provided " "has type '" + out + "' instead."); } // test second argument for map T' bool. T = T' if ( nl->IsAtom(map) || (nl->ListLength(map) != 3) || !nl->IsEqual(nl->First(map), Symbol::MAP()) || !nl->IsEqual(nl->Third(map), CcBool::BasicType()) ) { nl->WriteToString(out, map); ErrorReporter::ReportError("Operator filter expects a " "(map T bool), T in kind DATA, " "as its second argument. 
" "The second argument provided " "has type '" + out + "' instead."); return nl->SymbolAtom(Symbol::TYPEERROR()); } if ( !( nl->Equal( nl->Second(stream), nl->Second(map) ) ) ) { nl->WriteToString(out, nl->Second(stream)); nl->WriteToString(out2, nl->Second(map)); ErrorReporter::ReportError("Operator filter: the stream base type " "T must match the map's argument type, " "e.g. 1st: (stream T), 2nd: (map T bool). " "The actual types are 1st: '" + out + "', 2nd: '" + out2 + "'."); return nl->SymbolAtom(Symbol::TYPEERROR()); } } else { // wrong number of arguments ErrorReporter::ReportError("Operator filter expects two arguments."); return nl->SymbolAtom(Symbol::TYPEERROR()); } return stream; // return type of first argument } /* 5.30.2 Value mapping for operator ~filter~ */ int streamFilterFun (Word* args, Word& result, int message, Word& local, Supplier s) /* Filter the elements of a stream by a predicate. An example for a stream operator and also for one calling a parameter function. */ { struct StreamFilterLocalInfo{ StreamFilterLocalInfo():current( 0 ), returned( 0 ), done( false ){}; int current; //tuples read int returned; //tuples returned bool done; //arg stream exhausted }; Word elem, funresult; ArgVectorPointer funargs; StreamFilterLocalInfo *fli = static_cast(local.addr); switch( message ){ case OPEN:{ if(fli){ delete fli; } fli = new StreamFilterLocalInfo(); local.setAddr(fli); qp->Open(args[0].addr); return 0; } case REQUEST:{ if(!fli || fli->done){ return CANCEL; } funargs = qp->Argument(args[1].addr); //Get the argument vector for //the parameter function. qp->Request(args[0].addr, elem); while ( qp->Received(args[0].addr) ) { fli->current++; (*funargs)[0] = elem; //Supply the argument for the //parameter function. qp->Request(args[1].addr, funresult); //Ask the parameter function //to be evaluated. 
if ( ((CcBool*) funresult.addr)->GetBoolval() ){ // object fulfills condition - pass it on: result = elem; fli->returned++; return YIELD; } //otherwise: consume the stream object: ((Attribute*) elem.addr)->DeleteIfAllowed(); qp->Request(args[0].addr, elem); // get next element } // while return CANCEL; } case CLOSE:{ qp->Close(args[0].addr); return 0; } case CLOSEPROGRESS:{ if( fli ){ delete fli; local.setAddr(0); } return 0; } case REQUESTPROGRESS:{ ProgressInfo p1; ProgressInfo* pRes; const double uFilter = 0.01; pRes = (ProgressInfo*) result.addr; if ( qp->RequestProgress(args[0].addr, &p1) ){ pRes->CopySizes(p1); if ( fli ){ //filter was started if ( fli->done ){ //arg stream exhausted, all known pRes->Card = (double) fli->returned; pRes->Time = p1.Time + (double) fli->current * qp->GetPredCost(s) * uFilter; pRes->Progress = 1.0; pRes->CopyBlocking(p1); return YIELD; } if ( fli->returned >= enoughSuccessesSelection ){ //stable state assumed now pRes->Card = p1.Card * ( (double) fli->returned / (double) (fli->current)); pRes->Time = p1.Time + p1.Card * qp->GetPredCost(s) * uFilter; if ( p1.BTime < 0.1 && pipelinedProgress ){ //non-blocking, use pipelining pRes->Progress = p1.Progress; } else { pRes->Progress = (p1.Progress * p1.Time + fli->current * qp->GetPredCost(s) * uFilter) / pRes->Time; } pRes->CopyBlocking(p1); return YIELD; } } //filter not yet started or not enough seen pRes->Card = p1.Card * qp->GetSelectivity(s); pRes->Time = p1.Time + p1.Card * qp->GetPredCost(s) * uFilter; if ( p1.BTime < 0.1 && pipelinedProgress ){ //non-blocking, use pipelining pRes->Progress = p1.Progress; } else { pRes->Progress = (p1.Progress * p1.Time) / pRes->Time; } pRes->CopyBlocking(p1); return YIELD; } else { return CANCEL; } } } // switch /* should not happen */ return -1; } /* 5.30.3 Specification for operator ~filter~ */ const string streamFilterSpec = "( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) " "( For T in kind DATA:\n" "((stream T) (map T bool)) -> 
(stream T)" "_ filter [ fun ]" "Filters the elements of a stream by a predicate." "query intstream (1,10) filter[. > 7] printintstream count" ") )"; /* 5.30.4 Selection Function of operator ~filter~ Uses the same function as for ~count~. */ /* 5.30.5 Definition of operator ~filter~ */ Operator streamfilter ( "filter", //name streamFilterSpec, //specification streamFilterFun, //value mapping streamCountSelect, //trivial selection function streamFilterType //type mapping ); /* 5.41 Operator ~realstream~ ---- real x real x real -> stream(real) ---- The ~realstream~ operator takes three arguments of type ~real~. It produces a stream of real values in range provided by the first two arguments with a stepwide taken from the third argument. */ /* 5.41.1 Type mapping function for ~realstream~ */ ListExpr realstreamTypeMap( ListExpr args ){ ListExpr arg1, arg2, arg3; if ( nl->ListLength(args) == 3 ) { arg1 = nl->First(args); arg2 = nl->Second(args); arg3 = nl->Third(args); if ( nl->IsEqual(arg1, CcReal::BasicType()) && nl->IsEqual(arg2,CcReal::BasicType()) && nl->IsEqual(arg3, CcReal::BasicType()) ){ return nl->TwoElemList(nl->SymbolAtom(Stream::BasicType()), nl->SymbolAtom(CcReal::BasicType())); } } ErrorReporter::ReportError("real x real x real expected"); return nl->TypeError(); } /* 5.41.2 Value mapping for operator ~realstream~ */ int realstreamFun (Word* args, Word& result, int message, Word& local, Supplier s) { struct RangeAndDiff { double first, last, diff; long iter; long card; bool initializedprogress; double* attrSize; double* attrSizeExt; RangeAndDiff(Word* args) { initializedprogress = false; CcReal* r1 = ((CcReal*)args[0].addr); CcReal* r2 = ((CcReal*)args[1].addr); CcReal* r3 = ((CcReal*)args[2].addr); iter = 0; bool defined = r1->IsDefined() && r2->IsDefined() && r3->IsDefined(); if (defined) { first = r1->GetRealval(); last = r2->GetRealval(); diff = r3->GetRealval(); } else { first = 0; last = -1; diff = 1; } if(diff > 0.0) { card = (long)(ceil(fabs( 
(last - first) / diff ) + 1.0)); } else { card = 1; } attrSize = new double[1]; attrSize[0] = sizeof(CcReal); attrSizeExt = new double[1]; attrSizeExt[0] = sizeof(CcReal); } ~RangeAndDiff() { if(attrSize) { delete[] attrSize; attrSize = 0;} if(attrSizeExt) {delete[] attrSizeExt; attrSizeExt = 0;} } }; RangeAndDiff* range_d = 0; double current = 0; double cd = 0; CcReal* elem = 0; switch( message ) { case OPEN: { range_d = new RangeAndDiff(args); local.addr = range_d; return 0; } case REQUEST: { range_d = ((RangeAndDiff*) local.addr); cd = (double) range_d->iter * range_d->diff; current = range_d->first + cd; if(range_d->diff == 0.0){ // don't allow endless loops return CANCEL; } else if(range_d->diff < 0.0){ if(current < range_d->last){ return CANCEL; } else { elem = new CcReal(true,current); result.addr = elem; range_d->iter++; return YIELD; } } else { // diff > 0.0 if(current > range_d->last){ return CANCEL; } else { elem = new CcReal(true,current); result.addr = elem; range_d->iter++; return YIELD; } } } case CLOSE: { // localinfo is destroyed in CLOSEPROGRESS return 0; } case CLOSEPROGRESS: { range_d = ((RangeAndDiff*) local.addr); if(range_d){ delete range_d; local.setAddr(0); } range_d = 0; return 0; } case REQUESTPROGRESS: { ProgressInfo* pRes = (ProgressInfo*) result.addr; range_d = ((RangeAndDiff*) local.addr); if( range_d ){ if(range_d->initializedprogress){ pRes->sizesChanged = false; //sizes were not recomputed range_d->initializedprogress = true; } else { pRes->sizesChanged = true; //first call } pRes->Size = sizeof(CcReal); //total tuple size // (including FLOBs) pRes->SizeExt = sizeof(CcReal); //tuple root and extension part pRes->noAttrs = 1; //no of attributes pRes->attrSize = range_d->attrSize; // complete size per attr pRes->attrSizeExt = range_d->attrSizeExt; // root +extension // size per attr const double feedccreal = 0.001; //milliseconds per CcReal pRes->Card = range_d->card; //expected cardinality pRes->Time = (range_d->card) * feedccreal; 
//expected time, [ms] pRes->Progress = (double) range_d-> iter / (double) range_d->card; pRes->BTime = 0.00001; // blocking time must not be 0 pRes->BProgress = 1.0; // blocking progress [0,1] return YIELD; } else { return CANCEL; } } default: { return -1; /* should not happen */ } } // switch return -1; /* should not happen */ } /* 5.41.3 Specification for operator ~~ */ struct realstreamInfo : OperatorInfo { realstreamInfo() : OperatorInfo() { name = "realstream"; signature = CcReal::BasicType()+" x "+CcReal::BasicType()+ " -> stream(real)"; syntax = "realstream(_ , _, _)"; meaning = "Creates a stream of reals containing the numbers " "between the first and the second argument. The third " "argument defines the step width."; example = "realstream(-100.0, 100.0, 0.5) count"; supportsProgress = true; } }; /* 5.41 Operator ~intstream~ ---- int x int --> stream(int) ---- */ // TypeMappingFunction ListExpr intstreamTypeMap( ListExpr args ) { string err = "int x int expected"; if(!nl->HasLength(args,2)){ return listutils::typeError(err); } if(!listutils::isSymbol(nl->First(args),CcInt::BasicType()) || !listutils::isSymbol(nl->Second(args),CcInt::BasicType())){ return listutils::typeError(err); } return nl->TwoElemList(nl->SymbolAtom(Stream::BasicType()), nl->SymbolAtom(CcInt::BasicType())); } // ValueMappingFunction int intstreamValueMap(Word* args, Word& result, int message, Word& local, Supplier s) { // An auxiliary type which keeps the state of this // operation during two requests struct Range { int current; int last; int card; bool initializedprogress; double* attrSize; double* attrSizeExt; Range(CcInt* i1, CcInt* i2): initializedprogress(false), attrSize(0), attrSizeExt(0) { // Do a proper initialization even if one of the // arguments has an undefined value if (i1->IsDefined() && i2->IsDefined()) { current = i1->GetIntval(); last = i2->GetIntval(); } else { // this initialization will create an empty stream current = 1; last = 0; } card = last - current + 1; 
attrSize = new double[1]; attrSizeExt = new double[1]; attrSize[0] = i1->Sizeof(); // core size of a CcInt attrSizeExt[0] = i1->Sizeof(); // ext size of a CcInt is the same } ~Range() { delete[] attrSize; delete[] attrSizeExt; } }; Range* range = static_cast(local.addr); switch( message ) { case OPEN: { // initialize the local storage CcInt* i1 = static_cast( args[0].addr ); CcInt* i2 = static_cast( args[1].addr ); if(range){ delete range; } range = new Range(i1, i2); local.addr = range; return 0; } case REQUEST: { // return the next stream element if(!range) { return CANCEL; } else if ( range->current <= range->last ) { CcInt* elem = new CcInt(true, range->current++); result.addr = elem; return YIELD; } else { result.addr = 0; return CANCEL; } } case CLOSE: { // free the local storage #ifndef USE_PROGRESS if (range != 0) { delete range; local.addr = 0; } #endif return 0; } #ifdef USE_PROGRESS case CLOSEPROGRESS: { if (range != 0) { delete range; local.addr = 0; } return 0; } case REQUESTPROGRESS: { ProgressInfo* pRes = (ProgressInfo*) result.addr; if(!range){ return CANCEL; } if( !range->initializedprogress ){ pRes->sizesChanged = true; //first call range->initializedprogress = true; } else { pRes->sizesChanged = false; //sizes were not recomputed } pRes->Size = sizeof(CcInt); //total tuple size // (including FLOBs) pRes->SizeExt = sizeof(CcInt); //tuple root and extension part pRes->noAttrs = 1; //no of attributes pRes->attrSize = range->attrSize; // complete size per attr pRes->attrSizeExt = range->attrSizeExt; // root +extension // size per attr const double feedccint = 0.001; //milliseconds per CcReal pRes->Card = range->card; //expected cardinality pRes->Time = (range->card) * feedccint; //expected time, [ms] pRes->Progress = ((double)(range->card - (range->last - range->current + 1))) / ((double) range->card); pRes->BTime = 0.00001; // blocking time must not be 0 pRes->BProgress = 1.0; // blocking progress [0,1] return YIELD; } #endif default: { /* should 
never happen */ assert(false); return -1; } } } // Specification struct intstreamInfo : OperatorInfo { intstreamInfo() : OperatorInfo() { name = "intstream"; signature = CcInt::BasicType() + " x " + CcInt::BasicType() + " -> stream(int)"; syntax = "intstream(_ , _)"; meaning = "Creates a stream of all integers starting with the first and " "ending with the second argument."; supportsProgress = true; } }; /* 5 TestOperator the following operator does the same as the intstream operator. But it uses a new method for progress estimation. 5.1.1 Type Mapping */ ListExpr intstream2TM(ListExpr args){ string err = "int x int expected"; if(!nl->HasLength(args,2)){ return listutils::typeError(err); } if(!CcInt::checkType(nl->First(args)) || !CcInt::checkType(nl->Second(args))){ return listutils::typeError(err); } return nl->TwoElemList( listutils::basicSymbol >(), listutils::basicSymbol()); } /* 5.1.2 LocalInfo class Don't care about progress estimation! */ class IntStream2Info{ public: IntStream2Info(CcInt* i1, CcInt* i2){ if(!i1->IsDefined() || !i2->IsDefined()){ current = 1; max = 0; } else { current = i1->GetValue(); max = i2->GetValue(); } } CcInt* next(){ return current>max?0:new CcInt(true,++current); } private: int current; int max; }; /* 5.1.3 Value Mapping Call the init Function of the CostEstimation class when open is called. 
*/ int intstream2VM(Word* args, Word& result, int message, Word& local, Supplier s){ IntStream2Info* li = (IntStream2Info*) local.addr; switch(message){ case OPEN: { if(li){ delete li; } local.addr = new IntStream2Info((CcInt*) args[0].addr, (CcInt*) args[1].addr); qp->getCostEstimation(s)->init(args,local.addr); return 0; } case REQUEST:{ if(!li){ result.addr = 0; } else { result.addr = li->next(); } return result.addr?YIELD:CANCEL; } case CLOSE:{ if(li){ delete li; local.addr = 0; } return 0; } } return -1; } /* 5.1.4 Specification */ OperatorSpec intstream2Spec( "int x int -> int", "intstream2(min, max)", "returns a stream of integer from min to max (both included)", "query intstream2(3, 5) count "); /* 5.1.5 CostEstimation */ class IntStream2CE: public CostEstimation{ public: IntStream2CE():initialized(false), firstCall(true),pi(0), timePerElem(0.001) { } virtual void init(Word* args, void* localInfo){ CcInt* i1 = (CcInt*) args[0].addr; CcInt* i2 = (CcInt*) args[1].addr; returned = 0; initialized = true; firstCall = true; if(!pi){ pi = new ProgressInfo(); pi->attrSize = new double[1]; pi->attrSizeExt = new double[1]; } if(!i1->IsDefined() || !i2->IsDefined()){ pi->Card = 0; } else { int v1 = i1->GetValue(); int v2 = i2->GetValue(); pi->Card = v1>v2?0: (v2 - v1) + 1; } pi->Size = sizeof(CcInt); pi->SizeExt = i1->Sizeof(); pi->noAttrs = 1; pi->attrSize[0] = pi->Size; pi->attrSizeExt[0] = pi->SizeExt; pi->sizesChanged = true; pi->Time = 0; pi->Progress = 0; pi->BTime = 0; pi->BProgress = 1; } virtual ~IntStream2CE(){ if(pi){ delete[] pi->attrSize; delete[] pi->attrSizeExt; delete pi; pi = 0; } } virtual int requestProgress(Word* args, ProgressInfo* result, void* localInfo, const bool argsAvailable) { if(!initialized){ return CANCEL; } pi->sizesChanged = firstCall; firstCall = false; result->CopySizes(*pi); result->Progress = pi->Card>0?(double)returned / (double)pi->Card : 1.0; result->Time = (double) (pi->Card - returned) * timePerElem; return YIELD; } private: 
bool initialized; bool firstCall; ProgressInfo* pi; double timePerElem; }; CostEstimation* intstream2CECreator(){ return new IntStream2CE(); } /* 5.1.5 Operator instance */ Operator intstream2( "intstream2", intstream2Spec.getStr(), intstream2VM, Operator::SimpleSelect, intstream2TM, intstream2CECreator); /* 6 Type operators Type operators are used only for inferring argument types of parameter functions. They have a type mapping but no evaluation function. */ /* 6.1 Type Operator ~STREAMELEM~ This type operator extracts the type of the elements from a stream type given as the first argument and otherwise just forwards its type. ---- ((stream T1) ...) -> T1 (T1 ...) -> T1 ---- */ ListExpr STREAMELEMTypeMap( ListExpr args ) { if(nl->ListLength(args) >= 1) { ListExpr first = nl->First(args); if (nl->ListLength(first) == 2) { if (nl->IsEqual(nl->First(first), Symbol::STREAM())) { return nl->Second(first); } else { return first; } } else { return first; } } return listutils::typeError("at least one argument expected"); } const string STREAMELEMSpec = "(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )" "( ((stream T1) ... ) -> T1\n" "(T1 ... ) -> T1" "type operator" "Extracts the type of the stream elements if the first " "argument is a stream and forwards the first argument's type " "otherwise." "Not for use with sos-syntax ))"; Operator STREAMELEM ( "STREAMELEM", STREAMELEMSpec, 0, Operator::SimpleSelect, STREAMELEMTypeMap ); /* 6.2 Type Operator ~STREAMELEM2~ This type operator extracts the type of the elements from the stream type within the second element within a list of argument types. Otherwise, the first arguments type is simplyforwarded. ---- (T1 (stream T2) ...) -> T2 (T1 T2 ...) 
-> T2 ---- */ ListExpr STREAMELEM2TypeMap( ListExpr args ) { if(nl->ListLength(args) >= 2) { ListExpr second = nl->Second(args); if (nl->ListLength(second) == 2) { if (nl->IsEqual(nl->First(second), Symbol::STREAM())) { return nl->Second(second); } else { return second; } } else { return second; } } return nl->SymbolAtom(Symbol::TYPEERROR()); } const string STREAMELEM2Spec = "(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )" "( (T1 (stream T2) ... ) -> T2\n" "( T1 T2 ... ) -> T2" "type operator" "Extracts the type of the elements from a stream given " "as the second argument if it is a stream. Otherwise, it forwards " "the original type." "Not for use with sos-syntax. ))"; Operator STREAMELEM2 ( "STREAMELEM2", STREAMELEM2Spec, 0, Operator::SimpleSelect, STREAMELEM2TypeMap ); /* 6.3 Operator ~ensure~ 6.3.1 The Specification */ struct ensure_Info : OperatorInfo { ensure_Info(const string& opName) : OperatorInfo() { name = opName; signature = "stream(T) x int -> bool"; syntax = "ensure[n]"; meaning = "Returns true if at least n tuples are received" ", otherwise false."; } }; /* 6.3.2 Type mapping of operator ~ensure~ stream(DATA) x int -> bool stream(tuple(...) 
x int -> bool */ ListExpr ensure_tm( ListExpr args ) { if(nl->ListLength(args)!=2){ ErrorReporter::ReportError("two arguments expected"); return nl->TypeError(); } ListExpr first = nl->First(args); ListExpr second = nl->Second(args); if(!nl->IsEqual(second,CcInt::BasicType())){ ErrorReporter::ReportError("second argument must be of type int"); return nl->TypeError(); } if(!listutils::isDATAStream(first) && !listutils::isTupleStream(first)){ ErrorReporter::ReportError("first argument must be of type" " stream(tuple(...)) or stream(DATA)"); return nl->TypeError(); } return nl->SymbolAtom(CcBool::BasicType()); } int ensure_sf( ListExpr args ) { NList list(args); list = list.first(); int num = 0; NList attrs; if ( list.checkStreamTuple(attrs) ) { num = 0; } else { num = 1; } return num; } /* 6.3.3 Value mapping function of operator ~ensure~ */ template int ensure_vm(Word* args, Word& result, int message, Word& local, Supplier s) { Word elem(Address(0) ); result = qp->ResultStorage(s); CcBool* res = static_cast( result.addr ); CcInt* CcNum = (CcInt*)args[1].addr; if(!CcNum->IsDefined()){ res->SetDefined(false); return 0; } int num = CcNum->GetValue(); qp->Open(args[0].addr); qp->Request(args[0].addr, elem); while ((num>0) && qp->Received(args[0].addr)) { static_cast( elem.addr )->DeleteIfAllowed(); qp->Request(args[0].addr, elem); num--; } qp->Close(args[0].addr); bool ensure = (num == 0); res->Set( true, ensure ); return 0; } ValueMapping ensure_vms[] = { ensure_vm, ensure_vm, 0 }; /* 6.4 Operator ~tail~ */ /* 6.4.1 Type Mapping for Operator ~tail~: --- (stream (tuple X)) x int ---> (append TRUE (stream (tuple X))) (stream (tuple X)) x int x bool ---> (stream (tuple X)) (stream T) x int ---> (append TRUE (stream T)) (stream (T)) x int x bool ---> (stream T) --- */ ListExpr streamTypeMapTail( ListExpr args ) { NList type(args); bool doAppend = false; ListExpr errorInfo = nl->OneElemList(nl->SymbolAtom("ErrorInfo")); if(type.length() < 2 || type.length() > 3){ return 
NList::typeError( "Expected 2 or 3 arguments."); } if(type.hasLength(3)){ if(type.third() != NList(CcBool::BasicType())){ return NList::typeError( "Optional 3rd argument must be " "'bool', if specified!"); } }else{ // appending the default for unspecified optional 3rd argument required doAppend = true; } if(type.second() != NList(CcInt::BasicType())){ return NList::typeError( "Expected 'int' for 2nd argument!"); } // 1st argument must be a stream... if(!( type.first().hasLength(2) && type.first().first().isSymbol(sym.STREAM()))){ return NList::typeError( "Expected a stream as 1st argument!"); } NList streamtype = type.first().second(); // stream elements must be in kind DATA or (tuple X) if( !( streamtype.hasLength(2) && streamtype.first().isSymbol(sym.TUPLE()) && IsTupleDescription(streamtype.second().listExpr()) ) && !(am->CheckKind(Kind::DATA(),streamtype.listExpr(),errorInfo))){ return NList::typeError( "Expected a stream of DATA or TUPLE."); } if(doAppend){ NList resType1 =NList( NList(Symbol::APPEND()), NList(true, false).enclose(), type.first() ); return resType1.listExpr(); }else{ DEBUGMESSAGE("Resulttype = " << type.first().convertToString()); return type.first().listExpr(); } } // localinfo used within // value mapping for stream(tuple(X)) x int [ x bool ]--> stream(tuple(X)) class TailLocalInfo { public: TailLocalInfo(const int mN, const bool mKeepOrder, Supplier s) : n ( mN ), keepOrder ( mKeepOrder ), finished ( true ), bufferSize ( 0 ), returnedResults( 0 ), buffer ( (size_t)qp->GetMemorySize(s)*1024*1024 ) { // member translationTable initialized automatically }; ~TailLocalInfo() { // destructor for members buffer, it and // translationTable will be called automatically }; // Store 'tuple' within the local buffer and delete it, if allowed. 
void AppendTuple(Tuple *tuple)
    {
      buffer.AppendTuple( tuple ); // append current stream elem
      if(bufferSize == 0){
        // DEBUGMESSAGE(" Inserting the first tuple...");
        finished = false;
      }
      bufferSize++; // increase element counter
      // DEBUGMESSAGE(" Inserting tuple " << bufferSize << "/" << n);
      if(bufferSize > n){
        // DEBUGMESSAGE(" Queue full. Pop front.");
        // only the id is dropped here; the tuple itself stays in ~buffer~
        translationTable.pop_front(); // remove head of buffer
      }
      // The tuplebuffer should use subsequent tupleids starting with 0 and
      // proceeding up to bufferSize.
      translationTable.push_back((TupleId)(bufferSize-1)); // append tupleId
      tuple->DeleteIfAllowed(); // delete appended element from memory
      return;
    };

    // Get the next tuple from the local buffer
    // set member ~finished~ when done
    // return 0, iff no further result exists
    Tuple* GetNextTuple()
    {
      TupleId Id;
      // Since TupeId is defined as a long. and the tuplebuffer uses
      // subsequent long values as TupleIds (starting with 0), we can simply
      // enumerate all used TupleIds starting with the first one needed. 
if(finished || returnedResults >= n){ // DEBUGMESSAGE(" Finished " << returnedResults << "/" << n); finished = true; return ((Tuple*) 0); } if(keepOrder){ // get first elem first // DEBUGMESSAGE(" Getting from front"); Id = translationTable.front(); translationTable.pop_front(); }else{ // get last elem first // DEBUGMESSAGE(" Getting from back"); Id = translationTable.back(); translationTable.pop_back(); } returnedResults++; // DEBUGMESSAGE(" Getting tuple " << returnedResults << "/" << n); finished = translationTable.empty(); return buffer.GetTuple( Id , false); }; int n; bool keepOrder; bool finished; protected: long bufferSize; long returnedResults; TupleBuffer buffer; deque translationTable; }; // value mapping for stream(tuple(X)) x int [ x bool ]--> stream(tuple(X)) int StreamTailTupleTreamVM(Word* args, Word& result, int message, Word& local, Supplier s) { TailLocalInfo* li; Word InputStream = args[0]; CcInt* CcN = static_cast(args[1].addr); CcBool* CcKeepOrder = static_cast(args[2].addr); Word elem; switch( message ) { case OPEN:{ DEBUGMESSAGE("Start OPEN"); if( !CcN->IsDefined() || !CcKeepOrder->IsDefined() || !InputStream.addr || CcN->GetIntval() <= 0 ) { local.setAddr(0); DEBUGMESSAGE("End OPEN 1"); return 0; } // else: consume the InputStream li = new TailLocalInfo( CcN->GetIntval(), CcKeepOrder->GetBoolval(), s ); local.setAddr(li); // open and consume the input stream qp->Open(InputStream.addr); qp->Request(InputStream.addr, elem); // get first stream elem while (qp->Received(args[0].addr)) { Tuple *tuple = static_cast(elem.addr); li->AppendTuple(tuple); qp->Request(InputStream.addr, elem); // get next stream elem } // InputStream will be closed when calling CLOSE DEBUGMESSAGE("End OPEN 2"); return 0; } case REQUEST:{ DEBUGMESSAGE("Start REQUEST"); if(!local.addr){ DEBUGMESSAGE("End REQUEST: CANCEL1 "); return CANCEL; } li = static_cast(local.addr); if(li->finished){ DEBUGMESSAGE("End REQUEST: CANCEL2 "); return CANCEL; } Tuple *restuple = 
li->GetNextTuple(); if(!restuple){ DEBUGMESSAGE("End REQUEST: CANCEL3 "); return CANCEL; } // else { // restuple->IncReference(); // reference for the stream // } result.setAddr( restuple ); DEBUGMESSAGE("End REQUEST: YIELD"); return YIELD; } case CLOSE:{ DEBUGMESSAGE("Start CLOSE"); qp->Close(InputStream.addr); if(local.addr){ li = static_cast(local.addr); delete li; local.setAddr(0); } DEBUGMESSAGE("End CLOSE"); return 0; } } return 0; } // localinfo used within value mapping for // stream T x int [ x bool ]--> stream T, T in DATA // This is a specialization of TailLocalInfo. class DataTailLocalInfo: public TailLocalInfo { public: DataTailLocalInfo(const int mN, const bool mKeepOrder, const ListExpr elemNumType, Supplier s) : TailLocalInfo( mN, mKeepOrder, s ) { ListExpr numericElemType = elemNumType; ListExpr attrExpr = nl->TwoElemList(nl->SymbolAtom("elem"),numericElemType); ListExpr tupleExpr = nl->TwoElemList(nl->SymbolAtom(Tuple::BasicType()), nl->OneElemList(attrExpr)); bufferTupleType = new TupleType(tupleExpr); }; ~DataTailLocalInfo() { if(bufferTupleType) bufferTupleType->DeleteIfAllowed(); }; // move elem into internal tuplebuffer void AppendElem(Attribute *elem) { Tuple *tuple = new Tuple(bufferTupleType); tuple->PutAttribute( 0, elem ); AppendTuple(tuple); // AppendTuple(...) already calls tuple->DeleteIfAllowed()! }; // return the next element from the local tuplebuffer Attribute* GetNextElem(){ Tuple *tuple = GetNextTuple(); if(tuple){ Attribute *elem = (tuple->GetAttribute(0))->Copy(); tuple->DeleteIfAllowed(); return elem; } // else: No elem left! 
return static_cast(0); }; private: TupleType *bufferTupleType; }; // value mapping for stream T x int [ x bool ]--> stream T, T in DATA int StreamTailDataStreamVM(Word* args, Word& result, int message, Word& local, Supplier s) { DataTailLocalInfo* li; Word InputStream = args[0]; CcInt* CcN = static_cast(args[1].addr); CcBool* CcKeepOrder = static_cast(args[2].addr); Word elem; switch( message ) { case OPEN:{ DEBUGMESSAGE("Start OPEN"); if( !CcN->IsDefined() || !CcKeepOrder->IsDefined() || !InputStream.addr || CcN->GetIntval() <= 0 ) { local.setAddr(0); DEBUGMESSAGE("End OPEN 1"); return 0; } // else: consume the InputStream ListExpr elemTypeNL = nl->Second(qp->GetNumType( s )); li = new DataTailLocalInfo( CcN->GetIntval(), CcKeepOrder->GetBoolval(), elemTypeNL, s ); local.setAddr(li); // open and consume the input stream qp->Open(InputStream.addr); qp->Request(InputStream.addr, elem); // get first stream elem while (qp->Received(args[0].addr)) { Attribute *myObj = static_cast(elem.addr); li->AppendElem(myObj); // store the tuple in a tuplebuffer qp->Request(InputStream.addr, elem); // get next stream elem } // InputStream will be closed when calling CLOSE DEBUGMESSAGE("End OPEN 2"); return 0; } case REQUEST:{ DEBUGMESSAGE("Start REQUEST"); if(!local.addr){ DEBUGMESSAGE("End REQUEST: CANCEL1 "); return CANCEL; } li = static_cast(local.addr); if(li->finished){ DEBUGMESSAGE("End REQUEST: CANCEL2 "); return CANCEL; } result.setAddr(li->GetNextElem()); // extract the object if(!result.addr){ DEBUGMESSAGE("End REQUEST: CANCEL3 "); return CANCEL; } DEBUGMESSAGE("End REQUEST: YIELD"); return YIELD; } case CLOSE:{ DEBUGMESSAGE("Start CLOSE"); qp->Close(InputStream.addr); if(local.addr){ li = static_cast(local.addr); delete li; local.setAddr(0); } DEBUGMESSAGE("End CLOSE"); return 0; } } return 0; } ValueMapping streamtailmap[] = { StreamTailTupleTreamVM, StreamTailDataStreamVM }; int streamTailSelect( ListExpr args ) { NList type(args); if( 
type.first().second().hasLength(2) && type.first().second().first() == Tuple::BasicType()) return 0; return 1; } const string StreamSpecTail = "( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) " "( stream(tuple(X)) x int [ x bool] -> stream(tuple(X))\n" "stream(T)) x int [ x bool] -> stream(T), T in DATA" "_ tail[ n ], _ tail[ n, keepOrder ]" "Delivers only the last 'n' stream elements. Optional parameter " "'keepOrder' controls the ordering of the result. If set to TRUE (default) " "the original ordering is maintained. Otherwise, the tuples are returned " "in reverse order." "query ten feed head[6] tail[2] tconsume" ") )"; Operator streamtail( "tail", StreamSpecTail, 2, streamtailmap, streamTailSelect, streamTypeMapTail); /* 6.6 Operator ~kinds~ 6.6.1 Type Mapping */ ListExpr KindsTypeMap(const ListExpr args){ if(nl->ListLength(args)!=1){ ErrorReporter::ReportError("Wrong number of arguments "); return nl->TypeError(); } else { if(nl->IsEqual(nl->First(args),CcString::BasicType())){ return nl->TwoElemList(nl->SymbolAtom(Stream::BasicType()), nl->SymbolAtom(CcString::BasicType())); } else { ErrorReporter::ReportError("Wrong number of arguments "); return nl->TypeError(); } } } /* 6.6.2 Value Mapping */ class KindsLocalInfo{ public: KindsLocalInfo(CcString* name):pos(0),kinds(){ if(!name) { return; } if(!name->IsDefined()){ return; } string type = name->GetValue(); if(!SecondoSystem::GetCatalog()->IsTypeName(type)){ return; } int algId=0; int typeId = 0; SecondoSystem::GetCatalog()->GetTypeId(name->GetValue(),algId,typeId); TypeConstructor* tc = am->GetTC(algId,typeId); kinds = tc->GetKinds(); } CcString* nextKind(){ if(pos>=kinds.size()){ return 0; } else { CcString* res = new CcString(true,kinds[pos]); pos++; return res; } } private: unsigned int pos; vector kinds; }; int KindsVM(Word* args, Word& result, int message, Word& local, Supplier s) { switch(message){ case OPEN: { local.setAddr( new KindsLocalInfo( static_cast(args[0].addr))); return 0; } case 
REQUEST: { KindsLocalInfo* li = static_cast(local.addr); if(!li){ return CANCEL; }else{ result.setAddr(li->nextKind()); return result.addr ? YIELD : CANCEL; } } case CLOSE: { KindsLocalInfo* li = static_cast(local.addr); if(li){ delete li; local.setAddr(0); } return 0; } default: return 0; } } /* 6.6.3 Specification */ const string KindsSpec = "(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )" "( string -> stream(string)" "_ kinds " "Produces a stream of strings for a given type" "query string kinds transformstream consume ))"; Operator kinds ( "kinds", KindsSpec, KindsVM, Operator::SimpleSelect, KindsTypeMap ); /* 6.7 Operator ~timeout~ This operator will terminate stream procesing, when it its result is requested a specified time after opening it. Until then, is just returns the result of its stream predecessor. */ /* Type Mapping Fubnction: ---- stream(X) x real --> stream(X) ---- */ ListExpr TimeoutTypeMap(const ListExpr args){ if(nl->ListLength(args)!=2){ return listutils::typeError("one argument expected"); } ListExpr first = nl->First(args); if(!listutils::isStream(first)){ return listutils::typeError("Expected stream as 1st argument."); } ListExpr second = nl->Second(args); if( !CcReal::checkType(second)) { return listutils::typeError("Expected real as 2nd argument."); } return nl->First(args); } struct TimeoutLocalInfo { TimeoutLocalInfo(const double _seconds): useProgress( false ), seconds( _seconds), elemcounter( 0 ), finished( false ), streamisopen( false ) { initial = time( 0 ); // get current time if(seconds < 0.0){ seconds = 0.0; finished = true; } }; bool useProgress; // check during RequestProgress only time_t initial; // the time, when the stopwatch started double seconds; // the time difference for the timeout (in seconds) long elemcounter; // number of already returned stream elements bool finished; // true iff finished bool streamisopen; }; int TimeoutVM(Word* args, Word& result, int message, Word& local, Supplier s) { switch(message){ 
case OPEN: { // set termination conditions TimeoutLocalInfo* li = new TimeoutLocalInfo( (static_cast(args[1].addr))->GetRealval()); local.setAddr(li); qp->Open(args[0].addr); li->streamisopen = true; return 0; } case REQUEST: { TimeoutLocalInfo* li = static_cast(local.addr); if(!li){ return CANCEL; } else if( li->finished || ( !li->useProgress && (difftime(time(0),li->initial) >= li->seconds) ) ) { li->finished = true; return CANCEL; } else { qp->Request(args[0].addr,result); if(result.addr == 0){ li->finished = true; return CANCEL; } else { li->elemcounter++; return YIELD; } } } case CLOSE: { TimeoutLocalInfo* li = static_cast(local.addr); if(li && li->streamisopen){ qp->Close(args[0].addr); } return 0; } case CLOSEPROGRESS: { TimeoutLocalInfo* li = (TimeoutLocalInfo*) local.addr; if ( li ) { delete li; local.setAddr(0); } return 0; } case REQUESTPROGRESS: { TimeoutLocalInfo* li = (TimeoutLocalInfo*) local.addr; if( !li ) { return CANCEL; } ProgressInfo *pRes; pRes = (ProgressInfo*) result.addr; ProgressInfo p1; double runtime = difftime(time(0),li->initial); li->finished = ( runtime >= li->seconds ); if ( qp->RequestProgress(args[0].addr, &p1) ) { pRes->Copy(p1); double myprogress = runtime / li->seconds; if(myprogress <= 0.0){ myprogress = 0.0000001; // avoid div/0 } double mycard = li->elemcounter / myprogress; if(mycard <= 1){ mycard = 1; // avoid div/0 } pRes->Progress = min(max(p1.Progress, myprogress), pRes->BProgress); pRes->Time = min( p1.Time, li->seconds*1000 ); if( p1.BTime > pRes->Time){ pRes->Time = max(pRes->Time, p1.BTime); } pRes->Card = min( p1.Card, mycard); //a number between 0 and 1 } else { return CANCEL; } if( !li->useProgress || p1.sizesChanged ){ li->useProgress = true; pRes->sizesChanged = true; } if(li->finished){ pRes->Progress = 1.0; } return YIELD; } default: { return 0; } } } const string TimeoutSpec = "(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )" "( stream(T) x real -> stream(T)" "_ timeout [ Seconds ]" "Stops stream 
processing after the specified time has passed. " "Negative arguments are interpreted as 0." "query intstream(1,9999999999) timeout[5.0] count ))"; Operator timeout ( "timeout", TimeoutSpec, TimeoutVM, Operator::SimpleSelect, TimeoutTypeMap ); /* 6.8 IsOrdered Signature: stream(DATA) -> bool Checks whether a stream is sorted. */ ListExpr IsOrderedTM(ListExpr args){ if(nl->ListLength(args)!=1){ return listutils::typeError("one argument expected"); } ListExpr arg = nl->First(args); if(!listutils::isDATAStream(arg)){ return listutils::typeError("stream of DATA expected"); } return nl->SymbolAtom(CcBool::BasicType()); } int IsOrderedVM(Word* args, Word& result, int message, Word& local, Supplier s){ qp->Open(args[0].addr); Word elem; qp->Request(args[0].addr,elem); Attribute* attr=0; bool sorted=true; while(qp->Received(args[0].addr) && sorted){ Attribute* next = static_cast(elem.addr); if(attr){ int cmp = attr->Compare(next); if(cmp >0){ sorted = false; } attr->DeleteIfAllowed(); attr = next; next = 0; } else { // first element attr = next; } if(sorted){ qp->Request(args[0].addr,elem); } } if(attr){ attr->DeleteIfAllowed(); } qp->Close(args[0].addr); result = qp->ResultStorage(s); CcBool* res = static_cast(result.addr); res->Set(true,sorted); return 0; } const string IsOrderedSpec = "(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )" "( stream(DATA) -> bool" "_ isOrdered" "Checks whether the argument stream is sorted in ascending order" "" "query intstream(10, 1000) isOrdered ))"; Operator isOrdered ( "isOrdered", IsOrderedSpec, IsOrderedVM, Operator::SimpleSelect, IsOrderedTM ); /* 6.9 Operator ~sbuffer~ This operator buffers an incoming stream of tuple or DATA. 
*/ ListExpr sbufferTM(ListExpr args){ string err = "{stream(tuple(X)) , stream(DATA)} x int expected"; if(!nl->HasLength(args,2)){ return listutils::typeError("wrong number of args; " + err); } ListExpr arg = nl->First(args); if( (Stream::checkType(arg) || Stream::checkType(arg)) && CcInt::checkType(nl->Second(args)) ){ return arg; } return listutils::typeError(err); } template class sbufferInfo{ public: sbufferInfo(Word _stream, size_t _buffersize): stream(_stream), buffersize(_buffersize){ stream.open(); output = false; opos = 0; eos = false; } ~sbufferInfo(){ stream.close(); for(size_t i=0;iDeleteIfAllowed(); buffer[i] = 0; } } } T* next(){ if(buffersize==0){ return stream.request(); } if(!output){ if(eos || !fillBuffer()){ return 0; } } T* n = buffer[opos]; buffer[opos] = 0; opos++; if(opos==buffer.size()){ output= false; } return n; } private: Stream stream; size_t buffersize; vector buffer; bool output; size_t opos; bool eos; bool fillBuffer(){ T* t = stream.request(); if(!t){ return false; } buffer.clear(); opos = 0; buffer.push_back(t); while(buffer.size() < (size_t) buffersize && t){ t = stream.request(); if(t){ buffer.push_back(t); } } if(t==0){ eos = true; } output = true; return true; } }; template int sbufferVMT(Word* args, Word& result, int message, Word& local, Supplier s){ sbufferInfo* li = (sbufferInfo*) local.addr; switch(message){ case OPEN: { if(li) { delete li; } CcInt* bs = (CcInt*) args[1].addr; size_t size = 0; if(bs->IsDefined() && bs->GetValue()>0){ size = bs->GetValue(); } local.addr = new sbufferInfo(args[0], size); return 0; } case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE: if(li){ delete li; local.addr = 0; } return 0; } return -1; }; int sbufferSelect(ListExpr args){ return Attribute::checkType(nl->Second(nl->First(args)))?0:1; } ValueMapping sbufferVM[] = { sbufferVMT, sbufferVMT }; OperatorSpec sbufferSpec( "stream(X) -> stream(X)", "_ sbuffer", "Buffers a DATA or a tuple stream. 
", " query strassen feed sbuffer count" ); Operator sbufferOp( "sbuffer", sbufferSpec.getStr(), 2, sbufferVM, sbufferSelect, sbufferTM ); /* 16 Operators for merging two sorted attribute streams */ ListExpr mergeOpTM(ListExpr args){ if(!nl->HasLength(args,2)){ return listutils::typeError("expected two args"); } if(!Stream::checkType(nl->First(args)) ||!Stream::checkType(nl->Second(args))){ return listutils::typeError("two sorted DATA streams expected"); } if(!nl->Equal(nl->First(args),nl->Second(args))){ return listutils::typeError("found different stream types"); } return nl->First(args); } enum mergeOP{SEC,DIFF,UNION,MERGE}; template class mergeOpInfo{ public: mergeOpInfo(Word _stream1, Word _stream2): stream1(_stream1), stream2(_stream2){ stream1.open(); stream2.open(); a1 = 0; a2 = 0; first = true; } ~mergeOpInfo(){ stream1.close(); stream2.close(); if(a1){ a1->DeleteIfAllowed(); } if(a2){ a2->DeleteIfAllowed(); } } Attribute* next(){ if(first){ first = false; a1 = stream1.request(); a2 = stream2.request(); } switch(op){ case SEC: return getNextSec(); case DIFF: return getNextDiff(); case UNION: return getNextUnion(); case MERGE: return getNextMerge(); default: return 0; } } private: Stream stream1; Stream stream2; bool first; Attribute* a1, *a2; Attribute* getNextSec(){ while(a1 && a2){ int cmp = a1->Compare(a2); if(cmp<0){ a1->DeleteIfAllowed(); a1 = stream1.request(); } else if(cmp>0){ a2->DeleteIfAllowed(); a2 = stream2.request(); } else { // equal found Attribute* res = a1; a1 = stream1.request(); a2->DeleteIfAllowed(); a2 = stream2.request(); return res; } } return 0; } Attribute* getNextDiff(){ while(a1){ if(!a2){ Attribute* res = a1; a1 = stream1.request(); return res; } int cmp = a1->Compare(a2); if(cmp<0){ Attribute* res = a1; a1 = stream1.request(); return res; } else if(cmp>0){ a2->DeleteIfAllowed(); a2=stream2.request(); } else { a1->DeleteIfAllowed(); a1 = stream1.request(); } } return 0; } Attribute* getNextMerge(){ while(a1 || a2){ if(!a1){ 
Attribute* res = a2; a2 = stream2.request(); return res; } if(!a2){ Attribute* res = a1; a1 = stream1.request(); return res; } int cmp = a1->Compare(a2); if(cmp<=0){ Attribute* res = a1; a1 = stream1.request(); return res; } else { Attribute* res = a2; a2 = stream2.request(); return res; } } return 0; } Attribute* getNextUnion(){ while(a1 || a2){ if(!a1){ Attribute* res = a2; a2 = stream2.request(); return res; } if(!a2){ Attribute* res = a1; a1 = stream1.request(); return res; } int cmp = a1->Compare(a2); if(cmp<0){ Attribute* res = a1; a1 = stream1.request(); return res; } else if(cmp>0) { Attribute* res = a2; a2 = stream2.request(); return res; } else { Attribute* res = a1; a1 = stream1.request(); a2->DeleteIfAllowed(); a2 = stream2.request(); return res; } } return 0; } }; template int mergeOpVMT(Word* args, Word& result, int message, Word& local, Supplier s){ mergeOpInfo* li = (mergeOpInfo*) local.addr; switch(message){ case OPEN: if(li){ delete li; } local.addr = new mergeOpInfo(args[0],args[1]); return 0; case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE: if(li){ delete li; local.addr = 0; } return 0; } return -1; } OperatorSpec mergeDiffSpec( " stream(D) x stream(D) -> stream(D), w in DATA", "_ _ mergediff", "Compuetes the difference between two DATA streams", "query intstream(1,10) intstream(3,7) mergediff count" ); OperatorSpec mergeSecSpec( " stream(D) x stream(D) -> stream(D), w in DATA", "_ _ mergesec", "Compuetes the intersection between two DATA streams", "query intstream(1,10) intstream(3,7) mergesec count" ); OperatorSpec mergeUnionSpec( " stream(D) x stream(D) -> stream(D), w in DATA", "_ _ mergeunion", "Computes the union between two DATA streams", "query intstream(1,10) intstream(3,7) mergeunion count" ); OperatorSpec mergeSpec( " stream(D) x stream(D) -> stream(D), w in DATA", "_ _ merge", "merges two sorted DATA streams into a single ine", "query intstream(1,10) intstream(3,7) merge count" ); Operator 
mergediffOp( "mergediff", mergeDiffSpec.getStr(), mergeOpVMT, Operator::SimpleSelect, mergeOpTM ); Operator mergesecOp( "mergesec", mergeSecSpec.getStr(), mergeOpVMT, Operator::SimpleSelect, mergeOpTM ); Operator mergeunionOp( "mergeunion", mergeUnionSpec.getStr(), mergeOpVMT, Operator::SimpleSelect, mergeOpTM ); Operator mergeOp( "mergeattrstreams", mergeSpec.getStr(), mergeOpVMT, Operator::SimpleSelect, mergeOpTM ); /* 6.15 Operator rdup */ ListExpr rdupTM(ListExpr args){ if(!nl->HasLength(args,1)){ return listutils::typeError("expected one arg"); } if(!Stream::checkType(nl->First(args))){ return listutils::typeError("expected stream(DATA)"); } return nl->First(args); } class rdupInfo{ public: rdupInfo(Word _stream): stream(_stream),last(0),first(true){ stream.open(); } ~rdupInfo(){ if(last){ last->DeleteIfAllowed(); } stream.close(); } Attribute* next(){ if(first){ first = false; last = stream.request(); if(!last) return 0; return last->Copy(); } if(!last){ return 0; } Attribute* current; while( (current=stream.request())){ int cmp = last->Compare(current); if(cmp==0){ current->DeleteIfAllowed(); } else { last->DeleteIfAllowed(); last = current; return last->Copy(); } } last->DeleteIfAllowed(); last = 0; return 0; } private: Stream stream; Attribute* last; bool first; }; int rdupVM(Word* args, Word& result, int message, Word& local, Supplier s){ rdupInfo* li = (rdupInfo*) local.addr; switch(message){ case OPEN: if(li) delete li; local.addr = new rdupInfo(args[0]); return 0; case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE: if(li){ delete li; local.addr = 0; } } return -1; } OperatorSpec rdupSpec( "stream(D) -> stream(D) , D in DATA", "_ rdup", "removes duplicates from a sorted attribute stream", "query intstream(1,10) intstream(1,10) merge rdup count" ); Operator rdupOp( "rdup", rdupSpec.getStr(), rdupVM, Operator::SimpleSelect, rdupTM ); /* Operator xth */ ListExpr xthTM(ListExpr args){ if(!nl->HasLength(args,2)){ return 
listutils::typeError("two args expected"); } if(!Stream::checkType(nl->First(args))){ return listutils::typeError("first arg mut be astream of DATA"); } if(!CcInt::checkType(nl->Second(args))){ return listutils::typeError("second arg must be an int"); } return nl->Second(nl->First(args)); } int xthVM(Word* args, Word& result, int message, Word& local, Supplier s){ result = qp->ResultStorage(s); Attribute* res = (Attribute*) result.addr; CcInt* x = (CcInt*) args[1].addr; if(!x->IsDefined()){ res->SetDefined(false); return 0; } int a = x->GetValue(); if(a<1){ res->SetDefined(false); return 0; } Stream stream(args[0]); stream.open(); int i=1; Attribute* r; while( (r=stream.request())){ if(i==a){ res->CopyFrom(r); r->DeleteIfAllowed(); stream.close(); return 0; } r->DeleteIfAllowed(); i++; } stream.close(); res->SetDefined(false); return 0; } OperatorSpec xthSpec( "stream(D) x int -> D, D in DATA", " _ xth[_] ", "Extract the x-th attribute form a stream. " "If this attribute does not exist, the result is undefined", " query intstream(1,10) xth[6] " ); Operator xthOp( "xth", xthSpec.getStr(), xthVM, Operator::SimpleSelect, xthTM ); ListExpr minmaxTM(ListExpr args){ if(!nl->HasLength(args,1)){ return listutils::typeError("one arg expected"); } if(!Stream::checkType(nl->First(args))){ return listutils::typeError("stream(DATA) expected"); } return nl->Second(nl->First(args)); } template int minmaxVM(Word* args, Word& result, int message, Word& local, Supplier s){ result = qp->ResultStorage(s); Attribute* res = (Attribute*) result.addr; Attribute* value = 0; Stream stream(args[0]); stream.open(); Attribute* v; while( (v=stream.request())){ if(!value){ value = v; } else { int cmp = value->Compare(v); if(!isMin){ cmp = -cmp; } if(cmp>0){ value->DeleteIfAllowed(); value = v; } else { v->DeleteIfAllowed(); } } } if(value){ res->CopyFrom(value); value->DeleteIfAllowed(); } else { res->SetDefined(false); } return 0; } OperatorSpec minattrSpec( "stream(T) -> T, T in DATA", " _ 
minattr", "Retrieves the minimum value within a stream of attributes", "query intstream(1,10) minattr" ); OperatorSpec maxattrSpec( "stream(T) -> T, T in DATA", " _ maxattr", "Retrieves the maximum value within a stream of attributes", "query intstream(1,10) maxattr" ); Operator minattrOp( "minattr", minattrSpec.getStr(), minmaxVM, Operator::SimpleSelect, minmaxTM ); Operator maxattrOp( "maxattr", maxattrSpec.getStr(), minmaxVM, Operator::SimpleSelect, minmaxTM ); /* 6.19 Operator ~nth~ */ ListExpr nthTM(ListExpr args){ if(!nl->HasLength(args,3)){ return listutils::typeError("three args expected"); } if(!Stream::checkType(nl->First(args))){ return listutils::typeError("first arg must be stream of DATA"); } if(!CcInt::checkType(nl->Second(args))){ return listutils::typeError("second arg must be an int"); } if(!CcBool::checkType(nl->Third(args))){ return listutils::typeError("third arg must be a bool"); } return nl->First(args); } class nthInfo{ public: nthInfo(Word _stream, int _n, bool _fixed) : stream(_stream), n(_n), fixed(_fixed) { stream.open(); srand(time(0)); } ~nthInfo(){ stream.close(); } Attribute* next(){ int v = n-1; if(!fixed){ v = rand()%n; } Attribute* res = 0; for( int i=0;iDeleteIfAllowed(); } return 0; } if( i == v ){ res = cand; } else { cand->DeleteIfAllowed(); } } return res; } int getN() const{ return n; } private: Stream stream; int n; bool fixed; }; int nthVM(Word* args, Word& result, int message, Word& local, Supplier s){ nthInfo* li = (nthInfo*) local.addr; switch(message){ case OPEN: { if(li){ delete li; local.addr = 0; } CcInt* n = (CcInt*) args[1].addr; CcBool* fixed = (CcBool*) args[2].addr; if(!n->IsDefined() || !fixed->IsDefined()){ return 0; } int v = n->GetValue(); if(v<1){ return 0; } local.addr = new nthInfo(args[0],v, fixed->GetValue()); return 0; } case REQUEST: { result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; } case CLOSE : { return 0; } case REQUESTPROGRESS: { ProgressInfo p1; ProgressInfo* pRes; pRes = 
(ProgressInfo*) result.addr; if (qp-> RequestProgress(args[0].addr, &p1) ) { pRes->Copy(p1); int n = li?li->getN():1; pRes->Card = p1.Card/n; return YIELD; } else { return CANCEL; } } case CLOSEPROGRESS: if(li){ delete li; local.addr = 0; } return 0; } return -1; } OperatorSpec nthSpec( "stream(D) x int x bool -> stream(D), D in DATA", "_ nth[_,_]", "Extracts each nth element from a attribute stream if " "the boolean value is TRUE. In the false case from a " "block of n attributes, randomly one is chosen.", "query intstream(1,10) nth[2,TRUE] count" ); Operator nthOp( "nth", nthSpec.getStr(), nthVM, Operator::SimpleSelect, nthTM ); /* 8.12 Operators sum and avg */ ListExpr sumTM(ListExpr args){ if(!nl->HasLength(args,1)){ return listutils::typeError("1 arg expected"); } ListExpr a = nl->First(args); if( !Stream::checkType(a) && !Stream::checkType(a) && !Stream::checkType(a)){ return listutils::typeError("stream(T) expected, T in {int,longint,real}"); } return nl->Second(a); } ListExpr avgTM(ListExpr args){ if(!nl->HasLength(args,1)){ return listutils::typeError("1 arg expected"); } ListExpr a = nl->First(args); if( !Stream::checkType(a) && !Stream::checkType(a) && !Stream::checkType(a)){ return listutils::typeError("stream(T) expected, T in {int,longint,real}"); } return listutils::basicSymbol(); } template int sumavgVMT(Word* args, Word& result, int message, Word& local, Supplier s){ result = qp->ResultStorage(s); typename C::ctype sum = 0; Stream stream(args[0]); C* v; stream.open(); errno=0; int count=0; while( (v=stream.request())){ if(v->IsDefined()){ typename C::ctype val = v->GetValue(); sum = sum + val; if(errno!=0){ // overflow detected ((Attribute*) result.addr)->SetDefined(false); stream.close(); return 0; } count++; } // ignore undefined values ?? 
v->DeleteIfAllowed(); } stream.close(); if(avg){ if(count>0){ ((CcReal*)result.addr)->Set(true, (double)sum/count); }else { ((CcReal*)result.addr)->Set(false,0); } } else { ((C*)result.addr)->Set(true,sum); } return 0; } ValueMapping avgVM[] = { sumavgVMT, sumavgVMT, sumavgVMT }; ValueMapping sumVM[] = { sumavgVMT, sumavgVMT, sumavgVMT }; OperatorSpec avgSpec( "stream(T) -> T, T in {int,real,longing}", " _ avgattr", "computes the average of a numeric attribute stream " "ignoring undefined values.", "query intstream(1,10) avgattr" ); OperatorSpec sumSpec( "stream(T) -> T, T in {int,real,longing}", " _ sumattr", "computes the sum of a numeric attribute stream " "ignoring undefined values.", "query intstream(1,10) sumattr" ); int avgsumSelect(ListExpr args){ ListExpr a = nl->Second(nl->First(args)); if(CcInt::checkType(a)) return 0; if(CcReal::checkType(a)) return 1; if(LongInt::checkType(a)) return 2; return -1; } Operator avgattrOp( "avgattr", avgSpec.getStr(), 3, avgVM, avgsumSelect, avgTM ); Operator sumattrOp( "sumattr", sumSpec.getStr(), 3, sumVM, avgsumSelect, sumTM ); /* 7.21 ~consume~ for attribute streams */ ListExpr consumeTM(ListExpr args){ if(!nl->HasLength(args,1)){ return listutils::typeError("one arg expected"); } ListExpr a = nl->First(args); if(!Stream::checkType(a)){ return listutils::typeError("expected attribute stream"); } ListExpr at = nl->Second(a); if(listutils::isSymbol(at,"arel")){ return listutils::typeError("attribute relations are not supported"); } ListExpr attrList = nl->OneElemList( nl->TwoElemList( nl->SymbolAtom("Elem"), at)); ListExpr res = nl->TwoElemList( listutils::basicSymbol(), nl->TwoElemList( listutils::basicSymbol(), attrList)); return res; } int consumeVM(Word* args, Word& result, int message, Word& local, Supplier s){ result = qp->ResultStorage(s); GenericRelation* res = (GenericRelation*) result.addr; Stream stream(args[0]); ListExpr tupleType = nl->Second(qp->GetNumType(s)); TupleType* tt = new TupleType(tupleType); 
Attribute* a; stream.open(); while((a=stream.request())){ Tuple* tuple = new Tuple(tt); tuple->PutAttribute(0,a); res->AppendTuple(tuple); tuple->DeleteIfAllowed(); } stream.close(); tt->DeleteIfAllowed(); return 0; } OperatorSpec consumeSpec( "stream(D) -> rel(tuple((Elem D))), D in DATA", " _ consume", "Collects an attribute stream into a relation.", "query intstream(1,10) consume" ); Operator consumeOp( "consume", consumeSpec.getStr(), consumeVM, Operator::SimpleSelect, consumeTM ); /* 9.17 operator ~ts~ */ ListExpr tsTM(ListExpr args){ if(!nl->HasLength(args,2)){ return listutils::typeError("expected a stream and a list of funs"); } if(!Stream::checkType(nl->First(args))){ return listutils::typeError("first arg is not an attribute stream"); } ListExpr a = nl->Second(nl->First(args)); // type within stream ListExpr funlist = nl->Second(args); if(nl->AtomType(funlist)!=NoAtom){ return listutils::typeError("second arg is not a list"); } ListExpr attrList= nl->TheEmptyList(); ListExpr last = nl->TheEmptyList(); bool first = true; while(!nl->IsEmpty(funlist)){ ListExpr fun = nl->First(funlist); funlist = nl->Rest(funlist); if(!nl->HasLength(fun,2)){ return listutils::typeError("2nd arg contains an element which " "is not a named function"); } if(nl->AtomType(nl->First(fun))!=SymbolType){ return listutils::typeError("found invalid attribute name"); } ListExpr an = nl->First(fun); string n = nl->SymbolValue(an); ListExpr map = nl->Second(fun); if(!listutils::isMap<1>(map)){ return listutils::typeError("invalid function definition for "+n); } if(!nl->Equal(a, nl->Second(map))){ return listutils::typeError("function argument for " + n + " differs to the stream element"); } ListExpr funRes = nl->Third(map); if(!Attribute::checkType(funRes)){ return listutils::typeError("function result of " + n + " not in kind DATA"); } ListExpr attr = nl->TwoElemList(an, funRes); if(first){ attrList = nl->OneElemList(attr); last = attrList; first = false; } else { last = 
nl->Append(last, attr); } } if(!listutils::isAttrList(attrList)){ return listutils::typeError("name conflicts or invalid names " "in attributes"); } return nl->TwoElemList( listutils::basicSymbol >(), nl->TwoElemList( listutils::basicSymbol(), attrList)); } class tsInfo{ public: tsInfo(Word* args, ListExpr resTuple): stream(args[0]){ stream.open(); tt = new TupleType(resTuple); supplier = args[1].addr; nofuns = qp->GetNoSons(supplier); for(int i=0; i < nofuns;i++){ Supplier supplier2 = qp->GetSupplier(supplier, i); Supplier supplier3 = qp->GetSupplier(supplier2, 1); ArgVectorPointer funargs1 = qp->Argument(supplier3); funs.push_back(supplier3); funargs.push_back(funargs1); } } ~tsInfo(){ stream.close(); tt->DeleteIfAllowed(); } Tuple* next(){ Attribute* arg = stream.request(); if(!arg){ return 0; } Tuple* tuple = new Tuple(tt); Word value; for(size_t i=0;iRequest(funs[i],value); tuple->PutAttribute(i,((Attribute*)value.addr)->Clone()); } arg->DeleteIfAllowed(); return tuple; } private: Stream stream; TupleType* tt; Supplier supplier; int nofuns; vector funs; vector funargs; }; int tsVM(Word* args, Word& result, int message, Word& local, Supplier s){ tsInfo* li = (tsInfo*) local.addr; switch(message){ case OPEN : if(li) delete li; local.addr = new tsInfo(args, nl->Second(GetTupleResultType(s))); return 0; case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE: if(li){ delete li; local.addr = 0; } return 0; } return -1; } OperatorSpec tsSpec( "stream(T) x funlist(T->U_i) -> stream(tuple(Ui)) ", "_ ts[_,_] ", "Creates a tuple stream from a stream of attributes " " using a list of named functions", "query intstream(1,10) ts[N1 : . + 1, N2 : . 
- 1] consume" ); Operator tsOp( "ts", tsSpec.getStr(), tsVM, Operator::SimpleSelect, tsTM ); /* 7.19 Operator ~as~ */ ListExpr asTM(ListExpr args){ if(!nl->HasLength(args,2)){ return listutils::typeError("stream(tuple) x fun(tuple -> attr) expected"); } if(!Stream::checkType(nl->First(args))){ return listutils::typeError("first arg is not a tuple stream"); } ListExpr fun = nl->Second(args); if(!listutils::isMap<1>(fun)){ return listutils::typeError("second arg is not an unary function"); } ListExpr tt = nl->Second(nl->First(args)); if(!nl->Equal(tt, nl->Second(fun))){ return listutils::typeError("function argument and tuple in stream " "differ"); } ListExpr funres = nl->Third(fun); if(!Attribute::checkType(funres)){ return listutils::typeError("funres not in kind DATA"); } return nl->TwoElemList( listutils::basicSymbol >(), funres); } class asInfo{ public: asInfo(Word _stream, Word _fun): stream(_stream){ stream.open(); fun = _fun.addr; funargs = qp->Argument(fun); } ~asInfo(){ stream.close(); } Attribute* next(){ Tuple* tuple = stream.request(); if(!tuple){ return 0; } Word value; (*funargs)[0].setAddr(tuple); qp->Request(fun,value); Attribute* res = ((Attribute*)value.addr)->Clone(); tuple->DeleteIfAllowed(); return res; } private: Stream stream; Supplier fun; ArgVectorPointer funargs; }; int asVM(Word* args, Word& result, int message, Word& local, Supplier s){ asInfo* li = (asInfo*) local.addr; switch(message){ case OPEN : if(li) delete li; local.addr = new asInfo(args[0], args[1]); return 0; case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE: if(li){ delete li; local.addr = 0; } return 0; } return -1; } OperatorSpec asSpec( "stream(tuple) x fun(tuple->attr) -> stream(attr)", "_ as [_] ", "Converts a tuple stream into an attribute stream " "using a function.", "query ten feed as[. 
+ 3] count" ); Operator asOp( "as", asSpec.getStr(), asVM, Operator::SimpleSelect, asTM ); /* Operator ~streamfun~ */ ListExpr streamfunTM(ListExpr args){ if(!nl->HasLength(args,2)){ return listutils::typeError("two args expected"); } if(!(Stream::checkType(nl->First(args)) || Stream::checkType(nl->First(args)))){ return listutils::typeError("first arg must be a stream " "of DATA or a stream of tuple"); } if(!listutils::isMap<1>(nl->Second(args))){ return listutils::typeError("second arg is not an unary function"); } if(!nl->Equal(nl->Second(nl->First(args)), nl->Second(nl->Second(args)))){ return listutils::typeError("function argument and stream " "elem type differ"); } ListExpr funres = nl->Third(nl->Second(args)); if(nl->HasLength(funres,2) && listutils::isSymbol(nl->First(funres), "stream")){ return listutils::typeError("function result cannot be a stream"); } return nl->First(args); } template class streamfunInfo{ public: streamfunInfo( Word& _stream, Word& _fun, int _f = 1): stream(_stream), fun(_fun.addr){ funarg = qp->Argument(fun); stream.open(); count = 0; f = _f; } ~streamfunInfo(){ stream.close(); } T* next(){ T* in = stream.request(); if(!in) return 0; count++; if(count % f == 0){ (*funarg)[0] = in; qp->Request(fun,funres); count = 0; } return in; } private: Stream stream; Supplier fun; ArgVectorPointer funarg; Word funres; int f; int count; }; template int streamfunVMT(Word* args, Word& result, int message, Word& local, Supplier s){ streamfunInfo* li = (streamfunInfo*) local.addr; switch(message){ case OPEN : { if(li) delete li; local.addr = new streamfunInfo(args[0],args[1]); return 0; } case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE:{ if(li){ delete li; local.addr = 0; } return 0; } } return -1; } int streamfunSelect(ListExpr args){ return Stream::checkType(nl->First(args))?0:1; } ValueMapping streamfunVM[] = { streamfunVMT, streamfunVMT }; OperatorSpec streamfunSpec( "stream(T) x fun(T)->.. 
-> stream(T), T in {DATA,tuple}", "_ streamfun[_]", "Performs a function for each element in a stream. " " The result is ignored, thus this operator is only useful " "for functions with side effects.", "query intstream(1,10) streamfun( TRUE echo[.] ) count" ); Operator streamfunOp( "streamfun", streamfunSpec.getStr(), 2, streamfunVM, streamfunSelect, streamfunTM ); /* Operator prog calls a function each xth stream element */ ListExpr progTM(ListExpr args){ if(!nl->HasLength(args,3)){ return listutils::typeError("two args expected"); } if(!(Stream::checkType(nl->First(args)) || Stream::checkType(nl->First(args)))){ return listutils::typeError("first arg must be a stream " "of DATA or a stream of tuple"); } if(!listutils::isMap<1>(nl->Second(args))){ return listutils::typeError("second arg is not an unary function"); } if(!nl->Equal(nl->Second(nl->First(args)), nl->Second(nl->Second(args)))){ return listutils::typeError("function argument and stream " "elem type differ"); } ListExpr funres = nl->Third(nl->Second(args)); if(nl->HasLength(funres,2) && listutils::isSymbol(nl->First(funres), "stream")){ return listutils::typeError("function result cannot be a stream"); } if(!CcInt::checkType(nl->Third(args))){ return listutils::typeError("third argument has to be of type int"); } return nl->First(args); } template int progVMT(Word* args, Word& result, int message, Word& local, Supplier s){ streamfunInfo* li = (streamfunInfo*) local.addr; switch(message){ case OPEN : { if(li) delete li; int f =1; CcInt* F = (CcInt*) args[2].addr; if(F->IsDefined() ){ f = std::max(f,F->GetValue()); } local.addr = new streamfunInfo(args[0],args[1],f); return 0; } case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE:{ if(li){ delete li; local.addr = 0; } return 0; } } return -1; } int progSelect(ListExpr args){ return Stream::checkType(nl->First(args))?0:1; } ValueMapping progVM[] = { progVMT, progVMT }; OperatorSpec progSpec( "stream(T) x fun(T)->.. 
x int -> stream(T), T in {DATA,tuple}", "_ streamfun[_]", "Performs a function for each xth element in a stream. " "x is defined by the last argument." " The result is ignored. ", "query intstream(1,10) prog( TRUE echo[.] , 3 ) count" ); Operator progOp( "prog", progSpec.getStr(), 2, progVM, progSelect, progTM ); /* 1.23 Operator ~delayS~ */ ListExpr delaySTM(ListExpr args){ if(!nl->HasLength(args,2) && !nl->HasLength(args,3)){ return listutils::typeError("2 or 3 arguments required"); } if(!listutils::isStream(nl->First(args))){ return listutils::typeError("first argument has to be a stream"); } if(!CcInt::checkType(nl->Second(args))){ return listutils::typeError("second argument is not an int"); } if(nl->HasLength(args,3) && !CcInt::checkType(nl->Third(args))){ return listutils::typeError("third argument is not an int"); } return nl->First(args); } int delaySVM(Word* args, Word& result, int message, Word& local, Supplier s){ std::pair* li = (std::pair*) local.addr; switch(message){ case OPEN: { qp->Open(args[0].addr); if(li){ delete li; local.addr = 0; } CcInt* T1 = (CcInt*) args[1].addr; if(!T1->IsDefined()) return 0; int t1 = T1->GetValue(); if(t1<0) return 0; int t2 = t1; if(qp->GetNoSons(s)==3){ T1 = (CcInt*) args[2].addr; if(!T1->IsDefined()){ return 0; } t2 = T1->GetValue(); } if(t1>t2){ return 0; } local.addr = new pair( ((size_t)t1), ((size_t)t2)); return 0; } case REQUEST: qp->Request(args[0].addr, result); if(!qp->Received(args[0].addr)) return CANCEL; if(li){ size_t t; if(li->first==li->second){ t = li->first; } else { t = rand() % (li->second - li->first) + li->first; } usleep(((size_t)t)*1000u); } return YIELD; case CLOSE : if(li){ delete li; local.addr = 0; } return 0; } return -1; } OperatorSpec delaySSpec( "stream(X) x int [x int] -> stream(X)", "_ delayS[_,_]", "Delays the transfer of each element in the stream by a " "certain duration (in millisecond). 
If the oprional " "argument is present, the delay will be a random number " "between the first and the second number.", "query plz feed delayS[100,500] printstream count;" ); Operator delaySOp( "delayS", delaySSpec.getStr(), delaySVM, Operator::SimpleSelect, delaySTM ); /* 1.39 sync */ ListExpr syncTM(ListExpr args){ if(!nl->HasLength(args,4)){ return listutils::typeError("4 arguments expected"); } ListExpr stream = nl->First(args); if(!listutils::isStream(stream)){ return listutils::typeError("stream required as the first argument"); } ListExpr fun = nl->Second(args); if(!listutils::isMap<1>(fun)){ return listutils::typeError("second arg is not an unary function"); } if(!nl->Equal(nl->Second(stream), nl->Second(fun))){ return listutils::typeError("type of stream and type of function " "argument differ"); } if(listutils::isStream(nl->Third(fun))){ return listutils::typeError("function result cannot be a stream."); } if(!CcInt::checkType(nl->Third(args))){ return listutils::typeError("third arg must be an int"); } if(!CcReal::checkType(nl->Fourth(args))){ return listutils::typeError("fourth arg must be a real"); } return stream; } class syncInfo{ public: syncInfo(Word& _stream, Supplier _fun, int _minTuples, int _minTime): stream(_stream), fun(_fun), minElems(_minTuples){ SmiEnvironment::CommitTransaction(); //close current transaction qp->Open(stream.addr); av = qp->Argument(fun); elems = 0; lastTime = getTime(); minTime = _minTime>0?_minTime:0; } ~syncInfo(){ qp->Close(stream.addr); SmiEnvironment::BeginTransaction(); } void* next(){ qp->Request(stream.addr,result); if(!qp->Received(stream.addr)){ return 0; } elems++; check(result); return result.addr; } private: Word stream; Supplier fun; int minElems; size_t minTime; ArgVectorPointer av; int elems; struct timeval tp; size_t lastTime; Word result; Word funres; size_t getTime(){ gettimeofday(&tp, NULL); return (size_t)tp.tv_sec*1000+tp.tv_usec/1000; } void check(Word& w){ if(elems >= minElems){ elems = 0; size_t 
t2 = getTime(); size_t k = t2 - lastTime; if(k >= minTime){ lastTime = t2; (*av)[0] = w; SmiEnvironment::BeginTransaction(); qp->Request(fun,funres); SmiEnvironment::CommitTransaction(); } } } void newTransaction(){ SmiEnvironment::CommitTransaction(); SmiEnvironment::BeginTransaction(); } }; int syncVM(Word* args, Word& result, int message, Word& local, Supplier s){ syncInfo* li = (syncInfo*) local.addr; switch(message){ case OPEN: { if(li){ delete li; local.addr = 0; } CcInt* minTu = (CcInt*) args[2].addr; if(!minTu->IsDefined()){ return 0; } CcReal* minTime = (CcReal*) args[3].addr; if(!minTime->IsDefined()){ return 0; } local.addr = new syncInfo(args[0], args[1].addr, minTu->GetValue(), (int)(minTime->GetValue()*1000)); return 0; } case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE: if(li){ delete li; local.addr = 0; } return 0; } return -1; } OperatorSpec syncSpec( "stream x fun x int x real -> stream", "_ sync[_,_,_] ", "After minElements in stream (3rd arg) and " " after minimum t seconds (4th arg), this " " operator evaluates it's parameter function (2nd arg)," " commits the running transaction and starts a new one", " query plz feed sync[ plz feed count, 100, 0.001] count" ); Operator syncOp( "sync", syncSpec.getStr(), syncVM, Operator::SimpleSelect, syncTM ); ListExpr printStreamMessagesTM(ListExpr args){ if(!nl->HasLength(args,1)){ return listutils::typeError("one argument expected"); } if(!listutils::isStream(nl->First(args))){ return listutils::typeError("stream expected"); } return nl->First(args); } int printStreamMessagesVM(Word* args, Word& result, int message, Word& local, Supplier s){ switch(message){ case OPEN : cout << "OPEN" << endl; qp->Open(args[0].addr); return 0; case REQUEST : cout << "REQUEST" << endl; qp->Request(args[0].addr, result); return qp->Received(args[0].addr)?YIELD:CANCEL; case CLOSE : cout << "CLOSE" << endl; qp->Close(args[0].addr); return 0; case REQUESTPROGRESS: cout << 
"REQUESTPROGRESS" << endl; return qp->RequestProgress(args[0].addr, (ProgressInfo*)result.addr); case CLOSEPROGRESS: cout << "CLOSEPROGRESS" << endl; return 0; default : return -1; } } OperatorSpec printStreamMessagesSpec( "stream(X) -> stream(X) ", "_ op ", "Debug operator, prints out messages send to value mapping.", " query ten feed printStreamMessages count" ); Operator printStreamMessagesOp( "printStreamMessages", printStreamMessagesSpec.getStr(), printStreamMessagesVM, Operator::SimpleSelect, printStreamMessagesTM ); /* Operator ~contains~ */ ListExpr containsTM(ListExpr args){ if(!nl->HasLength(args,2)){ return listutils::typeError("2 arguments expected"); } if(!Stream::checkType(nl->First(args))){ return listutils::typeError("first argument not a stream of DATA"); } if(!Attribute::checkType(nl->Second(args))){ return listutils::typeError("second argument not in kind DATA"); } if(!nl->Equal(nl->Second(nl->First(args)), nl->Second(args))){ return listutils::typeError("stream type and type of second " "argument differ"); } return listutils::basicSymbol(); } int containsVM(Word* args, Word& result, int message, Word& local, Supplier s){ Stream stream(args[0]); Attribute* elem = (Attribute*) args[1].addr; stream.open(); Attribute* a; bool found = false; while(!found && (a=stream.request())!=0){ found = a->Compare(elem) == 0; a->DeleteIfAllowed(); } stream.close(); result = qp->ResultStorage(s); CcBool* res = (CcBool*) result.addr; res->Set(true,found); return 0; } OperatorSpec containsSpec( "stream x D -> bool , with D in DATA", "_ contains _ ", "Checks whether the second argument is element of the stream " "given as the first argument", "query intstream(0,23) contains 10" ); Operator containsOp( "contains", containsSpec.getStr(), containsVM, Operator::SimpleSelect, containsTM ); /* Operator ~some~ */ ListExpr someTM(ListExpr args){ if(!nl->HasLength(args,2)){ return listutils::typeError("two arguments required"); } if(!Stream::checkType(nl->First(args)) && 
!Stream::checkType(nl->First(args))){ return listutils::typeError("the first argument is not a tuple stream" " and not a stream of DATA"); } if(!CcInt::checkType(nl->Second(args))){ return listutils::typeError("the second argument ist not an int"); } return nl->First(args); } template class someInfo{ public: someInfo(Word _stream, size_t bs): stream(_stream), bufferSize(bs), count(0),currentOut(-1){ stream.open(); init(); } ~someInfo(){ for(size_t i=currentOut+1; i< buffer.size(); i++){ buffer[i]->DeleteIfAllowed(); } stream.close(); } T* next(){ currentOut++; if(currentOut >= (int)buffer.size()){ return 0; } T* res = buffer[currentOut]; buffer[currentOut] = 0; return res; } private: Stream stream; size_t bufferSize; size_t count; int currentOut; std::vector buffer; void init(){ T* tup; while( (tup = stream.request()) != nullptr){ count++; insert(tup); } } void insert(T* tup){ if(buffer.size() < bufferSize){ buffer.push_back(tup); return; } // create a random number between 0 and count-1 // note, count contains already the current tuple size_t rnd = std::rand()/((RAND_MAX + 1u)/count); if(rnd < buffer.size()){ buffer[rnd]->DeleteIfAllowed(); buffer[rnd]=tup; } else { tup->DeleteIfAllowed(); } } }; template int someVMT(Word* args, Word& result, int message, Word& local, Supplier s){ someInfo* li = (someInfo*) local.addr; switch(message){ case OPEN: { if(li) { delete li; local.addr =0; } CcInt* bs = (CcInt*) args[1].addr; if(bs->IsDefined()){ int s = bs->GetValue(); if(s>0) { local.addr = new someInfo(args[0], s); } } return 0; } case REQUEST : result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE : { if(li){ delete li; local.addr = 0; } return 0; } } return -1; } ValueMapping someVM[] = { someVMT, someVMT }; int someSelect(ListExpr args){ return Stream::checkType(nl->First(args))?1:0; } OperatorSpec someSpec( "stream(X) x int -> stream(X) , X in {TUPLE, DATA}", "_ some [_] ", "Creates a random selection from the stream of a given size " "using 
reservoir sampling.", "query plz feed some[200] count" ); Operator someOp( "some", someSpec.getStr(), 2, someVM, someSelect, someTM ); /* Operator ~sideEffect~ */ ListExpr sideEffectTM(ListExpr args){ if(!nl->HasLength(args,2)){ return listutils::typeError("2 arguments expected"); } if(!listutils::isStream(nl->First(args))){ return listutils::typeError("first argument is not a stream"); } ListExpr streamElem = nl->Second(nl->First(args)); if(!listutils::isMap<2>(nl->Second(args))){ return listutils::typeError("second argument is not a binary function"); } ListExpr fun = nl->Second(args); ListExpr fa1 = nl->Second(fun); ListExpr fa2 = nl->Third(fun); ListExpr funres = nl->Fourth(fun); if(!nl->Equal(streamElem, fa1)){ return listutils::typeError("types of stream and first " "function argument differ"); } if(!CcInt::checkType(fa2)){ return listutils::typeError("second function arguemnt is not an int"); } if(!Attribute::checkType(funres)){ return listutils::typeError("function result not in kind data"); } return nl->First(args); } class sideEffectInfo{ public: sideEffectInfo(Word& _stream, Word& _fun): stream(_stream), fun(_fun), count(0) { funargs = qp->Argument(fun.addr); qp->Open(stream.addr); (*funargs)[1] = SetWord(new CcInt(true,0)); } ~sideEffectInfo(){ ((CcInt*) (*funargs)[1].addr)->DeleteIfAllowed(); qp->Close(stream.addr); } void* next(){ qp->Request(stream.addr, elem); if(!qp->Received(stream.addr)) { return 0; } (*funargs)[0] = elem; ((CcInt*)((*funargs)[1].addr))->Set(true,++count); qp->Request(fun.addr,funres); return elem.addr; } private: Word stream; Word fun; size_t count; ArgVectorPointer funargs; Word funres; Word elem; }; int sideEffectVM(Word* args, Word& result, int message, Word& local, Supplier s){ sideEffectInfo* li = (sideEffectInfo*) local.addr; switch(message){ case OPEN: if(li) delete li; local.addr = new sideEffectInfo(args[0], args[1]); return 0; case REQUEST: result.addr = li?li->next():0; return result.addr?YIELD:CANCEL; case CLOSE : 
if(li){ delete li; local.addr = 0; } return 0; } return -1; } OperatorSpec sideEffectSpec( "stream(X) x fun(X x int -> a) -> stream(X) , a in DATA", "_ sideEffect[_] ", "Evaluates a function for each element in the stream ignoring the result. " "The second function arguemnt is set automatically to the number of " "the current elements (counting starts with 1)", "query plz feed sideEffect[ 1 echo[.Plz] ] count " ); Operator sideEffectOp{ "sideEffect", sideEffectSpec.getStr(), sideEffectVM, Operator::SimpleSelect, sideEffectTM }; /* 7 Creating the Algebra */ class StreamAlgebra : public Algebra { public: StreamAlgebra() : Algebra() { AddOperator( &streamcount ); AddOperator( &streamprintstream ); AddOperator( &printstream2Op ); AddOperator( &streamtransformstream ); AddOperator( &projecttransformstream ); AddOperator( &namedtransformstream ); AddOperator( &streamfeed ); AddOperator( &streamuse ); AddOperator( &streamuse2 ); AddOperator( &streamaggregateS ); AddOperator( &streamfilter ); AddOperator( ensure_Info("ensure"), ensure_vms, ensure_sf, ensure_tm ); AddOperator( &echo ); AddOperator( realstreamInfo(), realstreamFun, realstreamTypeMap ); AddOperator( intstreamInfo(), intstreamValueMap, intstreamTypeMap ); AddOperator( &STREAMELEM ); AddOperator( &STREAMELEM2 ); AddOperator( &streamtail ); streamtail.SetUsesMemory(); AddOperator( &kinds); AddOperator( &timeout); AddOperator( &isOrdered); AddOperator( &sbufferOp); AddOperator(&intstream2); AddOperator(&mergediffOp); AddOperator(&mergesecOp); AddOperator(&mergeunionOp); AddOperator(&mergeOp); AddOperator(&rdupOp); AddOperator(&xthOp); AddOperator(&minattrOp); AddOperator(&maxattrOp); AddOperator(&nthOp); AddOperator(&avgattrOp); AddOperator(&sumattrOp); AddOperator(&consumeOp); AddOperator(&tsOp); AddOperator(&asOp); AddOperator(&streamfunOp); AddOperator(&progOp); AddOperator(&delaySOp); AddOperator(&syncOp); AddOperator(&printStreamMessagesOp); AddOperator(&containsOp); AddOperator(&someOp); 
AddOperator(&sideEffectOp); #ifdef USE_PROGRESS streamcount.EnableProgress(); streamtransformstream.EnableProgress(); namedtransformstream.EnableProgress(); streamfeed.EnableProgress(); streamfilter.EnableProgress(); timeout.EnableProgress(); nthOp.EnableProgress(); printStreamMessagesOp.EnableProgress(); #endif std::srand(std::time(nullptr)); } ~StreamAlgebra() {}; }; /* 7 Initialization Each algebra module needs an initialization function. The algebra manager has a reference to this function if this algebra is included in the list of required algebras, thus forcing the linker to include this module. The algebra manager invokes this function to get a reference to the instance of the algebra class and to provide references to the global nested list container (used to store constructor, type, operator and object information) and to the query processor. The function has a C interface to make it possible to load the algebra dynamically at runtime. */ extern "C" Algebra* InitializeStreamAlgebra( NestedList* nlRef, QueryProcessor* qpRef ) { nl = nlRef; qp = qpRef; return (new StreamAlgebra()); }