Files
secondo/Algebras/Stream/StreamAlgebra.cpp
2026-01-23 17:03:45 +08:00

7193 lines
186 KiB
C++

/*
----
This file is part of SECONDO.
Copyright (C) 2004, University in Hagen, Department of Computer Science,
Database Systems for New Applications.
SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
----
//paragraph [1] Title: [{\Large \bf \begin{center}] [\end{center}}]
//paragraph [10] Footnote: [{\footnote{] [}}]
//[ue] [\"u]
//[ae] [\"a]
//[_] [\_]
//[TOC] [\tableofcontents]
[1] StreamAlgebra - Implementing Generalized (non-tuple) Streams of Objects
December 2006, Initial version implemented by Christian D[ue]ntgen,
Faculty for Mathematics and Informatics,
LG Database Systems for New Applications
Feruniversit[ae]t in Hagen.
----
State Operator/Signatures
OK use: (stream X) (map X Y) --> (stream Y)
OK (stream X) (map X (stream Y)) --> (stream Y)
OK use2: (stream X) Y (map X Y Z) --> (stream Z)
OK (stream X) Y (map X Y stream(Z)) --> (stream Z)
OK X (stream Y) (map X y Z) --> (stream Z)
OK X (stream Y) (map X y (stream Z)) --> (stream Z)
OK (stream X) (stream Y) (map X Y Z) --> (stream Z)
OK (stream X) (stream Y) (map X Y (stream Z)) --> (stream Z)
for X,Y,Z of kind DATA
OK feed: T --> (stream T)
OK transformstream: stream(tuple((Id T))) --> (stream T)
OK (stream T) --> stream(tuple((Elem T)))
OK aggregateS: (stream T) x (T x T --> T) x T --> T
OK count: (stream T) --> int
OK filter: ((stream T) (map T bool)) --> int
OK printstream: (stream T) --> (stream T)
projecttransformstream: stream(tuple((a1 t1) ..(an tn))) x ai -> stream(ti)
COMMENTS:
(*): These operators have been implemented for
T in {bool, int, real, point}
(**): These operators have been implemented for
T in {bool, int, real, point, string, region}
Key to STATE of implementation:
OK : Operator has been implemented and fully tested
(OK): Operator has been implemented and partially tested
Test: Operator has been implemented, but tests have not been done
Pre : Operator has not been functionally implemented, but
stubs (dummy code) exist
n/a : Neither functionally nor dummy code exists for this ones
+ : Equivalent exists for according mType
- : Does nor exist for according mType
? : It is unclear, whether it exists or not
----
*/
/*
0. Bug-List
----
(none known)
Key:
(C): system crash
(R): Wrong result
----
*/
/*
[TOC]
1 Overview
This file contains the implementation of the stream operators.
2 Defines, includes, and constants
*/
#include <cmath>
#include <stack>
#include <deque>
#include <limits>
#include <sstream>
#include <vector>
#include <fstream>
#include <time.h>
#include <errno.h>
#include <utility>
#include <unistd.h>
#include <sys/time.h>
#include "CostEstimation.h"
#include "NestedList.h"
#include "QueryProcessor.h"
#include "AlgebraManager.h"
#include "Algebra.h"
#include "StandardTypes.h"
#include "Algebras/Relation-C++/RelationAlgebra.h"
#include "SecondoSystem.h"
#include "Symbols.h"
#include "NList.h"
#include "ListUtils.h"
#include "Progress.h"
#include "AlmostEqual.h"
#include "Stream.h"
#include "Algebras/Standard-C++/LongInt.h"
extern NestedList* nl;
extern QueryProcessor* qp;
extern AlgebraManager* am;
using namespace std;
// #define GSA_DEBUG
#define STRALG_DEBUG false
#define DEBUGMESSAGE(MESSAGE) if(STRALG_DEBUG) cout << __PRETTY_FUNCTION__ \
<< " (" << __FILE__ << ":" << __LINE__ << ") " << MESSAGE << endl
/*
4 General Selection functions
*/
/*
5 Implementation of Algebra Operators
*/
/*
5.19 Operator ~feed~
The operator is used to cast a single value T to a (stream T)
having a single element of type T.
5.19.1 Type Mapping for ~feed~
---- DATA -> stream(DATA)
----
*/
ListExpr
TypeMapStreamfeed( ListExpr args )
{
// if it is a stream, just pass the incoming stream
if(nl->HasLength(args,1) && listutils::isStream(nl->First(args))){
return nl->First(args);
}
string err = "one argument of kind DATA expected";
if(nl->ListLength(args)!=1){
return listutils::typeError(err);
}
ListExpr arg1 = nl->First(args);
if(Tuple::checkType(arg1)){
return Stream<Tuple>::wrap(arg1);
}
if(!listutils::isDATA(arg1)){
return listutils::typeError(err);
}
if(nl->ListLength(arg1) == 2){
ListExpr outerType = nl->First(arg1);
if (nl->IsAtom(outerType) && (nl->AtomType(outerType)==SymbolType))
{
if ((nl->SymbolValue(outerType)=="arel2")
|| (nl->SymbolValue(outerType)=="nrel2"))
{
return listutils::typeError("arel2 and nrel2 are processed "
"by NestedRelation2Algebra");
}
}
}
return Stream<Attribute>::wrap(arg1);
}
/*
5.19.2 Value Mapping for ~feed~
T may be of type Attribute or Tuple
*/
struct SFeedLocalInfo
{
bool finished;
bool sonIsObjectNode;
bool progressinitialized;
double* attrSize;
double* attrSizeExt;
int noAttributes;
SFeedLocalInfo(Attribute* arg, const bool isObject):
finished( false ),
sonIsObjectNode( isObject ),
progressinitialized( false )
{
double coresize = arg->Sizeof();
double flobsize = 0.0;
for(int i=0; i < arg->NumOfFLOBs(); i++){
flobsize += arg->GetFLOB(i)->getSize();
}
attrSize = new double[1];
attrSize[0] = coresize + flobsize;
attrSizeExt = new double[1];
attrSizeExt[0] = coresize;
noAttributes = 1;
}
SFeedLocalInfo(Tuple* arg, const bool isObject):
finished( false ),
sonIsObjectNode( isObject ),
progressinitialized( false )
{
attrSize = new double[1];
attrSize[0] = arg->GetExtSize();
attrSizeExt = new double[1];
attrSizeExt[0] = arg->GetRootSize();
noAttributes = arg->GetNoAttributes();
}
~SFeedLocalInfo(){
if(attrSize) {delete[] attrSize; attrSize = 0;}
if(attrSizeExt) {delete[] attrSizeExt; attrSizeExt = 0;}
}
};
template<class T>
int MappingStreamFeed( Word* args, Word& result, int message,
Word& local, Supplier s )
{
SFeedLocalInfo *linfo;
T* arg = (static_cast<T*>(args[0].addr));
switch( message ){
case OPEN:{
linfo = static_cast<SFeedLocalInfo*>(local.addr);
if(linfo){
delete linfo;
}
linfo = new SFeedLocalInfo(arg,qp->IsObjectNode(qp->GetSupplierSon(s,0)));
local.setAddr(linfo);
return 0;
}
case REQUEST:{
if ( local.addr == 0 )
return CANCEL;
linfo = static_cast<SFeedLocalInfo*>(local.addr);
if ( linfo->finished )
return CANCEL;
result.setAddr(arg->Clone());
linfo->finished = true;
return YIELD;
}
case CLOSE:{
// localinfo is disposed by CLOSEPROGRESS
return 0;
}
case CLOSEPROGRESS:{
if ( local.addr )
{
linfo = static_cast<SFeedLocalInfo*>(local.addr);
delete linfo;
local.setAddr(0);
}
return 0;
}
case REQUESTPROGRESS:{
linfo = static_cast<SFeedLocalInfo*>(local.addr);
if(!linfo){
return CANCEL;
}
ProgressInfo *pRes;
pRes = (ProgressInfo*) result.addr;
ProgressInfo p1;
if( !linfo->sonIsObjectNode){
if(!qp->RequestProgress(qp->GetSupplierSon(s,0), &p1) ) {
return CANCEL;
}
// the son is a computed result node
// just copy everything
pRes->CopyBlocking(p1);
pRes->Time = p1.Time;
} else {
// the son is a database object
pRes->BTime = 0.00001; // no blocking time
pRes->BProgress = 1.0; // non-blocking
pRes->Time = 0.00001; // (almost) zero runtime
}
if(linfo->progressinitialized){
pRes->sizesChanged = false;
linfo->progressinitialized = true;
} else {
pRes->sizesChanged = true;
}
pRes->Card = 1; // cardinality
pRes->Size = linfo->attrSize[0]; // total size
pRes->SizeExt = linfo->attrSizeExt[0]; // size w/o FLOBS
pRes->noAttrs = linfo->noAttributes; //no of attributes
pRes->attrSize = linfo->attrSize;
pRes->attrSizeExt = linfo->attrSizeExt;
pRes->sizesChanged = true; //sizes have been recomputed
if(linfo->finished){
pRes->Progress = 1.0;
pRes->Time = 0.00001;
}
return YIELD;
}
} // switch
return -1; // should not be reached
}
int MappingStreamFeedStream( Word* args, Word& result, int message,
Word& local, Supplier s )
{
switch(message){
case OPEN:
qp->Open(args[0].addr);
return 0;
case REQUEST:
qp->Request(args[0].addr, result);
return qp->Received(args[0].addr)? YIELD:CANCEL;
case CLOSE:
qp->Close(args[0].addr);
return 0;
case CLOSEPROGRESS: break;
case REQUESTPROGRESS:
ProgressInfo* pRes;
pRes = (ProgressInfo*) result.addr;
if ( qp->RequestProgress(args[0].addr, pRes) ){
return YIELD;
} else {
result.addr = 0;
return CANCEL;
}
break;
}
return 0;
}
/*
5.19.3 Specification for operator ~feed~
*/
const string
StreamSpecfeed=
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( <text>For T in kind DATA:\n"
"T -> (stream T)</text--->"
"<text>_ feed</text--->"
"<text>create a single-value stream from "
"a single value.</text--->"
"<text>query [const int value 5] feed count;</text---> ) )";
/*
5.19.4 Selection Function of operator ~feed~
*/
ValueMapping streamfeedmap[] = { MappingStreamFeed<Attribute>,
MappingStreamFeedStream,
MappingStreamFeed<Tuple> };
int StreamfeedSelect( ListExpr args )
{
if(listutils::isStream(nl->First(args))) return 1;
return Attribute::checkType(nl->First(args))?0:2;
}
/*
5.19.5 Definition of operator ~feed~
*/
Operator streamfeed( "feed",
StreamSpecfeed,
3,
streamfeedmap,
StreamfeedSelect,
TypeMapStreamfeed);
/*
5.20 Operator ~use~
The ~use~ class of operators implements a set of functors, that derive
stream-valued operators from operators taking scalar arguments and returning
scalar values or streams of values:
----
use: (stream X) (map X Y) -> (stream Y)
(stream X) (map X (stream Y)) -> (stream Y)
(stream X) Y (map X Y Z) -> (stream Z)
(stream X) Y (map X Y stream(Z)) -> (stream Z)
X (stream Y) (map X y Z) -> (stream Z)
X (stream Y) (map X y (stream Z)) -> (stream Z)
(stream X) (stream Y) (map X Y Z) -> (stream Z)
(stream X) (stream Y) (map X Y (stream Z)) -> (stream Z)
for X,Y,Z of kind DATA
----
5.20.1 Type Mapping for ~use~
*/
ListExpr
TypeMapUse( ListExpr args )
{
string err="stream(S) x ( S -> T) , S,T in DATA expected";
if(!nl->HasLength(args,2)){
return listutils::typeError(err);
}
if(!Stream<Attribute>::checkType(nl->First(args)) &&
!Stream<Tuple>::checkType(nl->First(args))){
return listutils::typeError(err);
}
if(!listutils::isMap<1>(nl->Second(args))){
return listutils::typeError(err);
}
ListExpr streamType = nl->Second(nl->First(args));
ListExpr funArg = nl->Second(nl->Second(args));
ListExpr funRes = nl->Third(nl->Second(args));
if(!nl->Equal(streamType,funArg)){
return listutils::typeError(err + " (stream type different "
"to function argument)");
}
if(Stream<Attribute>::checkType(funRes)){
return funRes;
}
if(!listutils::isDATA(funRes)){
return listutils::typeError(err + " (result of function "
"not in kind DATA)");
}
return nl->TwoElemList( nl->SymbolAtom(Stream<Attribute>::BasicType()),
funRes);
}
ListExpr
TypeMapUse2( ListExpr args )
{
string outstr1, outstr2; // output strings
ListExpr sarg1, sarg2, map; // arguments to use
ListExpr marg1, marg2, mres; // argument to mapping
ListExpr sarg1Type, sarg2Type, sresType; // 'flat' arg type
ListExpr argConfDescriptor;
bool
sarg1isstream = false,
sarg2isstream = false,
resisstream = false;
int argConfCode = 0;
// 0. Check number of arguments
if ( (nl->ListLength( args ) != 3) )
{
ErrorReporter::ReportError("Operator use2 expects a list of "
"length three ");
return nl->SymbolAtom( Symbol::TYPEERROR() );
}
// 1. get use arguments
sarg1 = nl->First( args );
sarg2 = nl->Second( args );
map = nl->Third( args );
// check basics
if(!Stream<Tuple>::checkType(sarg1) && !Stream<Attribute>::checkType(sarg1) &&
!Attribute::checkType(sarg1)){
return listutils::typeError("first argument has to be a tuple stream, "
"a data stream or an attribute");
}
if(!Stream<Tuple>::checkType(sarg2) && !Stream<Attribute>::checkType(sarg2) &&
!Attribute::checkType(sarg2)){
return listutils::typeError("second argument has to be a tuple stream, "
"a data stream or an attribute");
}
if(!listutils::isMap<2>(map)){
return listutils::typeError("third argument is not a map having 2"
" arguments");
}
// 2. First argument
// check sarg1 for being a stream
if( Attribute::checkType(sarg1)) { // attribute
sarg1Type = sarg1;
sarg1isstream = false;
} else { // stream
sarg1Type = nl->Second(sarg1);
sarg1isstream = true;
}
// 3. Second Argument
// check sarg2 for being a stream
if( Attribute::checkType(sarg2)) { // attribute
sarg2Type = sarg2;
sarg2isstream = false;
} else { // stream
sarg2Type = nl->Second(sarg2);
sarg2isstream = true;
}
// 4. First and Second argument
// check whether at least one stream argument is present
if ( !sarg1isstream && !sarg2isstream ) {
return listutils::typeError("at leat one of the first two args "
" must be a stream(T), with T in"
" {DATA, tuple}");
}
// 5. Third argument
// get map arguments
marg1 = nl->Second(map);
marg2 = nl->Third(map);
mres = nl->Fourth(map);
// check marg1
if ( !( nl->Equal(marg1, sarg1Type) ) )
{
nl->WriteToString(outstr1, sarg1Type);
nl->WriteToString(outstr2, marg1);
ErrorReporter::ReportError("Operator use2: 1st argument's stream"
"type does not match the type of the "
"mapping's 1st argument. If e.g. the first "
"is 'stream X', then the latter must be 'X'."
"The types passed are '" + outstr1 +
"' and '" + outstr2 + "'.");
return nl->SymbolAtom( Symbol::TYPEERROR() );
}
// check marg2
if ( !( nl->Equal(marg2, sarg2Type) ) )
{
nl->WriteToString(outstr1, sarg2Type);
nl->WriteToString(outstr2, marg2);
ErrorReporter::ReportError("Operator use2: 2nd argument's stream"
"type does not match the type of the "
"mapping's 2nd argument. If e.g. the second"
" is 'stream X', then the latter must be 'X'."
"The types passed are '" + outstr1 +
"' and '" + outstr2 + "'.");
return nl->SymbolAtom( Symbol::TYPEERROR() );
}
// 6. Determine result type
// get map result type 'sresType'
// may be a stream of T, T in {DATA,TUPLE}
if( Stream<Attribute>::checkType(mres)
|| Stream<Tuple>::checkType(mres)){
resisstream = true;
sresType = mres; // map result type is already a stream
} else if( Attribute::checkType(mres)
|| Tuple::checkType(mres)){
resisstream = false;
sresType = nl->TwoElemList(nl->SymbolAtom(Symbol::STREAM()), mres);
} else {
return listutils::typeError("result of the map must be T or "
"stream(T), with T in {DATA,TUPLE}");
}
// 7. This check can be removed when operators working on tuplestreams have
// been implemented:
if ( Tuple::checkType(sarg1Type)
|| Stream<Tuple>::checkType(sarg1Type)){
return listutils::typeError("use2: support for tuple not implemented");
}
// 8. Append flags describing argument configuration for value mapping:
// 0: no stream
// 1: sarg1 is a stream
// 2: sarg2 is a stream
// 4: map result is a stream
//
// e.g. 7=4+2+1: both arguments are streams and the
// map result is a stream
if(sarg1isstream) argConfCode += 1;
if(sarg2isstream) argConfCode += 2;
if(resisstream) argConfCode += 4;
argConfDescriptor = nl->OneElemList(nl->IntAtom(argConfCode));
return nl->ThreeElemList(nl->SymbolAtom(Symbol::APPEND()),
argConfDescriptor, sresType);
}
/*
5.20.2 Value Mapping for ~use~
*/
struct UseLocalInfo{
bool Xfinished, Yfinished, funfinished; // whether we have finished
Word X, Y, fun; // pointers to the argument nodes
Word XVal, YVal, funVal; // the last arg values
int argConfDescriptor; // type of argument configuration
};
// (stream X) (map X Y) -> (stream Y)
int Use_SN( Word* args, Word& result, int message,
Word& local, Supplier s )
{
UseLocalInfo *sli;
Word instream = args[0], fun = args[1];
Word funResult, argValue;
ArgVectorPointer funArgs;
switch (message)
{
case OPEN :
#ifdef GSA_DEBUG
cout << "Use_SN received OPEN" << endl;
#endif
sli = new UseLocalInfo;
sli->Xfinished = true;
qp->Open(instream.addr);
sli->Xfinished = false;
local.setAddr(sli);
#ifdef GSA_DEBUG
cout << "Use_SN finished OPEN" << endl;
#endif
return 0;
case REQUEST :
// For each REQUEST, we get one value from the stream,
// pass it to the parameter function and evalute the latter.
// The result is simply passed on.
#ifdef GSA_DEBUG
cout << "Use_SN received REQUEST" << endl;
#endif
if( local.addr == 0 )
{
#ifdef GSA_DEBUG
cout << "Use_SN finished REQUEST: CANCEL (1)" << endl;
#endif
return CANCEL;
}
sli = (UseLocalInfo*)local.addr;
if (sli->Xfinished)
{
#ifdef GSA_DEBUG
cout << "Use_SN finished REQUEST: CANCEL (2)" << endl;
#endif
return CANCEL;
}
funResult.addr = 0;
argValue.addr = 0;
qp->Request(instream.addr, argValue); // get one arg value from stream
if(qp->Received(instream.addr))
{
funArgs = qp->Argument(fun.addr); // set argument for the
(*funArgs)[0] = argValue; // parameter function
qp->Request(fun.addr, funResult); // call parameter function
// copy result:
result.setAddr(((Attribute*) (funResult.addr))->Clone());
((Attribute*) (argValue.addr))->DeleteIfAllowed(); // delete argument
#ifdef GSA_DEBUG
cout << " result.addr =" << result.addr << endl;
#endif
argValue.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SN finished REQUEST: YIELD" << endl;
#endif
return YIELD;
}
else // (input stream consumed completely)
{
qp->Close(instream.addr);
sli->Xfinished = true;
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SN finished REQUEST: CANCEL (3)" << endl;
#endif
return CANCEL;
}
case CLOSE :
#ifdef GSA_DEBUG
cout << "Use_SN received CLOSE" << endl;
#endif
if( local.addr != 0 )
{
sli = (UseLocalInfo*)local.addr;
if ( !sli->Xfinished )
qp->Close( instream.addr );
delete sli;
local.setAddr(0);
}
#ifdef GSA_DEBUG
cout << "Use_SN finished CLOSE" << endl;
#endif
return 0;
} // end switch
cout << "Use_SN received UNKNOWN COMMAND" << endl;
return -1; // should not be reached
}
// (stream X) (map X (stream Y)) -> (stream Y)
int Use_SS( Word* args, Word& result, int message,
Word& local, Supplier s )
{
UseLocalInfo *sli;
Word funResult;
ArgVectorPointer funargs;
switch (message)
{
case OPEN :
sli = new UseLocalInfo;
sli->X.setAddr( args[0].addr );
sli->fun.setAddr( args[1].addr );
sli->Xfinished = true;
sli->funfinished = true;
sli->XVal.addr = 0;
// open the ("outer") input stream and
qp->Open( sli->X.addr );
sli->Xfinished = false;
// save the local information
local.setAddr(sli);
return 0;
case REQUEST :
// For each value from the 'outer' stream, an 'inner' stream
// of values is generated by the parameter function.
// For each REQUEST, we pass one value from the 'inner' stream
// as the result value.
// If the inner stream is consumed, we try to get a new value
// from the 'outer' stream and re-open the inner stream
#ifdef GSA_DEBUG
cout << "\nUse_SS: Received REQUEST";
#endif
//1. recover local information
if( local.addr == 0 )
return CANCEL;
sli = (UseLocalInfo*)local.addr;
// create the next result
while( !sli->Xfinished )
{
if( sli->funfinished )
{// end of map result stream reached -> get next X
qp->Request( sli->X.addr, sli->XVal);
if (!qp->Received( sli->X.addr ))
{ // Stream X is exhaused
qp->Close( sli->X.addr );
sli->Xfinished = true;
result.addr = 0;
return CANCEL;
} // got an X-elem
funargs = qp->Argument( sli->fun.addr );
(*funargs)[0] = sli->XVal;
qp->Open( sli->fun.addr );
sli->funfinished = false;
} // Now, we have an open map result stream
qp->Request( sli->fun.addr, funResult );
if(qp->Received( sli->fun.addr ))
{ // cloning and passing the result
result.setAddr(((Attribute*) (funResult.addr))->Clone());
((Attribute*) (funResult.addr))->DeleteIfAllowed();
#ifdef GSA_DEBUG
cout << " result.addr=" << result.addr << endl;
#endif
return YIELD;
}
else
{ // end of map result stream reached
qp->Close( sli->fun.addr );
sli->funfinished = true;
((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
}
} // end while
case CLOSE :
if( local.addr != 0 )
{
sli = (UseLocalInfo*)local.addr;
if( !sli->funfinished )
{
qp->Close( sli->fun.addr );
((Attribute*)(sli->X.addr))->DeleteIfAllowed();
}
if ( !sli->Xfinished )
qp->Close( sli->X.addr );
delete sli;
local.setAddr(0);
}
return 0;
} // end switch
cout << "\nUse_SS received UNKNOWN COMMAND" << endl;
return -1; // should not be reached
}
// case stream(Tuple) x (tuple -> DATA)
int Use_TsN( Word* args, Word& result, int message,
Word& local, Supplier s ) {
switch(message){
case OPEN: {
qp->Open(args[0].addr);
return 0;
}
case REQUEST: {
Word tuple;
qp->Request(args[0].addr, tuple);
if(qp->Received(args[0].addr)){
ArgVectorPointer funarg = qp->Argument(args[1].addr);
(*funarg)[0] = tuple;
Word funRes;
qp->Request(args[1].addr, funRes);
Attribute* res = (Attribute*) funRes.addr;
result.addr = res->Clone();
((Tuple*) tuple.addr)->DeleteIfAllowed();
return YIELD;
} else {
return CANCEL;
}
}
case CLOSE: {
qp->Close(args[0].addr);
return 0;
}
default: assert(false); // unknonwn message
}
return -1;
}
struct UseTsSLocal{
UseTsSLocal(): funOpened(false), currentTuple(0){}
bool funOpened;
Tuple* currentTuple;
};
// case stream(Tuple) x (tuple -> stream(DATA) )
int Use_TsS( Word* args, Word& result, int message,
Word& local, Supplier s ) {
UseTsSLocal* li = (UseTsSLocal*) local.addr;
switch (message){
case OPEN: {
if(li){
if(li->funOpened){
qp->Close(args[1].addr);
li->funOpened = false;
}
if(li->currentTuple){
li->currentTuple->DeleteIfAllowed();
li->currentTuple = 0;
}
} else {
local.addr = new UseTsSLocal();
}
qp->Open(args[0].addr);
return 0;
}
case REQUEST: {
if(!li){
return CANCEL;
}
result.addr = 0;
while(!result.addr){
if(!li->funOpened){ // next elem from input stream required
Word t;
qp->Request(args[0].addr,t);
if(!qp->Received(args[0].addr)){
return CANCEL;
}
if(li->currentTuple){
li->currentTuple->DeleteIfAllowed();
}
li->currentTuple = (Tuple*) t.addr;
ArgVectorPointer funargs = qp->Argument(args[1].addr);
(*funargs)[0] = t;
qp->Open(args[1].addr);
li->funOpened = true;
}
// evaluate function
Word funRes;
qp->Request(args[1].addr, funRes);
if(qp->Received(args[1].addr)){
result.addr = funRes.addr;
} else {
qp->Close(args[1].addr);
li->funOpened = false;
}
}
return YIELD;
}
case CLOSE: {
if(li){
if(li->funOpened){
qp->Close(args[1].addr);
}
if(li->currentTuple){
li->currentTuple->DeleteIfAllowed();
}
delete li;
local.addr = 0;
}
return 0;
}
default : return -1;
}
}
// (stream X) Y (map X Y Z) -> (stream Z)
// X (stream Y) (map X y Z) -> (stream Z)
int Use_SNN( Word* args, Word& result, int message,
Word& local, Supplier s )
{
UseLocalInfo *sli;
Word xval, funresult;
ArgVectorPointer funargs;
switch (message)
{
case OPEN :
#ifdef GSA_DEBUG
cout << "\nUse_SNN received OPEN" << endl;
#endif
sli = new UseLocalInfo ;
sli->Xfinished = true;
sli->X.addr = 0;
sli->Y.addr = 0;
sli->fun.setAddr(args[2].addr);
// get argument configuration info
sli->argConfDescriptor = ((CcInt*)args[3].addr)->GetIntval();
if(sli->argConfDescriptor & 4)
{
delete( sli );
local.addr = 0;
#ifdef GSA_DEBUG
cout << "\nUse_SNN was called with stream result mapping!" << endl;
#endif
return 0;
}
if(sli->argConfDescriptor & 1)
{ // the first arg is the stream
sli->X.setAddr(args[0].addr); // X is the stream
sli->Y.setAddr(args[1].addr); // Y is the constant value
}
else
{ // the second arg is the stream
sli->X.setAddr(args[1].addr); // X is the stream
sli->Y.setAddr(args[0].addr); // Y is the constant value
}
qp->Open(sli->X.addr); // open outer stream argument
sli->Xfinished = false;
local.setAddr(sli);
#ifdef GSA_DEBUG
cout << "Use_SNN finished OPEN" << endl;
#endif
return 0;
case REQUEST :
// For each REQUEST, we get one value from the stream,
// pass it (and the remaining constant argument) to the parameter
// function and evalute the latter. The result is simply passed on.
// sli->X is the stream, sli->Y the constant argument.
#ifdef GSA_DEBUG
cout << "Use_SNN received REQUEST" << endl;
#endif
// 1. get local data object
if (local.addr == 0)
{
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SNN finished REQUEST: CANCEL (1)" << endl;
#endif
return CANCEL;
}
sli = (UseLocalInfo*) local.addr;
if (sli->Xfinished)
{ // stream already exhausted earlier
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SNN finished REQUEST: CANCEL (2)" << endl;
#endif
return CANCEL;
}
// 2. request value from outer stream
qp->Request( sli->X.addr, xval );
if(!qp->Received( sli->X.addr ))
{ // stream exhausted now
qp->Close( sli->X.addr );
sli->Xfinished = true;
#ifdef GSA_DEBUG
cout << "Use_SNN finished REQUEST: CANCEL (3)" << endl;
#endif
return CANCEL;
}
// 3. call parameter function, delete args and return result
funargs = qp->Argument( sli->fun.addr );
if (sli->argConfDescriptor & 1)
{
(*funargs)[0] = xval;
(*funargs)[1] = sli->Y;
}
else
{
(*funargs)[0] = sli->Y;
(*funargs)[1] = xval;
}
qp->Request( sli->fun.addr, funresult );
result.setAddr(((Attribute*) (funresult.addr))->Clone());
#ifdef GSA_DEBUG
cout << " result.addr=" << result.addr << endl;
#endif
((Attribute*) (xval.addr))->DeleteIfAllowed();
#ifdef GSA_DEBUG
cout << "Use_SNN finished REQUEST: YIELD" << endl;
#endif
return YIELD;
case CLOSE :
#ifdef GSA_DEBUG
cout << "Use_SNN received CLOSE" << endl;
#endif
if( local.addr != 0 )
{
sli = (UseLocalInfo*)local.addr;
if (!sli->Xfinished)
qp->Close( sli->X.addr ); // close input
delete sli;
local.setAddr(0);
}
#ifdef GSA_DEBUG
cout << "Use_SNN finished CLOSE" << endl;
#endif
return 0;
} // end switch
cout << "\nUse_SNN received UNKNOWN COMMAND" << endl;
return -1; // should not be reached
}
// (stream X) Y (map X Y (stream Z)) -> (stream Z)
// X (stream Y) (map X y (stream Z)) -> (stream Z)
int Use_SNS( Word* args, Word& result, int message,
Word& local, Supplier s )
{
UseLocalInfo *sli;
Word funresult;
ArgVectorPointer funargs;
switch (message)
{
case OPEN :
#ifdef GSA_DEBUG
cout << "\nUse_SNS received OPEN" << endl;
#endif
sli = new UseLocalInfo ;
sli->Xfinished = true;
sli->funfinished = true;
sli->X.addr = 0;
sli->Y.addr = 0;
sli->fun.addr = 0;
sli->XVal.addr = 0;
sli->YVal.addr = 0;
// get argument configuration info
sli->argConfDescriptor = ((CcInt*)args[3].addr)->GetIntval();
if(! (sli->argConfDescriptor & 4))
{
delete( sli );
local.addr = 0;
cout << "\nUse_SNS was called with non-stream result mapping!"
<< endl;
return 0;
}
if(sli->argConfDescriptor & 1)
{ // the first arg is the stream
sli->X.setAddr(args[0].addr); // X is the stream
sli->Y.setAddr(args[1].addr); // Y is the constant value
}
else
{ // the second arg is the stream
sli->X.setAddr(args[1].addr); // X is the stream
sli->Y.setAddr(args[0].addr); // Y is the constant value
}
sli->YVal = sli->Y; // save value of constant argument
qp->Open(sli->X.addr); // open the ("outer") input stream
sli->Xfinished = false;
sli->fun.setAddr(args[2].addr);
local.setAddr(sli);
#ifdef GSA_DEBUG
cout << "Use_SNN finished OPEN" << endl;
#endif
return 0;
case REQUEST :
// First, we check whether an inner stream is finished
// (sli->funfinished). If so, we try to get a value from
// the outer stream and try to re-open the inner stream.
// sli->X is a pointer to the OUTER stream,
// sli->Y is a pointer to the constant argument.
#ifdef GSA_DEBUG
cout << "Use_SNN received REQUEST" << endl;
#endif
// 1. get local data object
if (local.addr == 0)
{
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SNN finished REQUEST: CANCEL (1)" << endl;
#endif
return CANCEL;
}
sli = (UseLocalInfo*) local.addr;
// 2. request values from inner stream
while (!sli->Xfinished)
{
while (sli->funfinished)
{ // the inner stream is closed, try to (re-)open it
// try to get the next X-value from outer stream
qp->Request(sli->X.addr, sli->XVal);
if (!qp->Received(sli->X.addr))
{ // stream X exhaused. CANCEL
sli->Xfinished = true;
qp->Close(sli->X.addr);
#ifdef GSA_DEBUG
cout << "Use_SNN finished REQUEST: CANCEL (3)" << endl;
#endif
return CANCEL;
}
funargs = qp->Argument( sli->fun.addr );
if (sli->argConfDescriptor & 1)
{
(*funargs)[0] = sli->XVal;
(*funargs)[1] = sli->YVal;
}
else
{
(*funargs)[0] = sli->YVal;
(*funargs)[1] = sli->XVal;
}
qp->Open( sli->fun.addr );
sli->funfinished = false;
} // end while - Now, the inner stream is open again
qp->Request(sli->fun.addr, funresult);
if (qp->Received(sli->fun.addr))
{ // inner stream returned a result
result.setAddr(((Attribute*) (funresult.addr))->Clone());
((Attribute*) (funresult.addr))->DeleteIfAllowed();
#ifdef GSA_DEBUG
cout << " result.addr=" << result.addr << endl;
cout << "Use_SNN finished REQUEST: YIELD" << endl;
#endif
return YIELD;
}
else{ // inner stream exhausted
qp->Close(sli->fun.addr);
sli->funfinished = true;
((Attribute*)(sli->XVal.addr))->DeleteIfAllowed();
sli->XVal.addr = 0;
}
} // end while
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SNN finished REQUEST: CANCEL (4)" << endl;
#endif
return CANCEL;
case CLOSE :
#ifdef GSA_DEBUG
cout << "Use_SNN received CLOSE" << endl;
#endif
if( local.addr != 0 )
{
sli = (UseLocalInfo*)local.addr;
if (!sli->funfinished)
qp->Close( sli->fun.addr ); // close map result stream
if (!sli->Xfinished)
qp->Close( sli->X.addr ); // close outer stream
delete sli;
local.setAddr(Address(0));
}
#ifdef GSA_DEBUG
cout << "Use_SNN finished CLOSE" << endl;
#endif
return 0;
} // end switch
cout << "Use_SNN received UNKNOWN COMMAND" << endl;
return -1; // should not be reached
}
// (stream X) (stream Y) (map X Y Z) -> (stream Z)
int Use_SSN( Word* args, Word& result, int message,
Word& local, Supplier s )
{
UseLocalInfo *sli;
Word funresult;
ArgVectorPointer funargs;
switch (message)
{
case OPEN :
#ifdef GSA_DEBUG
cout << "\nUse_SSN received OPEN" << endl;
#endif
sli = new UseLocalInfo ;
sli->Xfinished = true;
sli->Yfinished = true;
// get argument configuration info
sli->argConfDescriptor = ((CcInt*)args[3].addr)->GetIntval();
if(sli->argConfDescriptor & 4)
{
delete( sli );
local.addr = 0;
cout << "\nUse_SSN was called with stream result mapping!"
<< endl;
return 0;
}
if(!(sli->argConfDescriptor & 3))
{
delete( sli );
local.addr = 0;
cout << "\nUse_SSN was called with non-stream arguments!"
<< endl;
return 0;
}
sli->X.setAddr(args[0].addr); // X is the stream
sli->Y.setAddr(args[1].addr); // Y is the constant value
sli->fun.setAddr(args[2].addr); // fun is the mapping function
qp->Open(sli->X.addr); // open outer stream argument
sli->Xfinished = false;
local.setAddr(sli);
#ifdef GSA_DEBUG
cout << "Use_SSN finished OPEN" << endl;
#endif
return 0;
case REQUEST :
// We do a nested loop to join the elements of the outer (sli->X)
// and inner (sli->Y) stream. For each pairing, we evaluate the
// parameter function (sli->fun), which return a single result.
// A clone of the result is passed as the result.
// We also need to delete each element, when it is not required
// anymore.
#ifdef GSA_DEBUG
cout << "Use_SSN received REQUEST" << endl;
#endif
// get local data object
if (local.addr == 0)
{
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SSN finished REQUEST: CANCEL (1)" << endl;
#endif
return CANCEL;
}
sli = (UseLocalInfo*) local.addr;
while(!sli->Xfinished)
{
if (sli->Yfinished)
{ // try to (re-) start outer instream
qp->Request(sli->X.addr, sli->XVal);
if (!qp->Received(sli->X.addr))
{ // outer instream exhaused
qp->Close(sli->X.addr);
sli->Xfinished = true;
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SSN finished REQUEST: CANCEL (2)" << endl;
#endif
return CANCEL;
}
// Got next X-elem. (Re-)Start inner instream:
qp->Open(sli->Y.addr);
sli->Yfinished = false;
}
// Now, we have open inner and outer streams
qp->Request(sli->Y.addr, sli->YVal);
if (!qp->Received(sli->Y.addr))
{ // inner stream is exhausted
qp->Close(sli->Y.addr);
// Delete current X-elem:
((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
sli->Yfinished = true;
}
// got next Y-elem
if (!sli->Xfinished && !sli->Yfinished)
{ // pass parameters and call mapping, clone result
funargs = qp->Argument( sli->fun.addr );
(*funargs)[0] = sli->XVal;
(*funargs)[1] = sli->YVal;
qp->Request( sli->fun.addr, funresult );
result.setAddr(((Attribute*) (funresult.addr))->Clone());
((Attribute*) (sli->YVal.addr))->DeleteIfAllowed();
#ifdef GSA_DEBUG
cout << "Use_SSN finished REQUEST: YIELD" << endl;
#endif
return YIELD;
}
} // end while
#ifdef GSA_DEBUG
cout << "Use_SSN finished REQUEST: CANCEL (3)" << endl;
#endif
return CANCEL;
case CLOSE :
#ifdef GSA_DEBUG
cout << "Use_SSN received CLOSE" << endl;
#endif
if( local.addr != 0 )
{
sli = (UseLocalInfo*)local.addr;
if (!sli->Yfinished)
{
qp->Close( sli->Y.addr ); // close inner instream
// Delete current X-elem:
((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
}
if (!sli->Xfinished)
qp->Close( sli->X.addr ); // close outer instream
delete sli;
local.setAddr(0);
}
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SSN finished CLOSE" << endl;
#endif
return 0;
} // end switch
result.addr = 0;
cout << "\nUse_SSN received UNKNOWN COMMAND" << endl;
return -1; // should not be reached
}
// (stream X) (stream Y) (map X Y (stream Z)) -> (stream Z)
int Use_SSS( Word* args, Word& result, int message,
Word& local, Supplier s )
{
UseLocalInfo *sli;
Word funresult;
ArgVectorPointer funargs;
switch (message)
{
case OPEN :
#ifdef GSA_DEBUG
cout << "\nUse_SSS received OPEN" << endl;
#endif
sli = new UseLocalInfo ;
sli->Xfinished = true;
sli->Yfinished = true;
sli->funfinished = true;
// get argument configuration info
sli->argConfDescriptor = ((CcInt*)args[3].addr)->GetIntval();
if(!(sli->argConfDescriptor & 4) )
{
delete( sli );
local.addr = 0;
cout << "\nUse_SSS was called with non-stream result mapping!"
<< endl;
return 0;
}
if(!(sli->argConfDescriptor & 3))
{
delete( sli );
local.addr = 0;
cout << "\nUse_SSS was called with non-stream arguments!"
<< endl;
return 0;
}
sli->X = args[0]; // X is the stream
sli->Y = args[1]; // Y is the constant value
sli->fun = args[2]; // fun is the mapping function
qp->Open(sli->X.addr); // open X stream argument
sli->Xfinished = false;
local.setAddr(sli);
#ifdef GSA_DEBUG
cout << "Use_SSS finished OPEN" << endl;
#endif
return 0;
case REQUEST :
// We do a nested loop to join the elements of the outer (sli->X)
// and inner (sli->Y) stream. For each pairing, we open the
// parameter function (sli->fun), which returns a stream result.
// We consume this map result stream one-by-one.
// When it is finally consumed, we try to restart it with the next
// X/Y value pair.
// A clone of the result is passed as the result.
// We also need to delete each X/Y element, when it is not required
// any more.
#ifdef GSA_DEBUG
cout << "Use_SSS received REQUEST" << endl;
#endif
// get local data object
if (local.addr == 0)
{
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SSS finished REQUEST: CANCEL (1)" << endl;
#endif
return CANCEL;
}
sli = (UseLocalInfo*) local.addr;
while(!sli->Xfinished)
{
if (sli->Yfinished)
{ // get next X-value from outer instream
// and restart inner (Y-) instream
qp->Request(sli->X.addr, sli->XVal);
if (!qp->Received(sli->X.addr))
{ // X-instream exhaused
qp->Close(sli->X.addr);
sli->Xfinished = true;
#ifdef GSA_DEBUG
cout << "Use_SSS finished REQUEST: CANCEL (2)" << endl;
#endif
result.addr = 0;
return CANCEL;
}
// Got next X-elem. (Re-)Start inner Y-instream:
qp->Open(sli->Y.addr);
sli->Yfinished = false;
} // Now, we have open X- and Y- streams
if (sli->funfinished)
{ // get next Y-value from inner instream
// and open new map result stream
qp->Request(sli->Y.addr, sli->YVal);
if (!qp->Received(sli->Y.addr))
{
qp->Close(sli->Y.addr);
((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
sli->Yfinished = true;
}
else
{
funargs = qp->Argument( sli->fun.addr );
(*funargs)[0] = sli->XVal;
(*funargs)[1] = sli->YVal;
qp->Open( sli->fun.addr );
sli->funfinished = false;
}
}
// Now, we have an open map result streams
if (!sli->Xfinished && !sli->Yfinished && !sli->funfinished)
{ // pass parameters and call mapping, clone result
funargs = qp->Argument( sli->fun.addr );
(*funargs)[0] = sli->XVal;
(*funargs)[1] = sli->YVal;
qp->Request( sli->fun.addr, funresult );
if ( qp->Received(sli->fun.addr) )
{ // got a value from map result stream
result.setAddr(((Attribute*)(funresult.addr))->Clone());
((Attribute*) (funresult.addr))->DeleteIfAllowed();
#ifdef GSA_DEBUG
cout << "Use_SSS finished REQUEST: YIELD" << endl;
#endif
return YIELD;
}
else
{ // map result stream exhausted
qp->Close( sli->fun.addr) ;
((Attribute*) (sli->YVal.addr))->DeleteIfAllowed();
sli->funfinished = true;
} // try to restart with new X/Y pairing
}
} // end while
result.addr = 0;
#ifdef GSA_DEBUG
cout << "Use_SSS finished REQUEST: CANCEL (3)" << endl;
#endif
return CANCEL;
case CLOSE :
#ifdef GSA_DEBUG
cout << "Use_SSS received CLOSE" << endl;
#endif
if( local.addr != 0 )
{
sli = (UseLocalInfo*)local.addr;
if (!sli->funfinished)
{
qp->Close( sli->fun.addr ); // close map result stream
// Delete current Y-elem:
((Attribute*) (sli->YVal.addr))->DeleteIfAllowed();
}
if (!sli->Yfinished)
{
qp->Close( sli->Y.addr ); // close inner instream
// Delete current X-elem:
((Attribute*) (sli->XVal.addr))->DeleteIfAllowed();
}
if (!sli->Xfinished)
qp->Close( sli->X.addr ); // close outer instream
delete sli;
local.setAddr(0);
}
#ifdef GSA_DEBUG
cout << "Use_SSS finished CLOSE" << endl;
#endif
return 0;
} // end switch
cout << "\nUse_SSS received UNKNOWN COMMAND" << endl;
return -1; // should not be reached
}
/*
5.20.3 Specification for operator ~use~
*/
const string
StreamSpecUse=
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( <text>For X in kind DATA or X = tuple(Z)*, Y in kind DATA:\n"
"(*: not yet implemented)\n"
"((stream X) (map X Y) ) -> (stream Y) \n"
"((stream X) (map X (stream Y))) -> (stream Y)</text--->"
"<text>_ use [ _ ]</text--->"
"<text>The use class of operators implements "
"a set of functors, that derive stream-valued "
"operators from operators taking scalar "
"arguments and returning scalar values or "
"streams of values.</text--->"
"<text>query intstream(1,5) use[ fun(i:int) i*i ] printstream count;\n"
"query intstream(1,5) use[ fun(i:int) intstream(i,5) ] printstream count;"
"</text---> ) )";
const string
StreamSpecUse2=
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( <text>For X in kind DATA or X = tuple(W)*, Y,Z in kind DATA:\n"
"(*: not yet implemented)\n"
"((stream X) Y (map X Y Z) ) -> (stream Z) \n"
"((stream X) Y (map X Y stream(Z)) ) -> (stream Z) \n"
"(X (stream Y) (map X Y Z) ) -> (stream Z) \n"
"(X (stream Y) (map X Y (stream Z))) -> (stream Z) \n"
"((stream X) (stream Y) (map X Y Z) ) -> (stream Z) \n"
"((stream X) (stream Y) (map X Y (stream Z))) -> (stream Z)</text--->"
"<text>_ _ use2 [ _ ]</text--->"
"<text>The use2 class of operators implements "
"a set of functors, that derive stream-valued "
"operators from operators taking scalar "
"arguments and returning scalar values or "
"streams of values. use2 performs a product "
"between the two first of its arguments, passing each "
"combination to the mapped function once.</text--->"
"<text>query intstream(1,5) [const int value 5] use2[ fun(i:int, j:int) "
"intstream(i,j) ] printstream count;\n"
"query [const int value 3] intstream(1,5) use2[ fun(i:int, j:int) i+j ] "
"printstream count;\n"
"query intstream(1,5) [const int value 3] use2[ fun(i:int, j:int) i+j ] "
"printstream count;\n"
"query [const int value 2] intstream(1,5) use2[ fun(i:int, j:int) "
"intstream(i,j) ] printstream count;\n"
"query [const int value 3] intstream(1,5) use2[ fun(i:int, j:int) "
"intstream(i,j) ] printstream count;\n"
"query intstream(1,2) intstream(1,3) use2[ fun(i:int, j:int) "
"intstream(i,j) ] printstream count;</text---> ) )";
/*
5.20.4 Selection Function of operator ~use~
*/
ValueMapping streamusemap[] =
{ Use_SN,
Use_SS,
Use_TsN,
Use_TsS
};
int
streamUseSelect( ListExpr args )
{
ListExpr stream = nl->First(args);
ListExpr funRes = nl->Third(nl->Second(args));
bool ts = !Stream<Attribute>::checkType(stream);
bool streamRes = listutils::isStream(funRes);
if(!ts && !streamRes) return 0; // SN
if(!ts && streamRes) return 1; // SS
if(ts && !streamRes) return 2; // TsN
if(ts && streamRes) return 3; // TsS
return -1;
}
ValueMapping streamuse2map[] =
{ Use_SNN,
Use_SNS,
Use_SSN,
Use_SSS
// ,
// Use_TsNN,
// Use_TsNS,
// Use_TsTsN,
// Use_TsTsS
};
int
streamUse2Select( ListExpr args )
{
ListExpr
X = nl->First(args),
Y = nl->Second(args),
M = nl->Third(args);
bool
xIsStream = false,
yIsStream = false,
resIsStream = false;
bool
xIsTuple = false,
yIsTuple = false;
int index = 0;
// examine first arg
// check type of sarg1
if( Tuple::checkType(X) ) {
xIsTuple = true;
xIsStream = false;
} else if( Stream<ANY>::checkType(X)) {
xIsStream = true;
if(Tuple::checkType(nl->Second(X))){
xIsTuple = true;
} else {
xIsTuple = false;
}
} else {
xIsTuple = false;
xIsStream = false;
}
if( Tuple::checkType(Y) ) {
yIsTuple = true;
yIsStream = false;
} else if( Stream<ANY>::checkType(Y)) {
yIsStream = true;
if(Tuple::checkType(nl->Second(Y))){
yIsTuple = true;
} else {
yIsTuple = false;
}
} else {
yIsTuple = false;
yIsStream = false;
}
// examine mapping result type
ListExpr mres = nl->Fourth(M);
if(Stream<ANY>::checkType(mres)){
resIsStream = true;
} else {
resIsStream = false;
}
// calculate appropriate index value
// tuple variants offest : +4
// both args streams : +2
// mapping result is stream : +1
index = 0;
if ( xIsTuple || yIsTuple ) index += 4;
if ( xIsStream && yIsStream ) index += 2;
if ( resIsStream ) index += 1;
if (index > 3)
cout << "\nWARNING: index =" << index
<< ">3 in streamUse2Select!" << endl;
return index;
}
/*
5.20.5 Definition of operator ~use~
*/
Operator streamuse( "use",
StreamSpecUse,
4,
streamusemap,
streamUseSelect,
TypeMapUse);
Operator streamuse2( "use2",
StreamSpecUse2,
4,
streamuse2map,
streamUse2Select,
TypeMapUse2);
/*
5.24 Operator ~aggregateS~
Stream aggregation operator
This operator applies an aggregation function (which must be binary,
associative and commutative) to a stream of data using a given neutral (initial)
value (which is also returned if the stream is empty). If the stream contains
only one single element, this element is returned as the result.
The result a single value of the same kind.
----
For T in kind DATA:
aggregateS: ((stream T) x (T x T --> T) x T) --> T
----
The first argument is the input stream.
The second argument is the function used in the aggregation.
The third value is used to initialize the mapping (for the first elem)
and will also be return if the input stream is empty.
5.24.1 Type mapping function for ~aggregateS~
*/
ListExpr StreamaggregateTypeMap( ListExpr args )
{
string outstr1, outstr2;
ListExpr TypeT;
// check for correct length
if (nl->ListLength(args) != 3) {
return listutils::typeError("Operator aggregateS expects a "
"list of length three.");
}
// get single arguments
ListExpr instream = nl->First(args),
map = nl->Second(args),
zerovalue = nl->Third(args);
if(!Stream<Attribute>::checkType(instream)){
return listutils::typeError("first element must be a stream of DATA");
}
TypeT = nl->Second(instream);
// check for second to be of length 4, (map T T T)
// T of same type as first
if ( nl->IsAtom(map) ||
!(nl->ListLength(map) == 4) ||
!( nl->IsEqual(nl->First(map), Symbol::MAP()) ) ||
!( nl->Equal(nl->Fourth(map), nl->Second(map)) ) ||
!( nl->Equal(nl->Third(map), nl->Second(map)) ) ||
!( nl->Equal(nl->Third(map), TypeT) ) ) {
ErrorReporter::ReportError("Operator aggregateS expects a list of length "
"four as second argument, having structure "
"'(map T T T)', where T has the base type of "
"the first argument.");
return nl->SymbolAtom( Symbol::TYPEERROR() );
}
// check for third to be atomic and of the same type T
if ( !listutils::isDATA(zerovalue) ||
!nl->Equal(TypeT, zerovalue) )
{
ErrorReporter::ReportError("Operator aggregateS expects a list of length"
"one as third argument (neutral elem), having "
"structure 'T', where T is also the type of "
"the mapping's arguments and result. Also, "
"T must be of kind DATA.");
return nl->SymbolAtom( Symbol::TYPEERROR() );
}
// return T as the result type.
return TypeT;
}
/*
5.24.2 Value mapping function of operator ~aggregateS~
The ~aggregateS~ operator uses a stack to compute the aggregation
balanced. This will have advantages in geometric aggregation.
It may also help to reduce numeric errors in aggregation using
double values.
5.24.2.1 ~StackEntry~
A stack entry consist of the level within the (simulated)
balanced tree and the corresponding value.
Note:
The attributes at level 0 come directly from the input stream.
We have to free them using the deleteIfAllowed function.
On all other levels, the attribute is computes using the
parameter function. Because this is outside of stream and
tuples, no reference counting is available and we have to delete
them using the usual delete function.
*/
struct AggrStackEntry
{
inline AggrStackEntry(): level(-1),value(0)
{ }
inline AggrStackEntry( long level, Attribute* value):
level( level )
{ this->value = value;}
inline AggrStackEntry( const AggrStackEntry& a ):
level( a.level )
{ this->value = a.value;}
inline AggrStackEntry& operator=( const AggrStackEntry& a )
{ level = a.level; value = a.value; return *this; }
inline ~AggrStackEntry(){ } // use destroy !!
inline void destroy(){
if(level<0){
return;
}
if(level==0){ // original from tuple
value->DeleteIfAllowed();
} else {
delete value;
}
value = 0;
level = -1;
}
long level;
Attribute* value;
};
int Streamaggregate(Word* args, Word& result, int message,
Word& local, Supplier s)
{
// The argument vector contains the following values:
// args[0] = stream of tuples
// args[1] = mapping function
// args[2] = zero value
Word resultWord;
ArgVectorPointer vector = qp->Argument(args[1].addr);
Stream<Attribute> stream(args[0]);
stream.open();
result = qp->ResultStorage(s);
// read the first tuple
Attribute* attr = stream.request();
if(attr ==0){ // stream was empty, copy zero element to result
((Attribute*)result.addr)-> CopyFrom( (const Attribute*)args[2].addr );
} else { // ok, there is at least one element in the stream
// nonempty stream, consume it
stack<AggrStackEntry> theStack;
while( attr!=0 ){
// put the attribute on the stack merging with existing entries
// while possible
int level = 0;
while(!theStack.empty() && level==theStack.top().level){
// merging is possible
AggrStackEntry top = theStack.top();
theStack.pop();
// call the parameter function
((*vector)[0]).setAddr(top.value);
((*vector)[1]).setAddr(attr);
qp->Request(args[1].addr, resultWord);
qp->ReInitResultStorage(args[1].addr);
top.destroy(); // remove stack content
if(level==0){ // delete attr;
attr->DeleteIfAllowed();
} else {
delete attr;
}
attr = (Attribute*) resultWord.addr;
level++;
}
AggrStackEntry entry(level,attr);
theStack.push(entry);
attr = stream.request();
}
// stream ends, merge stack elements regardless of their level
assert(!theStack.empty()); // at least one element must be exist
AggrStackEntry tmpResult = theStack.top();
theStack.pop();
while(!theStack.empty()){
AggrStackEntry top = theStack.top();
theStack.pop();
((*vector)[0]).setAddr(top.value);
((*vector)[1]).setAddr(tmpResult.value);
qp->Request(args[1].addr, resultWord);
qp->ReInitResultStorage(args[1].addr);
tmpResult.destroy(); // destroy temporarly result
tmpResult.level = 1; // mark as computed
tmpResult.value = (Attribute*) resultWord.addr;
top.destroy();
}
((Attribute*)result.addr)->
CopyFrom((Attribute*)tmpResult.value);
tmpResult.destroy();
}
// close input stream
stream.close();
return 0;
}
/*
5.24.3 Specification for operator ~aggregate~
*/
const string StreamaggregateSpec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" "
"\"Example\" ) "
"("
"<text>For T in kind DATA:\n"
"((stream T) ((T T) -> T) T ) -> T\n</text--->"
"<text>_ aggregateS [ fun ; _ ]</text--->"
"<text>Aggregates the values from the stream (1st arg) "
"using a binary associative and commutative "
"aggregation function (2nd arg), "
"and a 'neutral value' (3rd arg, also passed as the "
"result if the stream is empty). If the stream contains"
"only one single element, that element will be returned"
"as the result.</text--->"
"<text>query intstream(1,5) aggregateS[ "
"fun(i1:int, i2:int) i1+i2 ; 0]\n"
"query intstream(1,5) aggregateS[ "
"fun(i1:STREAMELEM, i2:STREAMELEM) ifthenelse(i1>i2,i1,i2) ; 0]</text--->"
") )";
/*
5.24.4 Selection Function of operator ~aggregate~
*/
ValueMapping streamaggregatemap[] =
{
Streamaggregate
};
int streamaggregateSelect( ListExpr args )
{
return 0;
}
/*
5.24.5 Definition of operator ~aggregate~
*/
Operator streamaggregateS( "aggregateS",
StreamaggregateSpec,
1,
streamaggregatemap,
streamaggregateSelect,
StreamaggregateTypeMap);
/*
5.27 Operator ~transformstream~
----
transformstream: (stream T) -> stream(tuple((Elem T)))
stream(tuple((Id T))) -> (stream T)
for T in kind DATA, id some arbitrary identifier
----
Operator ~transformstream~ transforms a (stream DATA) into a
(stream(tuple((element DATA)))) and vice versa. ~element~ is the name for the
attribute created.
The result of the first variant can e.g. be consumed to form a relation
or be processed using ordinary tuplestream operators.
*/
/*
5.27.1 Type mapping function for ~transformstream~
----
stream(DATA) --> stream(tuple((elem DATA)))
stream(tuple((attrname DATA))) --> stream(DATA)
----
*/
ListExpr StreamTransformstreamTypeMap(ListExpr args)
{
if(!nl->HasLength(args,1)){
return listutils::typeError("one argument expected");
}
ListExpr arg = nl->First(args);
// variant 1: stream<DATA> -> stream <TUPLE>
if(Stream<Attribute>::checkType(arg)){
ListExpr res = nl->TwoElemList(
nl->SymbolAtom(Stream<Tuple>::BasicType()),
nl->TwoElemList(
nl->SymbolAtom(Tuple::BasicType()) ,
nl->OneElemList(
nl->TwoElemList(
nl->SymbolAtom("Elem"),
nl->Second(arg)))));
return res;
}
// variant 2: stream(tuple( a: b)) -> stream(b)
if(Stream<Tuple>::checkType(arg)){
ListExpr attrList = nl->Second(nl->Second(arg));
if(!nl->HasLength(attrList,1)){
return listutils::typeError("Only one attribute within the "
"tuple allowed");
}
ListExpr res = nl->TwoElemList(
nl->SymbolAtom(Stream<Attribute>::BasicType()),
nl->Second(nl->First(attrList)));
return res;
}
return listutils::typeError("stream(DATA) or stream(tuple([a : X]))"
" expected");
}
/*
5.27.3 Type Mapping for the ~namedtransformstream~ operator
This operator works as the transformstream operator. It takes a stream
of elements with kind data and produces a tuple stream, from it.
So, the value mapping is Transformstream[_]S[_]TS which is alos used by the
transformstream operator. The only difference is, additional to the
stream argument, this operator receives also a name for the attribute instead
using the defaul name 'elem'.
---- stream(DATA) x ident --> stream(tuple((ident DATA)))
----
*/
ListExpr NamedtransformstreamTypemap(ListExpr args){
if(nl->ListLength(args)!=2){
return listutils::typeError("two arguments required");
}
ListExpr stream = nl->First(args);
if(!Stream<Attribute>::checkType(stream)){
return listutils::typeError("First argument must be a stream(DATA).");
}
ListExpr nameList = nl->Second(args);
if(!listutils::isSymbol(nameList)){
return listutils::typeError("Second argument muts be an attribute name");
}
string name = nl->SymbolValue(nameList);
string symcheckmsg = "";
if(!SecondoSystem::GetCatalog()->IsValidIdentifier(name,symcheckmsg)){
return listutils::typeError("Symbol unusable: "+symcheckmsg+".");
}
char f = name[0];
if(f<'A' || f>'Z'){
return listutils::typeError("An attribute name has to "
"start with an upper case");
}
return nl->TwoElemList(nl->SymbolAtom(Stream<Tuple>::BasicType()),
nl->TwoElemList(
nl->SymbolAtom(Tuple::BasicType()),
nl->OneElemList(
nl->TwoElemList(
nl->SymbolAtom(name),
nl->Second(stream)
)
)));
}
/*
5.27.2 Value mapping for operator ~transformstream~
*/
template<class T>
struct TransformstreamLocalInfo
{
TransformstreamLocalInfo( Word arg) :
finished(false), resultTupleType(0), progressinitialized(false),
stream(arg)
{}
~TransformstreamLocalInfo() {
if(resultTupleType) {
resultTupleType->DeleteIfAllowed();
}
}
bool finished;
TupleType* resultTupleType;
bool progressinitialized;
Stream<T> stream;
};
// The first variant creates a tuplestream from a stream:
int Transformstream_S_TS(Word* args, Word& result, int message,
Word& local, Supplier s)
{
TransformstreamLocalInfo<Attribute> *sli;
Word value;
Tuple *newTuple;
switch ( message ) {
case OPEN:{
sli = (TransformstreamLocalInfo<Attribute>*) local.addr;
if(sli){
delete sli;
local.addr = 0;
}
sli = new TransformstreamLocalInfo<Attribute>(args[0]);
local.setAddr(sli);
ListExpr resultType = GetTupleResultType( s );
sli->resultTupleType = new TupleType( nl->Second( resultType ) );
sli->finished = false;
sli->progressinitialized = false;
sli->stream.open();
return 0;
}
case REQUEST:{
if (local.addr == 0)
return CANCEL;
sli = (TransformstreamLocalInfo<Attribute>*) (local.addr);
if (sli->finished){
return CANCEL;
}
Attribute* attr = sli->stream.request();
if (attr==0) { // input stream consumed
sli->stream.close();
sli->finished = true;
result.addr = 0;
return CANCEL;
}
// create tuple, copy and pass result, delete value
newTuple = new Tuple( sli->resultTupleType );
newTuple->PutAttribute( 0, attr );
result.setAddr(newTuple);
return YIELD;
}
case CLOSE:{
if (local.addr != 0) {
sli = (TransformstreamLocalInfo<Attribute>*) (local.addr);
if (!sli->finished){
sli->stream.close();
sli->finished = true;
}
}
return 0;
}
case CLOSEPROGRESS:{
if (local.addr != 0)
{
sli = (TransformstreamLocalInfo<Attribute>*) (local.addr);
if(!sli->finished){
sli->stream.close();
}
delete sli;
local.setAddr(0);
}
return 0;
}
case REQUESTPROGRESS:{
sli = (TransformstreamLocalInfo<Attribute>*) (local.addr);
if(!sli){
return CANCEL;
}
ProgressInfo p1;
ProgressInfo* pRes = (ProgressInfo*) result.addr;
if( !sli->stream.requestProgress( &p1) ){
return CANCEL;
};
const double uProject = 0.00073; //millisecs per tuple
const double vProject = 0.0004; //millisecs per tuple and attribute
pRes->Copy(p1);
pRes->Time = p1.Time + pRes->Card * (uProject + vProject);
pRes->Progress = p1.Progress; //a number between 0 and 1
if( !sli->progressinitialized || p1.sizesChanged ) {
pRes->sizesChanged = true;
sli->progressinitialized = true;
} else {
pRes->sizesChanged = false;
}
return YIELD;
}
} // switch
cout << "Transformstream_S_TS: UNKNOWN MESSAGE!" << endl;
return 0;
}
// The second variant creates a stream from a tuplestream:
int Transformstream_TS_S(Word* args, Word& result, int message,
Word& local, Supplier s)
{
TransformstreamLocalInfo<Tuple> *sli;
switch ( message ){
case OPEN:{
sli = (TransformstreamLocalInfo<Tuple>*) local.addr;
if(sli){
delete sli;
}
sli = new TransformstreamLocalInfo<Tuple>(args[0]);
sli->finished = false;
sli->progressinitialized = false;
sli->stream.open();
local.setAddr(sli);
return 0;
}
case REQUEST:{
if (local.addr == 0)
{
return CANCEL;
}
sli = (TransformstreamLocalInfo<Tuple>*) (local.addr);
if (sli->finished)
{
return CANCEL;
}
Tuple* tuple = sli->stream.request();
if (tuple==0) { // input stream consumed
sli->stream.close();
sli->finished = true;
result.addr = 0;
return CANCEL;
}
// extract, copy and pass value, delete tuple
result.addr = tuple->GetAttribute(0)->Copy();
tuple->DeleteIfAllowed();
return YIELD;
}
case CLOSE:{
if (local.addr != 0){
sli = (TransformstreamLocalInfo<Tuple>*) (local.addr);
if (!sli->finished){
sli->stream.close();
sli->finished = true;
// disposal of localinfo done in CLOSEPROGRESS
}
}
return 0;
}
case CLOSEPROGRESS:{
if (local.addr != 0)
{
sli = (TransformstreamLocalInfo<Tuple>*) (local.addr);
if (!sli->finished){
sli->stream.close();
}
delete sli;
local.setAddr(0);
}
return 0;
}
case REQUESTPROGRESS:{
sli = (TransformstreamLocalInfo<Tuple>*) (local.addr);
if( !sli ){
return CANCEL;
}
ProgressInfo p1;
ProgressInfo* pRes = (ProgressInfo*) result.addr;
if( !sli->stream.requestProgress( &p1) ){
return CANCEL;
};
const double uProject = 0.00073; //millisecs per tuple
const double vProject = 0.0004; //millisecs per tuple and attribute
pRes->Copy(p1);
pRes->Time = p1.Time + pRes->Card * (uProject + vProject);
pRes->Progress = p1.Progress; //a number between 0 and 1
if( !sli->progressinitialized || p1.sizesChanged ) {
pRes->sizesChanged = true;
sli->progressinitialized = true;
} else {
pRes->sizesChanged = false;
}
return YIELD;
}
}
cout << __PRETTY_FUNCTION__ <<": UNKNOWN MESSAGE!" << endl;
return -1;
}
/*
5.27.3 Specification for operator ~transformstream~
*/
const string StreamTransformstreamSpec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" "
"\"Example\" ) "
"("
"<text>For T in kind DATA:\n"
"(stream T) -> stream(tuple((elem T)))\n"
"stream(tuple(attrname T)) -> (stream T)</text--->"
"<text>_ transformstream</text--->"
"<text>Transforms a 'stream T' into a tuplestream "
"with a single attribute 'elem' containing the "
"values coming from the input stream and vice "
"versa. The identifier 'elem' is fixed, the "
"attribute name 'attrname' may be arbitrary "
"chosen, but the tuplestream's tupletype may "
"have only a single attribute.</text--->"
"<text>query intstream(1,5) transformstream consume\n "
"query ten feed transformstream printstream count</text--->"
") )";
const string NamedtransformstreamSpec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" "
"\"Example\" ) "
"("
"<text>stream(t) x name -> stream (tuple(t name)))</text--->"
"<text> _ namedtransformstream [ _ ] </text--->"
"<text> Converts a stream to a tuple stream with"
" given attribute name </text--->"
"<text>query intsteam(0,100)"
" namedtransformstream [Number] consume</text--->"
") )";
/*
5.27.4 Selection Function of operator ~transformstream~
*/
ValueMapping streamtransformstreammap[] =
{
Transformstream_S_TS,
Transformstream_TS_S
};
int streamTransformstreamSelect( ListExpr args )
{
ListExpr first = nl->First( args);
if(Stream<Tuple>::checkType(first)){
return 1;
} else {
return 0;
}
}
/*
5.27.5 Definition of operator ~transformstream~
*/
Operator streamtransformstream( "transformstream",
StreamTransformstreamSpec,
2,
streamtransformstreammap,
streamTransformstreamSelect,
StreamTransformstreamTypeMap);
/*
5.28 Operator ~projecttransformstream~
5.28.1 Type Mapping
---- stream(tuple((a1 t1) (a2 t2)...(an tn))) x ai --> stream(ti)
----
*/
ListExpr ProjecttransformstreamTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("Two arguments expected");
}
ListExpr stream = nl->First(args);
if(!Stream<Tuple>::checkType(stream)){
return listutils::typeError("first argument must be a tuple stream");
}
ListExpr nameList = nl->Second(args);
if(!listutils::isSymbol(nameList)){
return listutils::typeError("Second argument is not a "
"valid attribute name");
}
string name = nl->SymbolValue(nameList);
ListExpr attrType;
ListExpr attrList = nl->Second(nl->Second(stream));
int pos = listutils::findAttribute(attrList, name, attrType);
if(pos<=0){
return listutils::typeError("Attribute " + name +
" not found in tuple");
}
pos--;
return nl->ThreeElemList(
nl->SymbolAtom(Symbol::APPEND()),
nl->OneElemList(nl->IntAtom(pos)),
nl->TwoElemList(nl->SymbolAtom(Stream<Attribute>::BasicType()),
attrType));
}
/*
5.28.2 Value Mapping
*/
class ProjectTransformLI{
public:
ProjectTransformLI(Word& s, CcInt* p):stream(s), pos(p->GetIntval()){
stream.open();
}
~ProjectTransformLI(){
stream.close();
}
Attribute* next(){
Tuple* t = stream.request();
if(t==0){
return 0;
} else {
Attribute* a = t->GetAttribute(pos)->Copy();
t->DeleteIfAllowed();
return a;
}
}
private:
Stream<Tuple> stream;
int pos;
};
int Projecttransformstream(Word* args, Word& result, int message,
Word& local, Supplier s)
{
switch ( message )
{
case OPEN:{
if(local.addr){
delete (ProjectTransformLI*)local.addr;
}
local.addr = new ProjectTransformLI(args[0], (CcInt*)(args[2].addr));
return 0;
}
case REQUEST:{
if(!local.addr){
return CANCEL;
}
result.addr = ((ProjectTransformLI*)local.addr)->next();
return result.addr?YIELD:CANCEL;
}
case CLOSE:
if(local.addr){
delete (ProjectTransformLI*)local.addr;
local.addr=0;
}
return 0;
}
cerr << "Projecttransformstream: UNKNOWN MESSAGE!" << endl;
return -1;
}
/*
5.28.3 Specification
*/
const string ProjecttransformstreamSpec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" "
"\"Example\" ) "
"("
"<text>stream(tuple((a1 t1)...(an tn))) x an -> (stream tn)</text--->"
"<text>_ project transformstream [ _ ] </text--->"
"<text> extracts an attribute from a tuple stream </text--->"
"<text>query Staedte feed projecttransformstream"
" [PLZ] printintstream count</text--->"
") )";
/*
5.28.4 Definition of the operator instance
*/
Operator projecttransformstream (
"projecttransformstream", //name
ProjecttransformstreamSpec, //specification
Projecttransformstream, //value mapping
Operator::SimpleSelect, //trivial selection function
ProjecttransformstreamTM //type mapping
);
Operator namedtransformstream (
"namedtransformstream", //name
NamedtransformstreamSpec, //specification
Transformstream_S_TS, //value mapping
Operator::SimpleSelect, //trivial selection function
NamedtransformstreamTypemap //type mapping
);
/*
5.29 The ~echo~ operator
stream(X) x bool x DATA -> STREAM(X)
X x DATA -> X (X can be all but stream)
*/
ListExpr EchoTypeMap(ListExpr args){
int len = nl->ListLength(args);
if(len!=2 && len!=3){
ErrorReporter::ReportError("Wrong number of parameters");
return nl->TypeError();
}
ListExpr errorInfo = nl->OneElemList(nl->SymbolAtom("ERROR"));
if(len==2){ // T x S -> T , T # stream(...)
// check for kind DATA
ListExpr typeToPrint = nl->Second(args);
if(! SecondoSystem::GetAlgebraManager()
->CheckKind(Kind::DATA(),typeToPrint,errorInfo)){
ErrorReporter::ReportError("last arg has to be in kind DATA");
return nl->TypeError();
}
// check for T# stream
if(listutils::isStream(nl->First(args))){
ErrorReporter::ReportError("If the first argument is a stream, two "
"further parameters are required");
return nl->TypeError();
}
return nl->First(args);
} else { // len==3
// first argument has to be a stream
if(!listutils::isStream(nl->First(args))){
return listutils::typeError("When 3 parameters are given, the"
" first of them must be a stream");
}
if(!nl->IsEqual(nl->Second(args),CcBool::BasicType())){
ErrorReporter::ReportError("bool expected as second argument.");
return nl->TypeError();
}
ListExpr typeToPrint = nl->Third(args);
if(! SecondoSystem::GetAlgebraManager()
->CheckKind(Kind::DATA(),typeToPrint,errorInfo)){
ErrorReporter::ReportError("last arg has to be in kind DATA");
return nl->TypeError();
}
return nl->First(args);
}
}
/*
5.28.2 Value Mapping for the echo operator
*/
int Echo_Stream(Word* args, Word& result, int message,
Word& local, Supplier s)
{
bool each = ((CcBool*) args[1].addr)->GetBoolval();
Attribute* s1 = (Attribute*) args[2].addr;
Word elem;
switch(message){
case OPEN:
cout << "OPEN: ";
s1->Print(cout) << endl;
qp->Open(args[0].addr);
return 0;
case REQUEST:
if(each){
cout << "REQUEST: ";
s1->Print(cout) << endl;
}
qp->Request(args[0].addr,elem);
if(qp->Received(args[0].addr)){
result.setAddr(elem.addr);
return YIELD;
} else{
return CANCEL;
}
case CLOSE:
cout << "CLOSE: ";
s1->Print(cout) << endl;
qp->Close(args[0].addr);
return 0;
}
return 0;
}
int Echo_Other(Word* args, Word& result, int message,
Word& local, Supplier s)
{
Attribute* s1 = (Attribute*) args[1].addr;
result.setAddr(args[0].addr);
s1->Print(cout) << endl;
return 0;
}
/*
5.29.3 Selection function and VM array
*/
ValueMapping echovm[] = {Echo_Stream, Echo_Other};
int EchoSelect(ListExpr args){
if(nl->ListLength(args)==2){
return 1;
} else {
return 0;
}
}
/*
5.29.4 Specification
*/
const string EchoSpec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" "
"\"Example\" ) "
"("
"<text>stream(T) x bool x string -> stream(T) \n"
" T x string -> T , T # stream</text--->"
"<text>_ echo [ _ ] </text--->"
"<text> prints the given string if operator mapping is called </text--->"
"<text>query Staedte feed echo[TRUE, \"called\"] count</text--->"
") )";
/*
5.29.5 Creatinmg the operator instance
*/
Operator echo( "echo",
EchoSpec,
2,
echovm,
EchoSelect,
EchoTypeMap);
/*
5.28 Operator ~count~
Signature:
----
For T in kind DATA:
(stream T) -> int
----
The operator counts the number of stream elements.
*/
/*
5.28.1 Type mapping function for ~count~
*/
ListExpr
streamCountType( ListExpr args )
{
if ( nl->ListLength(args) != 1 ){
return listutils::typeError("one argument expected");
}
ListExpr arg1 = nl->First(args);
if(!Stream<Attribute>::checkType(arg1)){
return listutils::typeError("stream(DATA) expected");
}
return nl->SymbolAtom(CcInt::BasicType());
}
/*
5.28.2 Value mapping for operator ~count~
*/
int
streamCountFun (Word* args, Word& result, int message, Word& local, Supplier s)
/*
Count the number of elements in a stream. An example for consuming a stream.
*/
{
struct streamCountFunLocalInfo {
bool initializedprogress;
double *attrSize;
double *attrSizeExt;
streamCountFunLocalInfo(): initializedprogress( false ) {
attrSize = new double[1];
attrSize[0] = sizeof(CcInt);
attrSizeExt = new double[1];
attrSizeExt[0] = sizeof(CcInt);
}
~streamCountFunLocalInfo() {
delete[] attrSize;
delete[] attrSizeExt;
}
};
Word elem;
int count = 0;
streamCountFunLocalInfo* li;
li = static_cast<streamCountFunLocalInfo*>(local.addr);
switch(message){
case OPEN:
case CLOSE:
case REQUEST: {
if(!li){
li = new streamCountFunLocalInfo();
local.addr = li;
}
qp->Open(args[0].addr);
qp->Request(args[0].addr, elem);
while ( qp->Received(args[0].addr) ){
count++;
Attribute* attr = static_cast<Attribute*>( elem.addr );
attr->DeleteIfAllowed(); // consume the stream object
qp->Request(args[0].addr, elem);
}
result = qp->ResultStorage(s);
static_cast<CcInt*>(result.addr)->Set(true, count);
qp->Close(args[0].addr);
return 0;
}
case REQUESTPROGRESS:{
if(!local.addr){
return CANCEL;
}
ProgressInfo* pRes;
pRes = (ProgressInfo*) result.addr;
pRes->Card = 1 ; //expected cardinality
pRes->Size = sizeof(CcInt); //expected total size
pRes->SizeExt = sizeof(CcInt); //expected root+ext size (no FLOBs)
pRes->noAttrs = 1; //no of attributes
pRes->attrSize = li->attrSize; // the complete size
pRes->attrSizeExt = li->attrSizeExt; //the root and extension size
pRes->sizesChanged = true; //sizes have been recomputed
li->initializedprogress = true;
ProgressInfo p1;
if ( qp->RequestProgress(args[0].addr, &p1) ){
pRes->BTime = p1.Time; // this is a blocking operator!
pRes->BProgress = p1.Progress; // this is a blocking operator!
pRes->Progress = p1.Progress;
pRes->Time = p1.Time;
return YIELD;
} else {
result.addr = 0;
return CANCEL;
}
break;
}
case CLOSEPROGRESS:{
if(li){
delete li;
local.addr = 0;
}
return 0;
}
default: {
return -1;
}
}
return 0;
}
/*
5.28.3 Specification for operator ~count~
*/
const string streamCountSpec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( <text>For T in kind DATA:\n"
"((stream T)) -> int</text--->"
"<text>_ count</text--->"
"<text>Counts the number of elements of a stream.</text--->"
"<text>query intstream (1,10) count</text--->"
") )";
/*
5.28.4 Selection Function of operator ~count~
*/
int
streamCountSelect (ListExpr args ) { return 0; }
/*
5.28.5 Definition of operator ~count~
*/
Operator streamcount (
"count", //name
streamCountSpec, //specification
streamCountFun, //value mapping
streamCountSelect, //trivial selection function
streamCountType //type mapping
);
/*
5.29 Operator ~printstream~
----
For T in kind DATA:
(stream T) -> (stream T)
----
For every stream element, the operator calls the ~print~ function
and passes on the element.
*/
/*
5.29.1 Type mapping function for ~printstream~
*/
ListExpr
streamPrintstreamType( ListExpr args ) {
if(!nl->HasLength(args,1)){
return listutils::typeError("One argument expected.");
}
ListExpr stream = nl->First(args);
// case: stream<DATA>
if(Stream<Attribute>::checkType(stream)){
return stream;
}
if(!Stream<Tuple>::checkType(stream)){
return listutils::typeError("stream<DATA> or stream<Tuple> expected");
}
// case : stream<tuple>
// collect and append the attribute names
ListExpr attrList = nl->Second(nl->Second(stream));
bool firstcall = true;
ListExpr attrNames = nl->TheEmptyList();
ListExpr last = nl->TheEmptyList();
while( !nl->IsEmpty(attrList) ) {
ListExpr attr = nl->First(attrList);
attrList = nl->Rest(attrList);
ListExpr name = nl->StringAtom(nl->SymbolValue(nl->First(attr)));
if(firstcall){
attrNames = nl->OneElemList(name);
last = attrNames;
firstcall = false;
} else {
last = nl->Append(last, name);
}
}
// return stream@(noAttrs,attrList)
ListExpr res = nl->ThreeElemList(
nl->SymbolAtom(Symbol::APPEND()),
attrNames,
stream);
return res;
}
/*
5.29.2 Value mapping for operator ~printstream~
*/
int
streamPrintstreamFun (Word* args, Word& result,
int message, Word& local, Supplier s)
/*
Print the elements of an Attribute-type stream.
An example for a pure stream operator (input and output are streams).
*/
{
Word elem;
switch( message )
{
case OPEN:
qp->Open(args[0].addr);
return 0;
case REQUEST:
qp->Request(args[0].addr, elem);
if ( qp->Received(args[0].addr) )
{
((Attribute*) elem.addr)->Print(cout); cout << endl;
result = elem;
return YIELD;
}
else return CANCEL;
case CLOSE:
qp->Close(args[0].addr);
return 0;
}
/* should not happen */
return -1;
}
int
streamPrintTupleStreamFun (Word* args, Word& result,
int message, Word& local, Supplier s)
/*
Print the elements of a Tuple-type stream.
*/
{
Word tupleWord, elem;
string attrName;
switch(message)
{
case OPEN:
qp->Open(args[0].addr);
return 0;
case REQUEST:
qp->Request(args[0].addr, tupleWord);
if(qp->Received(args[0].addr))
{
cout << "Tuple: (" << endl;
Tuple* tuple = (Tuple*) (tupleWord.addr);
for(int i=0; i<tuple->GetNoAttributes(); i++){
string attrName = (static_cast<CcString*>
(args[i+1].addr))->GetValue();
cout << attrName << ": ";
Attribute* attr = (Attribute*) (tuple->GetAttribute(i));
if(attr){
attr->Print(cout);
} else {
cout << "Invalid attribute: NULL";
}
cout << endl;
}
cout << " )" << endl;
result = tupleWord;
return YIELD;
}
else
{
return CANCEL;
}
case CLOSE:
qp->Close(args[0].addr);
return 0;
}
return 0;
}
/*
5.29.3 Specification for operator ~printstream~
*/
const string streamPrintstreamSpec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( <text>For T in kind DATA:\n"
"((stream T)) -> (stream T)\n"
"((stream tuple(X))) -> (stream tuple(X))</text--->"
"<text>_ printstream</text--->"
"<text>Prints the elements of an arbitrary stream or tuplestream.</text--->"
"<text>query intstream (1,10) printstream count</text--->"
") )";
/*
5.29.4 Selection Function of operator ~printstream~
Uses the same function as for ~count~.
*/
int
streamPrintstreamSelect (ListExpr args )
{
ListExpr streamType = nl->Second(nl->First(args));
if( (nl->ListLength(streamType) == 2) &&
(nl->IsEqual(nl->First(streamType),Tuple::BasicType())))
return 0;
else
return 1;
}
ValueMapping streamprintstreammap[] = {
streamPrintTupleStreamFun,
streamPrintstreamFun
};
/*
5.29.5 Definition of operator ~printstream~
*/
Operator streamprintstream (
"printstream", //name
streamPrintstreamSpec, //specification
2,
streamprintstreammap, //value mapping
streamPrintstreamSelect, //own selection function
streamPrintstreamType //type mapping
);
/*
5.30 printstream2
*/
ListExpr printstream2TM(ListExpr args){
if(!nl->HasLength(args,3)){
return listutils::typeError("3 arguments expected");
}
ListExpr stream = nl->First(args);
if(!Stream<Tuple>::checkType(stream)
&& !Stream<Attribute>::checkType(stream)){
return listutils::typeError("first argument must be a stream of "
"tuple or a stream of attribute");
}
if(!CcString::checkType(nl->Second(args))){
return listutils::typeError("second argument is not a string");
}
if(!CcString::checkType(nl->Third(args))){
return listutils::typeError("third argument is not a string");
}
return nl->First(args);
}
template<class StreamType>
class printstream2Info{
public:
printstream2Info(Word& _stream, CcString* _pre, CcString* _after):
stream(_stream){
stream.open();
pre = _pre->IsDefined()?_pre->GetValue():"";
after = _after->IsDefined()?_after->GetValue():"";
elem = 1;
}
~printstream2Info(){
stream.close();
}
StreamType* next(){
StreamType* res = stream.request();
if(res){
print(res);
}
return res;
}
private:
Stream<StreamType> stream;
string pre;
string after;
int elem;
void print(StreamType* elem){
cout << pre << " " << this->elem << endl;
this->elem++;
elem->Print(cout);
cout << endl << after << endl;
}
};
template<class StreamType>
int printstream2VMT (Word* args, Word& result,
int message, Word& local, Supplier s){
printstream2Info<StreamType>* li = (printstream2Info<StreamType>*) local.addr;
switch(message){
case OPEN: if(li){
delete li;
}
local.addr = new printstream2Info<StreamType>(args[0],
(CcString*) args[1].addr,
(CcString*) args[2].addr);
return 0;
case REQUEST : {
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
}
case CLOSE: {
if(li){
delete li;
local.addr = 0;
}
return 0;
}
}
return -1;
}
ValueMapping printstream2VM[] = {
printstream2VMT<Attribute>,
printstream2VMT<Tuple>
};
OperatorSpec printstream2Spec(
"stream(X) x string x string -> stream(X)",
"_ printstream2[_,_]",
"Outputs elements in the stream in a stream to standard out.",
"Each element in enclosed in the strings given as second and third"
"argument. The header is extended by the number of the element."
"query plz feed printstream2[\"Elem : \", \"-----\"] count"
);
int printstream2Select(ListExpr args){
return Stream<Attribute>::checkType(nl->First(args))?0:1;
}
Operator printstream2Op(
"printstream2",
printstream2Spec.getStr(),
2,
printstream2VM,
printstream2Select,
printstream2TM
);
/*
5.30 Operator ~filter~
----
For T in kind DATA:
((stream T) (map T bool)) -> (stream T)
----
The operator filters the elements of an arbitrary stream by a predicate.
*/
/*
5.30.1 Type mapping function for ~filter~
*/
ListExpr
streamFilterType( ListExpr args )
{
ListExpr stream, map;
string out, out2;
if ( nl->ListLength(args) == 2 )
{
stream = nl->First(args);
map = nl->Second(args);
// test first argument for stream(T), T in kind DATA
if(!Stream<Attribute>::checkType(stream)){
return listutils::typeError("Operator filter expects a (stream T), "
"T in kind DATA as its first argument. "
"The argument provided "
"has type '" + out + "' instead.");
}
// test second argument for map T' bool. T = T'
if ( nl->IsAtom(map)
|| (nl->ListLength(map) != 3)
|| !nl->IsEqual(nl->First(map), Symbol::MAP())
|| !nl->IsEqual(nl->Third(map), CcBool::BasicType()) )
{
nl->WriteToString(out, map);
ErrorReporter::ReportError("Operator filter expects a "
"(map T bool), T in kind DATA, "
"as its second argument. "
"The second argument provided "
"has type '" + out + "' instead.");
return nl->SymbolAtom(Symbol::TYPEERROR());
}
if ( !( nl->Equal( nl->Second(stream), nl->Second(map) ) ) )
{
nl->WriteToString(out, nl->Second(stream));
nl->WriteToString(out2, nl->Second(map));
ErrorReporter::ReportError("Operator filter: the stream base type "
"T must match the map's argument type, "
"e.g. 1st: (stream T), 2nd: (map T bool). "
"The actual types are 1st: '" + out +
"', 2nd: '" + out2 + "'.");
return nl->SymbolAtom(Symbol::TYPEERROR());
}
}
else
{ // wrong number of arguments
ErrorReporter::ReportError("Operator filter expects two arguments.");
return nl->SymbolAtom(Symbol::TYPEERROR());
}
return stream; // return type of first argument
}
/*
5.30.2 Value mapping for operator ~filter~
*/
int
streamFilterFun (Word* args, Word& result, int message, Word& local, Supplier s)
/*
Filter the elements of a stream by a predicate. An example for a stream
operator and also for one calling a parameter function.
*/
{
struct StreamFilterLocalInfo{
StreamFilterLocalInfo():current( 0 ), returned( 0 ), done( false ){};
int current; //tuples read
int returned; //tuples returned
bool done; //arg stream exhausted
};
Word elem, funresult;
ArgVectorPointer funargs;
StreamFilterLocalInfo *fli = static_cast<StreamFilterLocalInfo*>(local.addr);
switch( message ){
case OPEN:{
if(fli){
delete fli;
}
fli = new StreamFilterLocalInfo();
local.setAddr(fli);
qp->Open(args[0].addr);
return 0;
}
case REQUEST:{
if(!fli || fli->done){ return CANCEL; }
funargs = qp->Argument(args[1].addr); //Get the argument vector for
//the parameter function.
qp->Request(args[0].addr, elem);
while ( qp->Received(args[0].addr) ) {
fli->current++;
(*funargs)[0] = elem;
//Supply the argument for the
//parameter function.
qp->Request(args[1].addr, funresult);
//Ask the parameter function
//to be evaluated.
if ( ((CcBool*) funresult.addr)->GetBoolval() ){
// object fulfills condition - pass it on:
result = elem;
fli->returned++;
return YIELD;
}
//otherwise: consume the stream object:
((Attribute*) elem.addr)->DeleteIfAllowed();
qp->Request(args[0].addr, elem); // get next element
} // while
return CANCEL;
}
case CLOSE:{
qp->Close(args[0].addr);
return 0;
}
case CLOSEPROGRESS:{
if( fli ){
delete fli;
local.setAddr(0);
}
return 0;
}
case REQUESTPROGRESS:{
ProgressInfo p1;
ProgressInfo* pRes;
const double uFilter = 0.01;
pRes = (ProgressInfo*) result.addr;
if ( qp->RequestProgress(args[0].addr, &p1) ){
pRes->CopySizes(p1);
if ( fli ){ //filter was started
if ( fli->done ){ //arg stream exhausted, all known
pRes->Card = (double) fli->returned;
pRes->Time = p1.Time + (double) fli->current
* qp->GetPredCost(s) * uFilter;
pRes->Progress = 1.0;
pRes->CopyBlocking(p1);
return YIELD;
}
if ( fli->returned >= enoughSuccessesSelection ){
//stable state assumed now
pRes->Card = p1.Card *
( (double) fli->returned / (double) (fli->current));
pRes->Time = p1.Time + p1.Card * qp->GetPredCost(s) * uFilter;
if ( p1.BTime < 0.1 && pipelinedProgress ){
//non-blocking, use pipelining
pRes->Progress = p1.Progress;
} else {
pRes->Progress = (p1.Progress * p1.Time
+ fli->current * qp->GetPredCost(s) * uFilter) / pRes->Time;
}
pRes->CopyBlocking(p1);
return YIELD;
}
}
//filter not yet started or not enough seen
pRes->Card = p1.Card * qp->GetSelectivity(s);
pRes->Time = p1.Time + p1.Card * qp->GetPredCost(s) * uFilter;
if ( p1.BTime < 0.1 && pipelinedProgress ){
//non-blocking, use pipelining
pRes->Progress = p1.Progress;
} else {
pRes->Progress = (p1.Progress * p1.Time) / pRes->Time;
}
pRes->CopyBlocking(p1);
return YIELD;
} else {
return CANCEL;
}
}
} // switch
/* should not happen */
return -1;
}
/*
5.30.3 Specification for operator ~filter~
*/
const string streamFilterSpec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( <text>For T in kind DATA:\n"
"((stream T) (map T bool)) -> (stream T)</text--->"
"<text>_ filter [ fun ]</text--->"
"<text>Filters the elements of a stream by a predicate.</text--->"
"<text>query intstream (1,10) filter[. > 7] printintstream count</text--->"
") )";
/*
5.30.4 Selection Function of operator ~filter~
Uses the same function as for ~count~.
*/
/*
5.30.5 Definition of operator ~filter~
*/
Operator streamfilter (
"filter", //name
streamFilterSpec, //specification
streamFilterFun, //value mapping
streamCountSelect, //trivial selection function
streamFilterType //type mapping
);
/*
5.41 Operator ~realstream~
----
real x real x real -> stream(real)
----
The ~realstream~ operator takes three arguments of type ~real~.
It produces a stream of real values in range provided by the first
two arguments with a stepwide taken from the third argument.
*/
/*
5.41.1 Type mapping function for ~realstream~
*/
ListExpr realstreamTypeMap( ListExpr args ){
ListExpr arg1, arg2, arg3;
if ( nl->ListLength(args) == 3 )
{
arg1 = nl->First(args);
arg2 = nl->Second(args);
arg3 = nl->Third(args);
if ( nl->IsEqual(arg1, CcReal::BasicType()) &&
nl->IsEqual(arg2,CcReal::BasicType()) &&
nl->IsEqual(arg3, CcReal::BasicType()) ){
return nl->TwoElemList(nl->SymbolAtom(Stream<CcReal>::BasicType()),
nl->SymbolAtom(CcReal::BasicType()));
}
}
ErrorReporter::ReportError("real x real x real expected");
return nl->TypeError();
}
/*
5.41.2 Value mapping for operator ~realstream~
*/
int
realstreamFun (Word* args, Word& result, int message, Word& local, Supplier s)
{
struct RangeAndDiff {
double first, last, diff;
long iter;
long card;
bool initializedprogress;
double* attrSize;
double* attrSizeExt;
RangeAndDiff(Word* args) {
initializedprogress = false;
CcReal* r1 = ((CcReal*)args[0].addr);
CcReal* r2 = ((CcReal*)args[1].addr);
CcReal* r3 = ((CcReal*)args[2].addr);
iter = 0;
bool defined = r1->IsDefined() && r2->IsDefined() && r3->IsDefined();
if (defined) {
first = r1->GetRealval();
last = r2->GetRealval();
diff = r3->GetRealval();
}
else {
first = 0;
last = -1;
diff = 1;
}
if(diff > 0.0) {
card = (long)(ceil(fabs( (last - first) / diff ) + 1.0));
} else {
card = 1;
}
attrSize = new double[1];
attrSize[0] = sizeof(CcReal);
attrSizeExt = new double[1];
attrSizeExt[0] = sizeof(CcReal);
}
~RangeAndDiff() {
if(attrSize) { delete[] attrSize; attrSize = 0;}
if(attrSizeExt) {delete[] attrSizeExt; attrSizeExt = 0;}
}
};
RangeAndDiff* range_d = 0;
double current = 0;
double cd = 0;
CcReal* elem = 0;
switch( message )
{
case OPEN: {
range_d = new RangeAndDiff(args);
local.addr = range_d;
return 0;
}
case REQUEST: {
range_d = ((RangeAndDiff*) local.addr);
cd = (double) range_d->iter * range_d->diff;
current = range_d->first + cd;
if(range_d->diff == 0.0){ // don't allow endless loops
return CANCEL;
} else if(range_d->diff < 0.0){
if(current < range_d->last){
return CANCEL;
} else {
elem = new CcReal(true,current);
result.addr = elem;
range_d->iter++;
return YIELD;
}
} else { // diff > 0.0
if(current > range_d->last){
return CANCEL;
} else {
elem = new CcReal(true,current);
result.addr = elem;
range_d->iter++;
return YIELD;
}
}
}
case CLOSE: {
// localinfo is destroyed in CLOSEPROGRESS
return 0;
}
case CLOSEPROGRESS: {
range_d = ((RangeAndDiff*) local.addr);
if(range_d){
delete range_d;
local.setAddr(0);
}
range_d = 0;
return 0;
}
case REQUESTPROGRESS: {
ProgressInfo* pRes = (ProgressInfo*) result.addr;
range_d = ((RangeAndDiff*) local.addr);
if( range_d ){
if(range_d->initializedprogress){
pRes->sizesChanged = false; //sizes were not recomputed
range_d->initializedprogress = true;
} else {
pRes->sizesChanged = true; //first call
}
pRes->Size = sizeof(CcReal); //total tuple size
// (including FLOBs)
pRes->SizeExt = sizeof(CcReal); //tuple root and extension part
pRes->noAttrs = 1; //no of attributes
pRes->attrSize = range_d->attrSize; // complete size per attr
pRes->attrSizeExt = range_d->attrSizeExt; // root +extension
// size per attr
const double feedccreal = 0.001; //milliseconds per CcReal
pRes->Card = range_d->card; //expected cardinality
pRes->Time = (range_d->card) * feedccreal; //expected time, [ms]
pRes->Progress = (double) range_d-> iter / (double) range_d->card;
pRes->BTime = 0.00001; // blocking time must not be 0
pRes->BProgress = 1.0; // blocking progress [0,1]
return YIELD;
} else {
return CANCEL;
}
}
default: {
return -1; /* should not happen */
}
} // switch
return -1; /* should not happen */
}
/*
5.41.3 Specification for operator ~~
*/
struct realstreamInfo : OperatorInfo
{
realstreamInfo() : OperatorInfo()
{
name = "realstream";
signature = CcReal::BasicType()+" x "+CcReal::BasicType()+
" -> stream(real)";
syntax = "realstream(_ , _, _)";
meaning = "Creates a stream of reals containing the numbers "
"between the first and the second argument. The third "
"argument defines the step width.";
example = "realstream(-100.0, 100.0, 0.5) count";
supportsProgress = true;
}
};
/*
5.41 Operator ~intstream~
---- int x int --> stream(int)
----
*/
// TypeMappingFunction
ListExpr
intstreamTypeMap( ListExpr args )
{
string err = "int x int expected";
if(!nl->HasLength(args,2)){
return listutils::typeError(err);
}
if(!listutils::isSymbol(nl->First(args),CcInt::BasicType()) ||
!listutils::isSymbol(nl->Second(args),CcInt::BasicType())){
return listutils::typeError(err);
}
return nl->TwoElemList(nl->SymbolAtom(Stream<CcInt>::BasicType()),
nl->SymbolAtom(CcInt::BasicType()));
}
// ValueMappingFunction
int
intstreamValueMap(Word* args, Word& result,
int message, Word& local, Supplier s)
{
// An auxiliary type which keeps the state of this
// operation during two requests
struct Range {
int current;
int last;
int card;
bool initializedprogress;
double* attrSize;
double* attrSizeExt;
Range(CcInt* i1, CcInt* i2):
initializedprogress(false), attrSize(0), attrSizeExt(0)
{
// Do a proper initialization even if one of the
// arguments has an undefined value
if (i1->IsDefined() && i2->IsDefined())
{
current = i1->GetIntval();
last = i2->GetIntval();
}
else
{
// this initialization will create an empty stream
current = 1;
last = 0;
}
card = last - current + 1;
attrSize = new double[1];
attrSizeExt = new double[1];
attrSize[0] = i1->Sizeof(); // core size of a CcInt
attrSizeExt[0] = i1->Sizeof(); // ext size of a CcInt is the same
}
~Range() {
delete[] attrSize;
delete[] attrSizeExt;
}
};
Range* range = static_cast<Range*>(local.addr);
switch( message )
{
case OPEN: { // initialize the local storage
CcInt* i1 = static_cast<CcInt*>( args[0].addr );
CcInt* i2 = static_cast<CcInt*>( args[1].addr );
if(range){
delete range;
}
range = new Range(i1, i2);
local.addr = range;
return 0;
}
case REQUEST: { // return the next stream element
if(!range) {
return CANCEL;
} else if ( range->current <= range->last ) {
CcInt* elem = new CcInt(true, range->current++);
result.addr = elem;
return YIELD;
} else {
result.addr = 0;
return CANCEL;
}
}
case CLOSE: { // free the local storage
#ifndef USE_PROGRESS
if (range != 0) {
delete range;
local.addr = 0;
}
#endif
return 0;
}
#ifdef USE_PROGRESS
case CLOSEPROGRESS: {
if (range != 0) {
delete range;
local.addr = 0;
}
return 0;
}
case REQUESTPROGRESS: {
ProgressInfo* pRes = (ProgressInfo*) result.addr;
if(!range){
return CANCEL;
}
if( !range->initializedprogress ){
pRes->sizesChanged = true; //first call
range->initializedprogress = true;
} else {
pRes->sizesChanged = false; //sizes were not recomputed
}
pRes->Size = sizeof(CcInt); //total tuple size
// (including FLOBs)
pRes->SizeExt = sizeof(CcInt); //tuple root and extension part
pRes->noAttrs = 1; //no of attributes
pRes->attrSize = range->attrSize; // complete size per attr
pRes->attrSizeExt = range->attrSizeExt; // root +extension
// size per attr
const double feedccint = 0.001; //milliseconds per CcReal
pRes->Card = range->card; //expected cardinality
pRes->Time = (range->card) * feedccint; //expected time, [ms]
pRes->Progress = ((double)(range->card
- (range->last - range->current + 1)))
/ ((double) range->card);
pRes->BTime = 0.00001; // blocking time must not be 0
pRes->BProgress = 1.0; // blocking progress [0,1]
return YIELD;
}
#endif
default: {
/* should never happen */
assert(false);
return -1;
}
}
}
// Specification
struct intstreamInfo : OperatorInfo
{
intstreamInfo() : OperatorInfo()
{
name = "intstream";
signature = CcInt::BasicType() + " x " + CcInt::BasicType() +
" -> stream(int)";
syntax = "intstream(_ , _)";
meaning = "Creates a stream of all integers starting with the first and "
"ending with the second argument.";
supportsProgress = true;
}
};
/*
5 TestOperator
the following operator does the same as the intstream operator. But
it uses a new method for progress estimation.
5.1.1 Type Mapping
*/
ListExpr intstream2TM(ListExpr args){
string err = "int x int expected";
if(!nl->HasLength(args,2)){
return listutils::typeError(err);
}
if(!CcInt::checkType(nl->First(args)) ||
!CcInt::checkType(nl->Second(args))){
return listutils::typeError(err);
}
return nl->TwoElemList( listutils::basicSymbol<Stream<CcInt> >(),
listutils::basicSymbol<CcInt>());
}
/*
5.1.2 LocalInfo class
Don't care about progress estimation!
*/
class IntStream2Info{
public:
IntStream2Info(CcInt* i1, CcInt* i2){
if(!i1->IsDefined() || !i2->IsDefined()){
current = 1;
max = 0;
} else {
current = i1->GetValue();
max = i2->GetValue();
}
}
CcInt* next(){
return current>max?0:new CcInt(true,++current);
}
private:
int current;
int max;
};
/*
5.1.3 Value Mapping
Call the init Function of the CostEstimation class
when open is called.
*/
int
intstream2VM(Word* args, Word& result,
int message, Word& local, Supplier s){
IntStream2Info* li = (IntStream2Info*) local.addr;
switch(message){
case OPEN: { if(li){
delete li;
}
local.addr = new IntStream2Info((CcInt*) args[0].addr,
(CcInt*) args[1].addr);
qp->getCostEstimation(s)->init(args,local.addr);
return 0;
}
case REQUEST:{
if(!li){
result.addr = 0;
} else {
result.addr = li->next();
}
return result.addr?YIELD:CANCEL;
}
case CLOSE:{
if(li){
delete li;
local.addr = 0;
}
return 0;
}
}
return -1;
}
/*
5.1.4 Specification
*/
OperatorSpec intstream2Spec(
"int x int -> int",
"intstream2(min, max)",
"returns a stream of integer from min to max (both included)",
"query intstream2(3, 5) count ");
/*
5.1.5 CostEstimation
*/
class IntStream2CE: public CostEstimation{
public:
IntStream2CE():initialized(false),
firstCall(true),pi(0), timePerElem(0.001) {
}
virtual void init(Word* args, void* localInfo){
CcInt* i1 = (CcInt*) args[0].addr;
CcInt* i2 = (CcInt*) args[1].addr;
returned = 0;
initialized = true;
firstCall = true;
if(!pi){
pi = new ProgressInfo();
pi->attrSize = new double[1];
pi->attrSizeExt = new double[1];
}
if(!i1->IsDefined() || !i2->IsDefined()){
pi->Card = 0;
} else {
int v1 = i1->GetValue();
int v2 = i2->GetValue();
pi->Card = v1>v2?0: (v2 - v1) + 1;
}
pi->Size = sizeof(CcInt);
pi->SizeExt = i1->Sizeof();
pi->noAttrs = 1;
pi->attrSize[0] = pi->Size;
pi->attrSizeExt[0] = pi->SizeExt;
pi->sizesChanged = true;
pi->Time = 0;
pi->Progress = 0;
pi->BTime = 0;
pi->BProgress = 1;
}
virtual ~IntStream2CE(){
if(pi){
delete[] pi->attrSize;
delete[] pi->attrSizeExt;
delete pi;
pi = 0;
}
}
virtual int requestProgress(Word* args,
ProgressInfo* result,
void* localInfo,
const bool argsAvailable) {
if(!initialized){
return CANCEL;
}
pi->sizesChanged = firstCall;
firstCall = false;
result->CopySizes(*pi);
result->Progress = pi->Card>0?(double)returned / (double)pi->Card : 1.0;
result->Time = (double) (pi->Card - returned) * timePerElem;
return YIELD;
}
private:
bool initialized;
bool firstCall;
ProgressInfo* pi;
double timePerElem;
};
CostEstimation* intstream2CECreator(){
return new IntStream2CE();
}
/*
5.1.5 Operator instance
*/
Operator intstream2(
"intstream2",
intstream2Spec.getStr(),
intstream2VM,
Operator::SimpleSelect,
intstream2TM,
intstream2CECreator);
/*
6 Type operators
Type operators are used only for inferring argument types of parameter
functions. They have a type mapping but no evaluation function.
*/
/*
6.1 Type Operator ~STREAMELEM~
This type operator extracts the type of the elements from a stream type given
as the first argument and otherwise just forwards its type.
----
((stream T1) ...) -> T1
(T1 ...) -> T1
----
*/
ListExpr
STREAMELEMTypeMap( ListExpr args )
{
if(nl->ListLength(args) >= 1)
{
ListExpr first = nl->First(args);
if (nl->ListLength(first) == 2)
{
if (nl->IsEqual(nl->First(first), Symbol::STREAM())) {
return nl->Second(first);
}
else {
return first;
}
}
else {
return first;
}
}
return listutils::typeError("at least one argument expected");
}
const string STREAMELEMSpec =
"(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )"
"( <text>((stream T1) ... ) -> T1\n"
"(T1 ... ) -> T1</text--->"
"<text>type operator</text--->"
"<text>Extracts the type of the stream elements if the first "
"argument is a stream and forwards the first argument's type "
"otherwise.</text--->"
"<text>Not for use with sos-syntax</text---> ))";
Operator STREAMELEM (
"STREAMELEM",
STREAMELEMSpec,
0,
Operator::SimpleSelect,
STREAMELEMTypeMap );
/*
6.2 Type Operator ~STREAMELEM2~
This type operator extracts the type of the elements from the stream type
within the second element within a list of argument types. Otherwise,
the first arguments type is simplyforwarded.
----
(T1 (stream T2) ...) -> T2
(T1 T2 ...) -> T2
----
*/
ListExpr
STREAMELEM2TypeMap( ListExpr args )
{
if(nl->ListLength(args) >= 2)
{
ListExpr second = nl->Second(args);
if (nl->ListLength(second) == 2)
{
if (nl->IsEqual(nl->First(second), Symbol::STREAM())) {
return nl->Second(second);
}
else {
return second;
}
}
else {
return second;
}
}
return nl->SymbolAtom(Symbol::TYPEERROR());
}
const string STREAMELEM2Spec =
"(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )"
"( <text>(T1 (stream T2) ... ) -> T2\n"
"( T1 T2 ... ) -> T2</text--->"
"<text>type operator</text--->"
"<text>Extracts the type of the elements from a stream given "
"as the second argument if it is a stream. Otherwise, it forwards "
"the original type.</text--->"
"<text>Not for use with sos-syntax.</text---> ))";
Operator STREAMELEM2 (
"STREAMELEM2",
STREAMELEM2Spec,
0,
Operator::SimpleSelect,
STREAMELEM2TypeMap );
/*
6.3 Operator ~ensure~
6.3.1 The Specification
*/
struct ensure_Info : OperatorInfo {
ensure_Info(const string& opName) : OperatorInfo()
{
name = opName;
signature = "stream(T) x int -> bool";
syntax = "ensure[n]";
meaning = "Returns true if at least n tuples are received"
", otherwise false.";
}
};
/*
6.3.2 Type mapping of operator ~ensure~
stream(DATA) x int -> bool
stream(tuple(...) x int -> bool
*/
ListExpr ensure_tm( ListExpr args )
{
if(nl->ListLength(args)!=2){
ErrorReporter::ReportError("two arguments expected");
return nl->TypeError();
}
ListExpr first = nl->First(args);
ListExpr second = nl->Second(args);
if(!nl->IsEqual(second,CcInt::BasicType())){
ErrorReporter::ReportError("second argument must be of type int");
return nl->TypeError();
}
if(!listutils::isDATAStream(first) &&
!listutils::isTupleStream(first)){
ErrorReporter::ReportError("first argument must be of type"
" stream(tuple(...)) or stream(DATA)");
return nl->TypeError();
}
return nl->SymbolAtom(CcBool::BasicType());
}
int
ensure_sf( ListExpr args )
{
NList list(args);
list = list.first();
int num = 0;
NList attrs;
if ( list.checkStreamTuple(attrs) ) {
num = 0;
} else {
num = 1;
}
return num;
}
/*
6.3.3 Value mapping function of operator ~ensure~
*/
template<class T>
int ensure_vm(Word* args, Word& result, int message, Word& local, Supplier s)
{
Word elem(Address(0) );
result = qp->ResultStorage(s);
CcBool* res = static_cast<CcBool*>( result.addr );
CcInt* CcNum = (CcInt*)args[1].addr;
if(!CcNum->IsDefined()){
res->SetDefined(false);
return 0;
}
int num = CcNum->GetValue();
qp->Open(args[0].addr);
qp->Request(args[0].addr, elem);
while ((num>0) && qp->Received(args[0].addr))
{
static_cast<T*>( elem.addr )->DeleteIfAllowed();
qp->Request(args[0].addr, elem);
num--;
}
qp->Close(args[0].addr);
bool ensure = (num == 0);
res->Set( true, ensure );
return 0;
}
ValueMapping ensure_vms[] =
{
ensure_vm<Tuple>,
ensure_vm<Attribute>,
0
};
/*
6.4 Operator ~tail~
*/
/*
6.4.1 Type Mapping for Operator ~tail~:
---
(stream (tuple X)) x int ---> (append TRUE (stream (tuple X)))
(stream (tuple X)) x int x bool ---> (stream (tuple X))
(stream T) x int ---> (append TRUE (stream T))
(stream (T)) x int x bool ---> (stream T)
---
*/
ListExpr streamTypeMapTail( ListExpr args )
{
NList type(args);
bool doAppend = false;
ListExpr errorInfo = nl->OneElemList(nl->SymbolAtom("ErrorInfo"));
if(type.length() < 2 || type.length() > 3){
return NList::typeError( "Expected 2 or 3 arguments.");
}
if(type.hasLength(3)){
if(type.third() != NList(CcBool::BasicType())){
return NList::typeError( "Optional 3rd argument must be "
"'bool', if specified!");
}
}else{ // appending the default for unspecified optional 3rd argument required
doAppend = true;
}
if(type.second() != NList(CcInt::BasicType())){
return NList::typeError( "Expected 'int' for 2nd argument!");
}
// 1st argument must be a stream...
if(!( type.first().hasLength(2)
&& type.first().first().isSymbol(sym.STREAM()))){
return NList::typeError( "Expected a stream as 1st argument!");
}
NList streamtype = type.first().second();
// stream elements must be in kind DATA or (tuple X)
if( !( streamtype.hasLength(2)
&& streamtype.first().isSymbol(sym.TUPLE())
&& IsTupleDescription(streamtype.second().listExpr())
)
&& !(am->CheckKind(Kind::DATA(),streamtype.listExpr(),errorInfo))){
return NList::typeError( "Expected a stream of DATA or TUPLE.");
}
if(doAppend){
NList resType1 =NList( NList(Symbol::APPEND()),
NList(true, false).enclose(),
type.first()
);
return resType1.listExpr();
}else{
DEBUGMESSAGE("Resulttype = " << type.first().convertToString());
return type.first().listExpr();
}
}
// localinfo used within
// value mapping for stream(tuple(X)) x int [ x bool ]--> stream(tuple(X))
class TailLocalInfo
{
public:
TailLocalInfo(const int mN,
const bool mKeepOrder,
Supplier s)
: n ( mN ),
keepOrder ( mKeepOrder ),
finished ( true ),
bufferSize ( 0 ),
returnedResults( 0 ),
buffer ( (size_t)qp->GetMemorySize(s)*1024*1024 )
{
// member translationTable initialized automatically
};
~TailLocalInfo()
{
// destructor for members buffer, it and
// translationTable will be called automatically
};
// Store 'tuple' within the local buffer and delete it, if allowed.
void AppendTuple(Tuple *tuple)
{
buffer.AppendTuple( tuple ); // append current stream elem
if(bufferSize == 0){
// DEBUGMESSAGE(" Inserting the first tuple...");
finished = false;
}
bufferSize++; // increase element counter
// DEBUGMESSAGE(" Inserting tuple " << bufferSize << "/" << n);
if(bufferSize > n){
// DEBUGMESSAGE(" Queue full. Pop front.");
translationTable.pop_front(); // remove head of buffer
}
// The tuplebuffer should use subsequent tupleids starting with 0 and
// proceeding up to bufferSize.
translationTable.push_back((TupleId)(bufferSize-1)); // append tupleId
tuple->DeleteIfAllowed(); // delete appended element from memory
return;
};
// Get the next tuple from the local buffer
// set member ~finished~ when done
// return 0, iff no further result exists
Tuple* GetNextTuple()
{
TupleId Id;
// Since TupeId is defined as a long. and the tuplebuffer uses
// subsequent long values as TupleIds (starting with 0), we can simply
// enumerate all used TupleIds starting with the first one needed.
if(finished || returnedResults >= n){
// DEBUGMESSAGE(" Finished " << returnedResults << "/" << n);
finished = true;
return ((Tuple*) 0);
}
if(keepOrder){
// get first elem first
// DEBUGMESSAGE(" Getting from front");
Id = translationTable.front();
translationTable.pop_front();
}else{
// get last elem first
// DEBUGMESSAGE(" Getting from back");
Id = translationTable.back();
translationTable.pop_back();
}
returnedResults++;
// DEBUGMESSAGE(" Getting tuple " << returnedResults << "/" << n);
finished = translationTable.empty();
return buffer.GetTuple( Id , false);
};
int n;
bool keepOrder;
bool finished;
protected:
long bufferSize;
long returnedResults;
TupleBuffer buffer;
deque<TupleId> translationTable;
};
// value mapping for stream(tuple(X)) x int [ x bool ]--> stream(tuple(X))
int StreamTailTupleTreamVM(Word* args, Word& result,
int message, Word& local, Supplier s)
{
TailLocalInfo* li;
Word InputStream = args[0];
CcInt* CcN = static_cast<CcInt*>(args[1].addr);
CcBool* CcKeepOrder = static_cast<CcBool*>(args[2].addr);
Word elem;
switch( message )
{
case OPEN:{
DEBUGMESSAGE("Start OPEN");
if( !CcN->IsDefined()
|| !CcKeepOrder->IsDefined()
|| !InputStream.addr
|| CcN->GetIntval() <= 0
)
{
local.setAddr(0);
DEBUGMESSAGE("End OPEN 1");
return 0;
} // else: consume the InputStream
li = new TailLocalInfo( CcN->GetIntval(),
CcKeepOrder->GetBoolval(),
s
);
local.setAddr(li);
// open and consume the input stream
qp->Open(InputStream.addr);
qp->Request(InputStream.addr, elem); // get first stream elem
while (qp->Received(args[0].addr))
{
Tuple *tuple = static_cast<Tuple*>(elem.addr);
li->AppendTuple(tuple);
qp->Request(InputStream.addr, elem); // get next stream elem
}
// InputStream will be closed when calling CLOSE
DEBUGMESSAGE("End OPEN 2");
return 0;
}
case REQUEST:{
DEBUGMESSAGE("Start REQUEST");
if(!local.addr){ DEBUGMESSAGE("End REQUEST: CANCEL1 "); return CANCEL; }
li = static_cast<TailLocalInfo*>(local.addr);
if(li->finished){ DEBUGMESSAGE("End REQUEST: CANCEL2 "); return CANCEL; }
Tuple *restuple = li->GetNextTuple();
if(!restuple){
DEBUGMESSAGE("End REQUEST: CANCEL3 "); return CANCEL;
}
// else {
// restuple->IncReference(); // reference for the stream
// }
result.setAddr( restuple );
DEBUGMESSAGE("End REQUEST: YIELD");
return YIELD;
}
case CLOSE:{
DEBUGMESSAGE("Start CLOSE");
qp->Close(InputStream.addr);
if(local.addr){
li = static_cast<TailLocalInfo*>(local.addr);
delete li;
local.setAddr(0);
}
DEBUGMESSAGE("End CLOSE");
return 0;
}
}
return 0;
}
// localinfo used within value mapping for
// stream T x int [ x bool ]--> stream T, T in DATA
// This is a specialization of TailLocalInfo.
class DataTailLocalInfo: public TailLocalInfo
{
public:
DataTailLocalInfo(const int mN,
const bool mKeepOrder,
const ListExpr elemNumType,
Supplier s)
: TailLocalInfo( mN, mKeepOrder, s )
{
ListExpr numericElemType = elemNumType;
ListExpr attrExpr =
nl->TwoElemList(nl->SymbolAtom("elem"),numericElemType);
ListExpr tupleExpr =
nl->TwoElemList(nl->SymbolAtom(Tuple::BasicType()),
nl->OneElemList(attrExpr));
bufferTupleType = new TupleType(tupleExpr);
};
~DataTailLocalInfo()
{
if(bufferTupleType)
bufferTupleType->DeleteIfAllowed();
};
// move elem into internal tuplebuffer
void AppendElem(Attribute *elem)
{
Tuple *tuple = new Tuple(bufferTupleType);
tuple->PutAttribute( 0, elem );
AppendTuple(tuple);
// AppendTuple(...) already calls tuple->DeleteIfAllowed()!
};
// return the next element from the local tuplebuffer
Attribute* GetNextElem(){
Tuple *tuple = GetNextTuple();
if(tuple){
Attribute *elem = (tuple->GetAttribute(0))->Copy();
tuple->DeleteIfAllowed();
return elem;
} // else: No elem left!
return static_cast<Attribute*>(0);
};
private:
TupleType *bufferTupleType;
};
// value mapping for stream T x int [ x bool ]--> stream T, T in DATA
int StreamTailDataStreamVM(Word* args, Word& result,
int message, Word& local, Supplier s)
{
DataTailLocalInfo* li;
Word InputStream = args[0];
CcInt* CcN = static_cast<CcInt*>(args[1].addr);
CcBool* CcKeepOrder = static_cast<CcBool*>(args[2].addr);
Word elem;
switch( message )
{
case OPEN:{
DEBUGMESSAGE("Start OPEN");
if( !CcN->IsDefined()
|| !CcKeepOrder->IsDefined()
|| !InputStream.addr
|| CcN->GetIntval() <= 0
)
{
local.setAddr(0);
DEBUGMESSAGE("End OPEN 1");
return 0;
} // else: consume the InputStream
ListExpr elemTypeNL = nl->Second(qp->GetNumType( s ));
li = new DataTailLocalInfo( CcN->GetIntval(),
CcKeepOrder->GetBoolval(),
elemTypeNL, s
);
local.setAddr(li);
// open and consume the input stream
qp->Open(InputStream.addr);
qp->Request(InputStream.addr, elem); // get first stream elem
while (qp->Received(args[0].addr))
{
Attribute *myObj = static_cast<Attribute*>(elem.addr);
li->AppendElem(myObj); // store the tuple in a tuplebuffer
qp->Request(InputStream.addr, elem); // get next stream elem
}
// InputStream will be closed when calling CLOSE
DEBUGMESSAGE("End OPEN 2");
return 0;
}
case REQUEST:{
DEBUGMESSAGE("Start REQUEST");
if(!local.addr){ DEBUGMESSAGE("End REQUEST: CANCEL1 "); return CANCEL; }
li = static_cast<DataTailLocalInfo*>(local.addr);
if(li->finished){ DEBUGMESSAGE("End REQUEST: CANCEL2 "); return CANCEL; }
result.setAddr(li->GetNextElem()); // extract the object
if(!result.addr){
DEBUGMESSAGE("End REQUEST: CANCEL3 ");
return CANCEL;
}
DEBUGMESSAGE("End REQUEST: YIELD");
return YIELD;
}
case CLOSE:{
DEBUGMESSAGE("Start CLOSE");
qp->Close(InputStream.addr);
if(local.addr){
li = static_cast<DataTailLocalInfo*>(local.addr);
delete li;
local.setAddr(0);
}
DEBUGMESSAGE("End CLOSE");
return 0;
}
}
return 0;
}
ValueMapping streamtailmap[] =
{ StreamTailTupleTreamVM,
StreamTailDataStreamVM
};
int streamTailSelect( ListExpr args )
{
NList type(args);
if( type.first().second().hasLength(2)
&& type.first().second().first() == Tuple::BasicType())
return 0;
return 1;
}
const string StreamSpecTail =
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( <text>stream(tuple(X)) x int [ x bool] -> stream(tuple(X))\n"
"stream(T)) x int [ x bool] -> stream(T), T in DATA</text--->"
"<text>_ tail[ n ], _ tail[ n, keepOrder ]</text--->"
"<text>Delivers only the last 'n' stream elements. Optional parameter "
"'keepOrder' controls the ordering of the result. If set to TRUE (default) "
"the original ordering is maintained. Otherwise, the tuples are returned "
"in reverse order.</text--->"
"<text>query ten feed head[6] tail[2] tconsume</text--->"
") )";
Operator streamtail( "tail",
StreamSpecTail,
2,
streamtailmap,
streamTailSelect,
streamTypeMapTail);
/*
6.6 Operator ~kinds~
6.6.1 Type Mapping
*/
ListExpr KindsTypeMap(const ListExpr args){
if(nl->ListLength(args)!=1){
ErrorReporter::ReportError("Wrong number of arguments ");
return nl->TypeError();
} else {
if(nl->IsEqual(nl->First(args),CcString::BasicType())){
return nl->TwoElemList(nl->SymbolAtom(Stream<CcString>::BasicType()),
nl->SymbolAtom(CcString::BasicType()));
} else {
ErrorReporter::ReportError("Wrong number of arguments ");
return nl->TypeError();
}
}
}
/*
6.6.2 Value Mapping
*/
class KindsLocalInfo{
public:
KindsLocalInfo(CcString* name):pos(0),kinds(){
if(!name) {
return;
}
if(!name->IsDefined()){
return;
}
string type = name->GetValue();
if(!SecondoSystem::GetCatalog()->IsTypeName(type)){
return;
}
int algId=0;
int typeId = 0;
SecondoSystem::GetCatalog()->GetTypeId(name->GetValue(),algId,typeId);
TypeConstructor* tc = am->GetTC(algId,typeId);
kinds = tc->GetKinds();
}
CcString* nextKind(){
if(pos>=kinds.size()){
return 0;
} else {
CcString* res = new CcString(true,kinds[pos]);
pos++;
return res;
}
}
private:
unsigned int pos;
vector<string> kinds;
};
int KindsVM(Word* args, Word& result,
int message, Word& local, Supplier s)
{
switch(message){
case OPEN: {
local.setAddr( new KindsLocalInfo(
static_cast<CcString*>(args[0].addr)));
return 0;
}
case REQUEST: {
KindsLocalInfo* li = static_cast<KindsLocalInfo*>(local.addr);
if(!li){
return CANCEL;
}else{
result.setAddr(li->nextKind());
return result.addr ? YIELD : CANCEL;
}
}
case CLOSE: {
KindsLocalInfo* li = static_cast<KindsLocalInfo*>(local.addr);
if(li){
delete li;
local.setAddr(0);
}
return 0;
}
default: return 0;
}
}
/*
6.6.3 Specification
*/
const string KindsSpec =
"(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )"
"( <text>string -> stream(string)</text--->"
"<text>_ kinds </text--->"
"<text>Produces a stream of strings for a given type</text--->"
"<text>query string kinds transformstream consume</text---> ))";
Operator kinds (
"kinds",
KindsSpec,
KindsVM,
Operator::SimpleSelect,
KindsTypeMap );
/*
6.7 Operator ~timeout~
This operator will terminate stream procesing, when it its result is requested
a specified time after opening it. Until then, is just returns the result of its
stream predecessor.
*/
/*
Type Mapping Fubnction:
---- stream(X) x real --> stream(X)
----
*/
ListExpr TimeoutTypeMap(const ListExpr args){
if(nl->ListLength(args)!=2){
return listutils::typeError("one argument expected");
}
ListExpr first = nl->First(args);
if(!listutils::isStream(first)){
return listutils::typeError("Expected stream as 1st argument.");
}
ListExpr second = nl->Second(args);
if( !CcReal::checkType(second)) {
return listutils::typeError("Expected real as 2nd argument.");
}
return nl->First(args);
}
struct TimeoutLocalInfo {
TimeoutLocalInfo(const double _seconds):
useProgress( false ),
seconds( _seconds),
elemcounter( 0 ),
finished( false ),
streamisopen( false )
{
initial = time( 0 ); // get current time
if(seconds < 0.0){
seconds = 0.0;
finished = true;
}
};
bool useProgress; // check during RequestProgress only
time_t initial; // the time, when the stopwatch started
double seconds; // the time difference for the timeout (in seconds)
long elemcounter; // number of already returned stream elements
bool finished; // true iff finished
bool streamisopen;
};
int TimeoutVM(Word* args, Word& result, int message, Word& local, Supplier s)
{
switch(message){
case OPEN: {
// set termination conditions
TimeoutLocalInfo* li = new TimeoutLocalInfo(
(static_cast<CcReal*>(args[1].addr))->GetRealval());
local.setAddr(li);
qp->Open(args[0].addr);
li->streamisopen = true;
return 0;
}
case REQUEST: {
TimeoutLocalInfo* li = static_cast<TimeoutLocalInfo*>(local.addr);
if(!li){
return CANCEL;
} else if( li->finished
|| ( !li->useProgress
&& (difftime(time(0),li->initial) >= li->seconds)
) ) {
li->finished = true;
return CANCEL;
} else {
qp->Request(args[0].addr,result);
if(result.addr == 0){
li->finished = true;
return CANCEL;
} else {
li->elemcounter++;
return YIELD;
}
}
}
case CLOSE: {
TimeoutLocalInfo* li = static_cast<TimeoutLocalInfo*>(local.addr);
if(li && li->streamisopen){
qp->Close(args[0].addr);
}
return 0;
}
case CLOSEPROGRESS: {
TimeoutLocalInfo* li = (TimeoutLocalInfo*) local.addr;
if ( li ) {
delete li;
local.setAddr(0);
}
return 0;
}
case REQUESTPROGRESS: {
TimeoutLocalInfo* li = (TimeoutLocalInfo*) local.addr;
if( !li ) {
return CANCEL;
}
ProgressInfo *pRes;
pRes = (ProgressInfo*) result.addr;
ProgressInfo p1;
double runtime = difftime(time(0),li->initial);
li->finished = ( runtime >= li->seconds );
if ( qp->RequestProgress(args[0].addr, &p1) ) {
pRes->Copy(p1);
double myprogress = runtime / li->seconds;
if(myprogress <= 0.0){
myprogress = 0.0000001; // avoid div/0
}
double mycard = li->elemcounter / myprogress;
if(mycard <= 1){
mycard = 1; // avoid div/0
}
pRes->Progress = min(max(p1.Progress, myprogress), pRes->BProgress);
pRes->Time = min( p1.Time, li->seconds*1000 );
if( p1.BTime > pRes->Time){
pRes->Time = max(pRes->Time, p1.BTime);
}
pRes->Card = min( p1.Card, mycard); //a number between 0 and 1
} else {
return CANCEL;
}
if( !li->useProgress || p1.sizesChanged ){
li->useProgress = true;
pRes->sizesChanged = true;
}
if(li->finished){
pRes->Progress = 1.0;
}
return YIELD;
}
default: {
return 0;
}
}
}
const string TimeoutSpec =
"(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )"
"( <text>stream(T) x real -> stream(T)</text--->"
"<text>_ timeout [ Seconds ]</text--->"
"<text>Stops stream processing after the specified time has passed. "
"Negative arguments are interpreted as 0.</text--->"
"<text>query intstream(1,9999999999) timeout[5.0] count</text---> ))";
Operator timeout (
"timeout",
TimeoutSpec,
TimeoutVM,
Operator::SimpleSelect,
TimeoutTypeMap );
/*
6.8 IsOrdered
Signature: stream(DATA) -> bool
Checks whether a stream is sorted.
*/
ListExpr IsOrderedTM(ListExpr args){
if(nl->ListLength(args)!=1){
return listutils::typeError("one argument expected");
}
ListExpr arg = nl->First(args);
if(!listutils::isDATAStream(arg)){
return listutils::typeError("stream of DATA expected");
}
return nl->SymbolAtom(CcBool::BasicType());
}
int IsOrderedVM(Word* args, Word& result,
int message, Word& local, Supplier s){
qp->Open(args[0].addr);
Word elem;
qp->Request(args[0].addr,elem);
Attribute* attr=0;
bool sorted=true;
while(qp->Received(args[0].addr) && sorted){
Attribute* next = static_cast<Attribute*>(elem.addr);
if(attr){
int cmp = attr->Compare(next);
if(cmp >0){
sorted = false;
}
attr->DeleteIfAllowed();
attr = next;
next = 0;
} else { // first element
attr = next;
}
if(sorted){
qp->Request(args[0].addr,elem);
}
}
if(attr){
attr->DeleteIfAllowed();
}
qp->Close(args[0].addr);
result = qp->ResultStorage(s);
CcBool* res = static_cast<CcBool*>(result.addr);
res->Set(true,sorted);
return 0;
}
const string IsOrderedSpec =
"(( \"Signature\" \"Syntax\" \"Meaning\" \"Remarks\" )"
"( <text>stream(DATA) -> bool</text--->"
"<text>_ isOrdered</text--->"
"<text>Checks whether the argument stream is sorted in ascending order"
"</text--->"
"<text>query intstream(10, 1000) isOrdered </text---> ))";
Operator isOrdered (
"isOrdered",
IsOrderedSpec,
IsOrderedVM,
Operator::SimpleSelect,
IsOrderedTM );
/*
6.9 Operator ~sbuffer~
This operator buffers an incoming stream of tuple or DATA.
*/
ListExpr sbufferTM(ListExpr args){
string err = "{stream(tuple(X)) , stream(DATA)} x int expected";
if(!nl->HasLength(args,2)){
return listutils::typeError("wrong number of args; " + err);
}
ListExpr arg = nl->First(args);
if( (Stream<Attribute>::checkType(arg) || Stream<Tuple>::checkType(arg))
&& CcInt::checkType(nl->Second(args)) ){
return arg;
}
return listutils::typeError(err);
}
template<class T>
class sbufferInfo{
public:
sbufferInfo(Word _stream, size_t _buffersize): stream(_stream),
buffersize(_buffersize){
stream.open();
output = false;
opos = 0;
eos = false;
}
~sbufferInfo(){
stream.close();
for(size_t i=0;i<buffer.size();i++){
if(buffer[i]){
buffer[i]->DeleteIfAllowed();
buffer[i] = 0;
}
}
}
T* next(){
if(buffersize==0){
return stream.request();
}
if(!output){
if(eos || !fillBuffer()){
return 0;
}
}
T* n = buffer[opos];
buffer[opos] = 0;
opos++;
if(opos==buffer.size()){
output= false;
}
return n;
}
private:
Stream<T> stream;
size_t buffersize;
vector<T*> buffer;
bool output;
size_t opos;
bool eos;
bool fillBuffer(){
T* t = stream.request();
if(!t){
return false;
}
buffer.clear();
opos = 0;
buffer.push_back(t);
while(buffer.size() < (size_t) buffersize && t){
t = stream.request();
if(t){
buffer.push_back(t);
}
}
if(t==0){
eos = true;
}
output = true;
return true;
}
};
template<class T>
int sbufferVMT(Word* args, Word& result,
int message, Word& local, Supplier s){
sbufferInfo<T>* li = (sbufferInfo<T>*) local.addr;
switch(message){
case OPEN: {
if(li) {
delete li;
}
CcInt* bs = (CcInt*) args[1].addr;
size_t size = 0;
if(bs->IsDefined() && bs->GetValue()>0){
size = bs->GetValue();
}
local.addr = new sbufferInfo<T>(args[0], size);
return 0;
}
case REQUEST:
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE:
if(li){
delete li;
local.addr = 0;
}
return 0;
}
return -1;
};
int sbufferSelect(ListExpr args){
return Attribute::checkType(nl->Second(nl->First(args)))?0:1;
}
ValueMapping sbufferVM[] = {
sbufferVMT<Attribute>,
sbufferVMT<Tuple>
};
OperatorSpec sbufferSpec(
"stream(X) -> stream(X)",
"_ sbuffer",
"Buffers a DATA or a tuple stream. ",
" query strassen feed sbuffer count" );
Operator sbufferOp(
"sbuffer",
sbufferSpec.getStr(),
2,
sbufferVM,
sbufferSelect,
sbufferTM
);
/*
16 Operators for merging two sorted attribute streams
*/
ListExpr mergeOpTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("expected two args");
}
if(!Stream<Attribute>::checkType(nl->First(args))
||!Stream<Attribute>::checkType(nl->Second(args))){
return listutils::typeError("two sorted DATA streams expected");
}
if(!nl->Equal(nl->First(args),nl->Second(args))){
return listutils::typeError("found different stream types");
}
return nl->First(args);
}
enum mergeOP{SEC,DIFF,UNION,MERGE};
template<mergeOP op>
class mergeOpInfo{
public:
mergeOpInfo(Word _stream1, Word _stream2):
stream1(_stream1), stream2(_stream2){
stream1.open();
stream2.open();
a1 = 0;
a2 = 0;
first = true;
}
~mergeOpInfo(){
stream1.close();
stream2.close();
if(a1){
a1->DeleteIfAllowed();
}
if(a2){
a2->DeleteIfAllowed();
}
}
Attribute* next(){
if(first){
first = false;
a1 = stream1.request();
a2 = stream2.request();
}
switch(op){
case SEC: return getNextSec();
case DIFF: return getNextDiff();
case UNION: return getNextUnion();
case MERGE: return getNextMerge();
default: return 0;
}
}
private:
Stream<Attribute> stream1;
Stream<Attribute> stream2;
bool first;
Attribute* a1, *a2;
Attribute* getNextSec(){
while(a1 && a2){
int cmp = a1->Compare(a2);
if(cmp<0){
a1->DeleteIfAllowed();
a1 = stream1.request();
} else if(cmp>0){
a2->DeleteIfAllowed();
a2 = stream2.request();
} else { // equal found
Attribute* res = a1;
a1 = stream1.request();
a2->DeleteIfAllowed();
a2 = stream2.request();
return res;
}
}
return 0;
}
Attribute* getNextDiff(){
while(a1){
if(!a2){
Attribute* res = a1;
a1 = stream1.request();
return res;
}
int cmp = a1->Compare(a2);
if(cmp<0){
Attribute* res = a1;
a1 = stream1.request();
return res;
} else if(cmp>0){
a2->DeleteIfAllowed();
a2=stream2.request();
} else {
a1->DeleteIfAllowed();
a1 = stream1.request();
}
}
return 0;
}
Attribute* getNextMerge(){
while(a1 || a2){
if(!a1){
Attribute* res = a2;
a2 = stream2.request();
return res;
}
if(!a2){
Attribute* res = a1;
a1 = stream1.request();
return res;
}
int cmp = a1->Compare(a2);
if(cmp<=0){
Attribute* res = a1;
a1 = stream1.request();
return res;
} else {
Attribute* res = a2;
a2 = stream2.request();
return res;
}
}
return 0;
}
Attribute* getNextUnion(){
while(a1 || a2){
if(!a1){
Attribute* res = a2;
a2 = stream2.request();
return res;
}
if(!a2){
Attribute* res = a1;
a1 = stream1.request();
return res;
}
int cmp = a1->Compare(a2);
if(cmp<0){
Attribute* res = a1;
a1 = stream1.request();
return res;
} else if(cmp>0) {
Attribute* res = a2;
a2 = stream2.request();
return res;
} else {
Attribute* res = a1;
a1 = stream1.request();
a2->DeleteIfAllowed();
a2 = stream2.request();
return res;
}
}
return 0;
}
};
template<mergeOP op>
int mergeOpVMT(Word* args, Word& result,
int message, Word& local, Supplier s){
mergeOpInfo<op>* li = (mergeOpInfo<op>*) local.addr;
switch(message){
case OPEN:
if(li){
delete li;
}
local.addr = new mergeOpInfo<op>(args[0],args[1]);
return 0;
case REQUEST:
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE:
if(li){
delete li;
local.addr = 0;
}
return 0;
}
return -1;
}
OperatorSpec mergeDiffSpec(
" stream(D) x stream(D) -> stream(D), w in DATA",
"_ _ mergediff",
"Compuetes the difference between two DATA streams",
"query intstream(1,10) intstream(3,7) mergediff count"
);
OperatorSpec mergeSecSpec(
" stream(D) x stream(D) -> stream(D), w in DATA",
"_ _ mergesec",
"Compuetes the intersection between two DATA streams",
"query intstream(1,10) intstream(3,7) mergesec count"
);
OperatorSpec mergeUnionSpec(
" stream(D) x stream(D) -> stream(D), w in DATA",
"_ _ mergeunion",
"Computes the union between two DATA streams",
"query intstream(1,10) intstream(3,7) mergeunion count"
);
OperatorSpec mergeSpec(
" stream(D) x stream(D) -> stream(D), w in DATA",
"_ _ merge",
"merges two sorted DATA streams into a single ine",
"query intstream(1,10) intstream(3,7) merge count"
);
Operator mergediffOp(
"mergediff",
mergeDiffSpec.getStr(),
mergeOpVMT<DIFF>,
Operator::SimpleSelect,
mergeOpTM
);
Operator mergesecOp(
"mergesec",
mergeSecSpec.getStr(),
mergeOpVMT<SEC>,
Operator::SimpleSelect,
mergeOpTM
);
Operator mergeunionOp(
"mergeunion",
mergeUnionSpec.getStr(),
mergeOpVMT<UNION>,
Operator::SimpleSelect,
mergeOpTM
);
Operator mergeOp(
"mergeattrstreams",
mergeSpec.getStr(),
mergeOpVMT<MERGE>,
Operator::SimpleSelect,
mergeOpTM
);
/*
6.15 Operator rdup
*/
ListExpr rdupTM(ListExpr args){
if(!nl->HasLength(args,1)){
return listutils::typeError("expected one arg");
}
if(!Stream<Attribute>::checkType(nl->First(args))){
return listutils::typeError("expected stream(DATA)");
}
return nl->First(args);
}
class rdupInfo{
public:
rdupInfo(Word _stream): stream(_stream),last(0),first(true){
stream.open();
}
~rdupInfo(){
if(last){
last->DeleteIfAllowed();
}
stream.close();
}
Attribute* next(){
if(first){
first = false;
last = stream.request();
if(!last) return 0;
return last->Copy();
}
if(!last){
return 0;
}
Attribute* current;
while( (current=stream.request())){
int cmp = last->Compare(current);
if(cmp==0){
current->DeleteIfAllowed();
} else {
last->DeleteIfAllowed();
last = current;
return last->Copy();
}
}
last->DeleteIfAllowed();
last = 0;
return 0;
}
private:
Stream<Attribute> stream;
Attribute* last;
bool first;
};
int rdupVM(Word* args, Word& result,
int message, Word& local, Supplier s){
rdupInfo* li = (rdupInfo*) local.addr;
switch(message){
case OPEN: if(li) delete li;
local.addr = new rdupInfo(args[0]);
return 0;
case REQUEST:
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE:
if(li){
delete li;
local.addr = 0;
}
}
return -1;
}
OperatorSpec rdupSpec(
"stream(D) -> stream(D) , D in DATA",
"_ rdup",
"removes duplicates from a sorted attribute stream",
"query intstream(1,10) intstream(1,10) merge rdup count"
);
Operator rdupOp(
"rdup",
rdupSpec.getStr(),
rdupVM,
Operator::SimpleSelect,
rdupTM
);
/*
Operator xth
*/
ListExpr xthTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("two args expected");
}
if(!Stream<Attribute>::checkType(nl->First(args))){
return listutils::typeError("first arg mut be astream of DATA");
}
if(!CcInt::checkType(nl->Second(args))){
return listutils::typeError("second arg must be an int");
}
return nl->Second(nl->First(args));
}
int xthVM(Word* args, Word& result,
int message, Word& local, Supplier s){
result = qp->ResultStorage(s);
Attribute* res = (Attribute*) result.addr;
CcInt* x = (CcInt*) args[1].addr;
if(!x->IsDefined()){
res->SetDefined(false);
return 0;
}
int a = x->GetValue();
if(a<1){
res->SetDefined(false);
return 0;
}
Stream<Attribute> stream(args[0]);
stream.open();
int i=1;
Attribute* r;
while( (r=stream.request())){
if(i==a){
res->CopyFrom(r);
r->DeleteIfAllowed();
stream.close();
return 0;
}
r->DeleteIfAllowed();
i++;
}
stream.close();
res->SetDefined(false);
return 0;
}
OperatorSpec xthSpec(
"stream(D) x int -> D, D in DATA",
" _ xth[_] ",
"Extract the x-th attribute form a stream. "
"If this attribute does not exist, the result is undefined",
" query intstream(1,10) xth[6] "
);
Operator xthOp(
"xth",
xthSpec.getStr(),
xthVM,
Operator::SimpleSelect,
xthTM
);
ListExpr minmaxTM(ListExpr args){
if(!nl->HasLength(args,1)){
return listutils::typeError("one arg expected");
}
if(!Stream<Attribute>::checkType(nl->First(args))){
return listutils::typeError("stream(DATA) expected");
}
return nl->Second(nl->First(args));
}
template<bool isMin>
int minmaxVM(Word* args, Word& result,
int message, Word& local, Supplier s){
result = qp->ResultStorage(s);
Attribute* res = (Attribute*) result.addr;
Attribute* value = 0;
Stream<Attribute> stream(args[0]);
stream.open();
Attribute* v;
while( (v=stream.request())){
if(!value){
value = v;
} else {
int cmp = value->Compare(v);
if(!isMin){
cmp = -cmp;
}
if(cmp>0){
value->DeleteIfAllowed();
value = v;
} else {
v->DeleteIfAllowed();
}
}
}
if(value){
res->CopyFrom(value);
value->DeleteIfAllowed();
} else {
res->SetDefined(false);
}
return 0;
}
OperatorSpec minattrSpec(
"stream(T) -> T, T in DATA",
" _ minattr",
"Retrieves the minimum value within a stream of attributes",
"query intstream(1,10) minattr"
);
OperatorSpec maxattrSpec(
"stream(T) -> T, T in DATA",
" _ maxattr",
"Retrieves the maximum value within a stream of attributes",
"query intstream(1,10) maxattr"
);
Operator minattrOp(
"minattr",
minattrSpec.getStr(),
minmaxVM<true>,
Operator::SimpleSelect,
minmaxTM
);
Operator maxattrOp(
"maxattr",
maxattrSpec.getStr(),
minmaxVM<false>,
Operator::SimpleSelect,
minmaxTM
);
/*
6.19 Operator ~nth~
*/
ListExpr nthTM(ListExpr args){
if(!nl->HasLength(args,3)){
return listutils::typeError("three args expected");
}
if(!Stream<Attribute>::checkType(nl->First(args))){
return listutils::typeError("first arg must be stream of DATA");
}
if(!CcInt::checkType(nl->Second(args))){
return listutils::typeError("second arg must be an int");
}
if(!CcBool::checkType(nl->Third(args))){
return listutils::typeError("third arg must be a bool");
}
return nl->First(args);
}
class nthInfo{
public:
nthInfo(Word _stream, int _n, bool _fixed) :
stream(_stream), n(_n), fixed(_fixed) {
stream.open();
srand(time(0));
}
~nthInfo(){
stream.close();
}
Attribute* next(){
int v = n-1;
if(!fixed){
v = rand()%n;
}
Attribute* res = 0;
for( int i=0;i<n; i++){
Attribute* cand = stream.request();
if(!cand){
if(res){
res->DeleteIfAllowed();
}
return 0;
}
if( i == v ){
res = cand;
} else {
cand->DeleteIfAllowed();
}
}
return res;
}
int getN() const{
return n;
}
private:
Stream<Attribute> stream;
int n;
bool fixed;
};
int nthVM(Word* args, Word& result,
int message, Word& local, Supplier s){
nthInfo* li = (nthInfo*) local.addr;
switch(message){
case OPEN: {
if(li){
delete li;
local.addr = 0;
}
CcInt* n = (CcInt*) args[1].addr;
CcBool* fixed = (CcBool*) args[2].addr;
if(!n->IsDefined() || !fixed->IsDefined()){
return 0;
}
int v = n->GetValue();
if(v<1){
return 0;
}
local.addr = new nthInfo(args[0],v, fixed->GetValue());
return 0;
}
case REQUEST: {
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
}
case CLOSE : {
return 0;
}
case REQUESTPROGRESS: {
ProgressInfo p1;
ProgressInfo* pRes;
pRes = (ProgressInfo*) result.addr;
if (qp-> RequestProgress(args[0].addr, &p1) ) {
pRes->Copy(p1);
int n = li?li->getN():1;
pRes->Card = p1.Card/n;
return YIELD;
} else {
return CANCEL;
}
}
case CLOSEPROGRESS:
if(li){
delete li;
local.addr = 0;
}
return 0;
}
return -1;
}
OperatorSpec nthSpec(
"stream(D) x int x bool -> stream(D), D in DATA",
"_ nth[_,_]",
"Extracts each nth element from a attribute stream if "
"the boolean value is TRUE. In the false case from a "
"block of n attributes, randomly one is chosen.",
"query intstream(1,10) nth[2,TRUE] count"
);
Operator nthOp(
"nth",
nthSpec.getStr(),
nthVM,
Operator::SimpleSelect,
nthTM
);
/*
8.12 Operators sum and avg
*/
ListExpr sumTM(ListExpr args){
if(!nl->HasLength(args,1)){
return listutils::typeError("1 arg expected");
}
ListExpr a = nl->First(args);
if( !Stream<CcInt>::checkType(a)
&& !Stream<LongInt>::checkType(a)
&& !Stream<CcReal>::checkType(a)){
return listutils::typeError("stream(T) expected, T in {int,longint,real}");
}
return nl->Second(a);
}
ListExpr avgTM(ListExpr args){
if(!nl->HasLength(args,1)){
return listutils::typeError("1 arg expected");
}
ListExpr a = nl->First(args);
if( !Stream<CcInt>::checkType(a)
&& !Stream<LongInt>::checkType(a)
&& !Stream<CcReal>::checkType(a)){
return listutils::typeError("stream(T) expected, T in {int,longint,real}");
}
return listutils::basicSymbol<CcReal>();
}
template<class C, bool avg>
int sumavgVMT(Word* args, Word& result,
int message, Word& local, Supplier s){
result = qp->ResultStorage(s);
typename C::ctype sum = 0;
Stream<C> stream(args[0]);
C* v;
stream.open();
errno=0;
int count=0;
while( (v=stream.request())){
if(v->IsDefined()){
typename C::ctype val = v->GetValue();
sum = sum + val;
if(errno!=0){ // overflow detected
((Attribute*) result.addr)->SetDefined(false);
stream.close();
return 0;
}
count++;
} // ignore undefined values ??
v->DeleteIfAllowed();
}
stream.close();
if(avg){
if(count>0){
((CcReal*)result.addr)->Set(true, (double)sum/count);
}else {
((CcReal*)result.addr)->Set(false,0);
}
} else {
((C*)result.addr)->Set(true,sum);
}
return 0;
}
ValueMapping avgVM[] = {
sumavgVMT<CcInt,true>,
sumavgVMT<CcReal,true>,
sumavgVMT<LongInt,true>
};
ValueMapping sumVM[] = {
sumavgVMT<CcInt,false>,
sumavgVMT<CcReal,false>,
sumavgVMT<LongInt,false>
};
OperatorSpec avgSpec(
"stream(T) -> T, T in {int,real,longing}",
" _ avgattr",
"computes the average of a numeric attribute stream "
"ignoring undefined values.",
"query intstream(1,10) avgattr"
);
OperatorSpec sumSpec(
"stream(T) -> T, T in {int,real,longing}",
" _ sumattr",
"computes the sum of a numeric attribute stream "
"ignoring undefined values.",
"query intstream(1,10) sumattr"
);
int avgsumSelect(ListExpr args){
ListExpr a = nl->Second(nl->First(args));
if(CcInt::checkType(a)) return 0;
if(CcReal::checkType(a)) return 1;
if(LongInt::checkType(a)) return 2;
return -1;
}
Operator avgattrOp(
"avgattr",
avgSpec.getStr(),
3,
avgVM,
avgsumSelect,
avgTM
);
Operator sumattrOp(
"sumattr",
sumSpec.getStr(),
3,
sumVM,
avgsumSelect,
sumTM
);
/*
7.21 ~consume~ for attribute streams
*/
ListExpr consumeTM(ListExpr args){
if(!nl->HasLength(args,1)){
return listutils::typeError("one arg expected");
}
ListExpr a = nl->First(args);
if(!Stream<Attribute>::checkType(a)){
return listutils::typeError("expected attribute stream");
}
ListExpr at = nl->Second(a);
if(listutils::isSymbol(at,"arel")){
return listutils::typeError("attribute relations are not supported");
}
ListExpr attrList = nl->OneElemList(
nl->TwoElemList( nl->SymbolAtom("Elem"), at));
ListExpr res = nl->TwoElemList(
listutils::basicSymbol<Relation>(),
nl->TwoElemList(
listutils::basicSymbol<Tuple>(),
attrList));
return res;
}
int consumeVM(Word* args, Word& result,
int message, Word& local, Supplier s){
result = qp->ResultStorage(s);
GenericRelation* res = (GenericRelation*) result.addr;
Stream<Attribute> stream(args[0]);
ListExpr tupleType = nl->Second(qp->GetNumType(s));
TupleType* tt = new TupleType(tupleType);
Attribute* a;
stream.open();
while((a=stream.request())){
Tuple* tuple = new Tuple(tt);
tuple->PutAttribute(0,a);
res->AppendTuple(tuple);
tuple->DeleteIfAllowed();
}
stream.close();
tt->DeleteIfAllowed();
return 0;
}
OperatorSpec consumeSpec(
"stream(D) -> rel(tuple((Elem D))), D in DATA",
" _ consume",
"Collects an attribute stream into a relation.",
"query intstream(1,10) consume"
);
Operator consumeOp(
"consume",
consumeSpec.getStr(),
consumeVM,
Operator::SimpleSelect,
consumeTM
);
/*
9.17 operator ~ts~
*/
ListExpr tsTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("expected a stream and a list of funs");
}
if(!Stream<Attribute>::checkType(nl->First(args))){
return listutils::typeError("first arg is not an attribute stream");
}
ListExpr a = nl->Second(nl->First(args)); // type within stream
ListExpr funlist = nl->Second(args);
if(nl->AtomType(funlist)!=NoAtom){
return listutils::typeError("second arg is not a list");
}
ListExpr attrList= nl->TheEmptyList();
ListExpr last = nl->TheEmptyList();
bool first = true;
while(!nl->IsEmpty(funlist)){
ListExpr fun = nl->First(funlist);
funlist = nl->Rest(funlist);
if(!nl->HasLength(fun,2)){
return listutils::typeError("2nd arg contains an element which "
"is not a named function");
}
if(nl->AtomType(nl->First(fun))!=SymbolType){
return listutils::typeError("found invalid attribute name");
}
ListExpr an = nl->First(fun);
string n = nl->SymbolValue(an);
ListExpr map = nl->Second(fun);
if(!listutils::isMap<1>(map)){
return listutils::typeError("invalid function definition for "+n);
}
if(!nl->Equal(a, nl->Second(map))){
return listutils::typeError("function argument for " + n
+ " differs to the stream element");
}
ListExpr funRes = nl->Third(map);
if(!Attribute::checkType(funRes)){
return listutils::typeError("function result of " + n
+ " not in kind DATA");
}
ListExpr attr = nl->TwoElemList(an, funRes);
if(first){
attrList = nl->OneElemList(attr);
last = attrList;
first = false;
} else {
last = nl->Append(last, attr);
}
}
if(!listutils::isAttrList(attrList)){
return listutils::typeError("name conflicts or invalid names "
"in attributes");
}
return nl->TwoElemList( listutils::basicSymbol<Stream<Tuple> >(),
nl->TwoElemList(
listutils::basicSymbol<Tuple>(),
attrList));
}
class tsInfo{
public:
tsInfo(Word* args, ListExpr resTuple):
stream(args[0]){
stream.open();
tt = new TupleType(resTuple);
supplier = args[1].addr;
nofuns = qp->GetNoSons(supplier);
for(int i=0; i < nofuns;i++){
Supplier supplier2 = qp->GetSupplier(supplier, i);
Supplier supplier3 = qp->GetSupplier(supplier2, 1);
ArgVectorPointer funargs1 = qp->Argument(supplier3);
funs.push_back(supplier3);
funargs.push_back(funargs1);
}
}
~tsInfo(){
stream.close();
tt->DeleteIfAllowed();
}
Tuple* next(){
Attribute* arg = stream.request();
if(!arg){
return 0;
}
Tuple* tuple = new Tuple(tt);
Word value;
for(size_t i=0;i<funs.size();i++){
(*(funargs[i]))[0].setAddr(arg);
qp->Request(funs[i],value);
tuple->PutAttribute(i,((Attribute*)value.addr)->Clone());
}
arg->DeleteIfAllowed();
return tuple;
}
private:
Stream<Attribute> stream;
TupleType* tt;
Supplier supplier;
int nofuns;
vector<Supplier> funs;
vector<ArgVectorPointer> funargs;
};
int tsVM(Word* args, Word& result,
int message, Word& local, Supplier s){
tsInfo* li = (tsInfo*) local.addr;
switch(message){
case OPEN :
if(li) delete li;
local.addr = new tsInfo(args, nl->Second(GetTupleResultType(s)));
return 0;
case REQUEST:
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE:
if(li){
delete li;
local.addr = 0;
}
return 0;
}
return -1;
}
OperatorSpec tsSpec(
"stream(T) x funlist(T->U_i) -> stream(tuple(Ui)) ",
"_ ts[_,_] ",
"Creates a tuple stream from a stream of attributes "
" using a list of named functions",
"query intstream(1,10) ts[N1 : . + 1, N2 : . - 1] consume"
);
Operator tsOp(
"ts",
tsSpec.getStr(),
tsVM,
Operator::SimpleSelect,
tsTM
);
/*
7.19 Operator ~as~
*/
ListExpr asTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("stream(tuple) x fun(tuple -> attr) expected");
}
if(!Stream<Tuple>::checkType(nl->First(args))){
return listutils::typeError("first arg is not a tuple stream");
}
ListExpr fun = nl->Second(args);
if(!listutils::isMap<1>(fun)){
return listutils::typeError("second arg is not an unary function");
}
ListExpr tt = nl->Second(nl->First(args));
if(!nl->Equal(tt, nl->Second(fun))){
return listutils::typeError("function argument and tuple in stream "
"differ");
}
ListExpr funres = nl->Third(fun);
if(!Attribute::checkType(funres)){
return listutils::typeError("funres not in kind DATA");
}
return nl->TwoElemList(
listutils::basicSymbol<Stream<Attribute> >(),
funres);
}
class asInfo{
public:
asInfo(Word _stream, Word _fun):
stream(_stream){
stream.open();
fun = _fun.addr;
funargs = qp->Argument(fun);
}
~asInfo(){
stream.close();
}
Attribute* next(){
Tuple* tuple = stream.request();
if(!tuple){
return 0;
}
Word value;
(*funargs)[0].setAddr(tuple);
qp->Request(fun,value);
Attribute* res = ((Attribute*)value.addr)->Clone();
tuple->DeleteIfAllowed();
return res;
}
private:
Stream<Tuple> stream;
Supplier fun;
ArgVectorPointer funargs;
};
int asVM(Word* args, Word& result,
int message, Word& local, Supplier s){
asInfo* li = (asInfo*) local.addr;
switch(message){
case OPEN :
if(li) delete li;
local.addr = new asInfo(args[0], args[1]);
return 0;
case REQUEST:
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE:
if(li){
delete li;
local.addr = 0;
}
return 0;
}
return -1;
}
OperatorSpec asSpec(
"stream(tuple) x fun(tuple->attr) -> stream(attr)",
"_ as [_] ",
"Converts a tuple stream into an attribute stream "
"using a function.",
"query ten feed as[. + 3] count"
);
Operator asOp(
"as",
asSpec.getStr(),
asVM,
Operator::SimpleSelect,
asTM
);
/*
Operator ~streamfun~
*/
ListExpr streamfunTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("two args expected");
}
if(!(Stream<Attribute>::checkType(nl->First(args))
|| Stream<Tuple>::checkType(nl->First(args)))){
return listutils::typeError("first arg must be a stream "
"of DATA or a stream of tuple");
}
if(!listutils::isMap<1>(nl->Second(args))){
return listutils::typeError("second arg is not an unary function");
}
if(!nl->Equal(nl->Second(nl->First(args)), nl->Second(nl->Second(args)))){
return listutils::typeError("function argument and stream "
"elem type differ");
}
ListExpr funres = nl->Third(nl->Second(args));
if(nl->HasLength(funres,2)
&& listutils::isSymbol(nl->First(funres), "stream")){
return listutils::typeError("function result cannot be a stream");
}
return nl->First(args);
}
template<class T>
class streamfunInfo{
public:
streamfunInfo( Word& _stream, Word& _fun,
int _f = 1):
stream(_stream), fun(_fun.addr){
funarg = qp->Argument(fun);
stream.open();
count = 0;
f = _f;
}
~streamfunInfo(){
stream.close();
}
T* next(){
T* in = stream.request();
if(!in) return 0;
count++;
if(count % f == 0){
(*funarg)[0] = in;
qp->Request(fun,funres);
count = 0;
}
return in;
}
private:
Stream<T> stream;
Supplier fun;
ArgVectorPointer funarg;
Word funres;
int f;
int count;
};
template<class T>
int streamfunVMT(Word* args, Word& result,
int message, Word& local, Supplier s){
streamfunInfo<T>* li = (streamfunInfo<T>*) local.addr;
switch(message){
case OPEN : {
if(li) delete li;
local.addr = new streamfunInfo<T>(args[0],args[1]);
return 0;
}
case REQUEST:
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE:{
if(li){
delete li;
local.addr = 0;
}
return 0;
}
}
return -1;
}
int streamfunSelect(ListExpr args){
return Stream<Attribute>::checkType(nl->First(args))?0:1;
}
ValueMapping streamfunVM[] = {
streamfunVMT<Attribute>,
streamfunVMT<Tuple>
};
OperatorSpec streamfunSpec(
"stream(T) x fun(T)->.. -> stream(T), T in {DATA,tuple}",
"_ streamfun[_]",
"Performs a function for each element in a stream. "
" The result is ignored, thus this operator is only useful "
"for functions with side effects.",
"query intstream(1,10) streamfun( TRUE echo[.] ) count"
);
Operator streamfunOp(
"streamfun",
streamfunSpec.getStr(),
2,
streamfunVM,
streamfunSelect,
streamfunTM
);
/*
Operator prog
calls a function each xth stream element
*/
ListExpr progTM(ListExpr args){
if(!nl->HasLength(args,3)){
return listutils::typeError("two args expected");
}
if(!(Stream<Attribute>::checkType(nl->First(args))
|| Stream<Tuple>::checkType(nl->First(args)))){
return listutils::typeError("first arg must be a stream "
"of DATA or a stream of tuple");
}
if(!listutils::isMap<1>(nl->Second(args))){
return listutils::typeError("second arg is not an unary function");
}
if(!nl->Equal(nl->Second(nl->First(args)), nl->Second(nl->Second(args)))){
return listutils::typeError("function argument and stream "
"elem type differ");
}
ListExpr funres = nl->Third(nl->Second(args));
if(nl->HasLength(funres,2)
&& listutils::isSymbol(nl->First(funres), "stream")){
return listutils::typeError("function result cannot be a stream");
}
if(!CcInt::checkType(nl->Third(args))){
return listutils::typeError("third argument has to be of type int");
}
return nl->First(args);
}
template<class T>
int progVMT(Word* args, Word& result,
int message, Word& local, Supplier s){
streamfunInfo<T>* li = (streamfunInfo<T>*) local.addr;
switch(message){
case OPEN : {
if(li) delete li;
int f =1;
CcInt* F = (CcInt*) args[2].addr;
if(F->IsDefined() ){
f = std::max(f,F->GetValue());
}
local.addr = new streamfunInfo<T>(args[0],args[1],f);
return 0;
}
case REQUEST:
result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE:{
if(li){
delete li;
local.addr = 0;
}
return 0;
}
}
return -1;
}
int progSelect(ListExpr args){
return Stream<Attribute>::checkType(nl->First(args))?0:1;
}
ValueMapping progVM[] = {
progVMT<Attribute>,
progVMT<Tuple>
};
OperatorSpec progSpec(
"stream(T) x fun(T)->.. x int -> stream(T), T in {DATA,tuple}",
"_ streamfun[_]",
"Performs a function for each xth element in a stream. "
"x is defined by the last argument."
" The result is ignored. ",
"query intstream(1,10) prog( TRUE echo[.] , 3 ) count"
);
Operator progOp(
"prog",
progSpec.getStr(),
2,
progVM,
progSelect,
progTM
);
/*
1.23 Operator ~delayS~
*/
ListExpr delaySTM(ListExpr args){
if(!nl->HasLength(args,2) && !nl->HasLength(args,3)){
return listutils::typeError("2 or 3 arguments required");
}
if(!listutils::isStream(nl->First(args))){
return listutils::typeError("first argument has to be a stream");
}
if(!CcInt::checkType(nl->Second(args))){
return listutils::typeError("second argument is not an int");
}
if(nl->HasLength(args,3) && !CcInt::checkType(nl->Third(args))){
return listutils::typeError("third argument is not an int");
}
return nl->First(args);
}
int delaySVM(Word* args, Word& result,
int message, Word& local, Supplier s){
std::pair<size_t,size_t>* li = (std::pair<size_t,size_t>*) local.addr;
switch(message){
case OPEN:
{ qp->Open(args[0].addr);
if(li){
delete li;
local.addr = 0;
}
CcInt* T1 = (CcInt*) args[1].addr;
if(!T1->IsDefined()) return 0;
int t1 = T1->GetValue();
if(t1<0) return 0;
int t2 = t1;
if(qp->GetNoSons(s)==3){
T1 = (CcInt*) args[2].addr;
if(!T1->IsDefined()){
return 0;
}
t2 = T1->GetValue();
}
if(t1>t2){
return 0;
}
local.addr = new pair<size_t,size_t>( ((size_t)t1),
((size_t)t2));
return 0;
}
case REQUEST:
qp->Request(args[0].addr, result);
if(!qp->Received(args[0].addr)) return CANCEL;
if(li){
size_t t;
if(li->first==li->second){
t = li->first;
} else {
t = rand() % (li->second - li->first) + li->first;
}
usleep(((size_t)t)*1000u);
}
return YIELD;
case CLOSE : if(li){
delete li;
local.addr = 0;
}
return 0;
}
return -1;
}
OperatorSpec delaySSpec(
"stream(X) x int [x int] -> stream(X)",
"_ delayS[_,_]",
"Delays the transfer of each element in the stream by a "
"certain duration (in millisecond). If the oprional "
"argument is present, the delay will be a random number "
"between the first and the second number.",
"query plz feed delayS[100,500] printstream count;"
);
Operator delaySOp(
"delayS",
delaySSpec.getStr(),
delaySVM,
Operator::SimpleSelect,
delaySTM
);
/*
1.39 sync
*/
ListExpr syncTM(ListExpr args){
if(!nl->HasLength(args,4)){
return listutils::typeError("4 arguments expected");
}
ListExpr stream = nl->First(args);
if(!listutils::isStream(stream)){
return listutils::typeError("stream required as the first argument");
}
ListExpr fun = nl->Second(args);
if(!listutils::isMap<1>(fun)){
return listutils::typeError("second arg is not an unary function");
}
if(!nl->Equal(nl->Second(stream), nl->Second(fun))){
return listutils::typeError("type of stream and type of function "
"argument differ");
}
if(listutils::isStream(nl->Third(fun))){
return listutils::typeError("function result cannot be a stream.");
}
if(!CcInt::checkType(nl->Third(args))){
return listutils::typeError("third arg must be an int");
}
if(!CcReal::checkType(nl->Fourth(args))){
return listutils::typeError("fourth arg must be a real");
}
return stream;
}
class syncInfo{
public:
syncInfo(Word& _stream, Supplier _fun, int _minTuples, int _minTime):
stream(_stream), fun(_fun), minElems(_minTuples){
SmiEnvironment::CommitTransaction(); //close current transaction
qp->Open(stream.addr);
av = qp->Argument(fun);
elems = 0;
lastTime = getTime();
minTime = _minTime>0?_minTime:0;
}
~syncInfo(){
qp->Close(stream.addr);
SmiEnvironment::BeginTransaction();
}
void* next(){
qp->Request(stream.addr,result);
if(!qp->Received(stream.addr)){
return 0;
}
elems++;
check(result);
return result.addr;
}
private:
Word stream;
Supplier fun;
int minElems;
size_t minTime;
ArgVectorPointer av;
int elems;
struct timeval tp;
size_t lastTime;
Word result;
Word funres;
size_t getTime(){
gettimeofday(&tp, NULL);
return (size_t)tp.tv_sec*1000+tp.tv_usec/1000;
}
void check(Word& w){
if(elems >= minElems){
elems = 0;
size_t t2 = getTime();
size_t k = t2 - lastTime;
if(k >= minTime){
lastTime = t2;
(*av)[0] = w;
SmiEnvironment::BeginTransaction();
qp->Request(fun,funres);
SmiEnvironment::CommitTransaction();
}
}
}
void newTransaction(){
SmiEnvironment::CommitTransaction();
SmiEnvironment::BeginTransaction();
}
};
int syncVM(Word* args, Word& result,
int message, Word& local, Supplier s){
syncInfo* li = (syncInfo*) local.addr;
switch(message){
case OPEN: {
if(li){
delete li;
local.addr = 0;
}
CcInt* minTu = (CcInt*) args[2].addr;
if(!minTu->IsDefined()){
return 0;
}
CcReal* minTime = (CcReal*) args[3].addr;
if(!minTime->IsDefined()){
return 0;
}
local.addr = new syncInfo(args[0], args[1].addr,
minTu->GetValue(),
(int)(minTime->GetValue()*1000));
return 0;
}
case REQUEST: result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE: if(li){
delete li;
local.addr = 0;
}
return 0;
}
return -1;
}
OperatorSpec syncSpec(
"stream x fun x int x real -> stream",
"_ sync[_,_,_] ",
"After minElements in stream (3rd arg) and "
" after minimum t seconds (4th arg), this "
" operator evaluates it's parameter function (2nd arg),"
" commits the running transaction and starts a new one",
" query plz feed sync[ plz feed count, 100, 0.001] count"
);
Operator syncOp(
"sync",
syncSpec.getStr(),
syncVM,
Operator::SimpleSelect,
syncTM
);
ListExpr printStreamMessagesTM(ListExpr args){
if(!nl->HasLength(args,1)){
return listutils::typeError("one argument expected");
}
if(!listutils::isStream(nl->First(args))){
return listutils::typeError("stream expected");
}
return nl->First(args);
}
int printStreamMessagesVM(Word* args, Word& result,
int message, Word& local, Supplier s){
switch(message){
case OPEN : cout << "OPEN" << endl;
qp->Open(args[0].addr);
return 0;
case REQUEST : cout << "REQUEST" << endl;
qp->Request(args[0].addr, result);
return qp->Received(args[0].addr)?YIELD:CANCEL;
case CLOSE : cout << "CLOSE" << endl;
qp->Close(args[0].addr);
return 0;
case REQUESTPROGRESS:
cout << "REQUESTPROGRESS" << endl;
return qp->RequestProgress(args[0].addr,
(ProgressInfo*)result.addr);
case CLOSEPROGRESS:
cout << "CLOSEPROGRESS" << endl;
return 0;
default : return -1;
}
}
OperatorSpec printStreamMessagesSpec(
"stream(X) -> stream(X) ",
"_ op ",
"Debug operator, prints out messages send to value mapping.",
" query ten feed printStreamMessages count"
);
Operator printStreamMessagesOp(
"printStreamMessages",
printStreamMessagesSpec.getStr(),
printStreamMessagesVM,
Operator::SimpleSelect,
printStreamMessagesTM
);
/*
Operator ~contains~
*/
ListExpr containsTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("2 arguments expected");
}
if(!Stream<Attribute>::checkType(nl->First(args))){
return listutils::typeError("first argument not a stream of DATA");
}
if(!Attribute::checkType(nl->Second(args))){
return listutils::typeError("second argument not in kind DATA");
}
if(!nl->Equal(nl->Second(nl->First(args)), nl->Second(args))){
return listutils::typeError("stream type and type of second "
"argument differ");
}
return listutils::basicSymbol<CcBool>();
}
int containsVM(Word* args, Word& result,
int message, Word& local, Supplier s){
Stream<Attribute> stream(args[0]);
Attribute* elem = (Attribute*) args[1].addr;
stream.open();
Attribute* a;
bool found = false;
while(!found && (a=stream.request())!=0){
found = a->Compare(elem) == 0;
a->DeleteIfAllowed();
}
stream.close();
result = qp->ResultStorage(s);
CcBool* res = (CcBool*) result.addr;
res->Set(true,found);
return 0;
}
OperatorSpec containsSpec(
"stream<D> x D -> bool , with D in DATA",
"_ contains _ ",
"Checks whether the second argument is element of the stream "
"given as the first argument",
"query intstream(0,23) contains 10"
);
Operator containsOp(
"contains",
containsSpec.getStr(),
containsVM,
Operator::SimpleSelect,
containsTM
);
/*
Operator ~some~
*/
ListExpr someTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("two arguments required");
}
if(!Stream<Tuple>::checkType(nl->First(args))
&& !Stream<Attribute>::checkType(nl->First(args))){
return listutils::typeError("the first argument is not a tuple stream"
" and not a stream of DATA");
}
if(!CcInt::checkType(nl->Second(args))){
return listutils::typeError("the second argument ist not an int");
}
return nl->First(args);
}
template<class T>
class someInfo{
public:
someInfo(Word _stream, size_t bs): stream(_stream), bufferSize(bs),
count(0),currentOut(-1){
stream.open();
init();
}
~someInfo(){
for(size_t i=currentOut+1; i< buffer.size(); i++){
buffer[i]->DeleteIfAllowed();
}
stream.close();
}
T* next(){
currentOut++;
if(currentOut >= (int)buffer.size()){
return 0;
}
T* res = buffer[currentOut];
buffer[currentOut] = 0;
return res;
}
private:
Stream<T> stream;
size_t bufferSize;
size_t count;
int currentOut;
std::vector<T*> buffer;
void init(){
T* tup;
while( (tup = stream.request()) != nullptr){
count++;
insert(tup);
}
}
void insert(T* tup){
if(buffer.size() < bufferSize){
buffer.push_back(tup);
return;
}
// create a random number between 0 and count-1
// note, count contains already the current tuple
size_t rnd = std::rand()/((RAND_MAX + 1u)/count);
if(rnd < buffer.size()){
buffer[rnd]->DeleteIfAllowed();
buffer[rnd]=tup;
} else {
tup->DeleteIfAllowed();
}
}
};
template<class T>
int someVMT(Word* args, Word& result,
int message, Word& local, Supplier s){
someInfo<T>* li = (someInfo<T>*) local.addr;
switch(message){
case OPEN: {
if(li) {
delete li;
local.addr =0;
}
CcInt* bs = (CcInt*) args[1].addr;
if(bs->IsDefined()){
int s = bs->GetValue();
if(s>0) {
local.addr = new someInfo<T>(args[0], s);
}
}
return 0;
}
case REQUEST : result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE : {
if(li){
delete li;
local.addr = 0;
}
return 0;
}
}
return -1;
}
ValueMapping someVM[] = {
someVMT<Tuple>,
someVMT<Attribute>
};
int someSelect(ListExpr args){
return Stream<Attribute>::checkType(nl->First(args))?1:0;
}
OperatorSpec someSpec(
"stream(X) x int -> stream(X) , X in {TUPLE, DATA}",
"_ some [_] ",
"Creates a random selection from the stream of a given size "
"using reservoir sampling.",
"query plz feed some[200] count"
);
Operator someOp(
"some",
someSpec.getStr(),
2,
someVM,
someSelect,
someTM
);
/*
Operator ~sideEffect~
*/
ListExpr sideEffectTM(ListExpr args){
if(!nl->HasLength(args,2)){
return listutils::typeError("2 arguments expected");
}
if(!listutils::isStream(nl->First(args))){
return listutils::typeError("first argument is not a stream");
}
ListExpr streamElem = nl->Second(nl->First(args));
if(!listutils::isMap<2>(nl->Second(args))){
return listutils::typeError("second argument is not a binary function");
}
ListExpr fun = nl->Second(args);
ListExpr fa1 = nl->Second(fun);
ListExpr fa2 = nl->Third(fun);
ListExpr funres = nl->Fourth(fun);
if(!nl->Equal(streamElem, fa1)){
return listutils::typeError("types of stream and first "
"function argument differ");
}
if(!CcInt::checkType(fa2)){
return listutils::typeError("second function arguemnt is not an int");
}
if(!Attribute::checkType(funres)){
return listutils::typeError("function result not in kind data");
}
return nl->First(args);
}
class sideEffectInfo{
public:
sideEffectInfo(Word& _stream, Word& _fun):
stream(_stream), fun(_fun), count(0) {
funargs = qp->Argument(fun.addr);
qp->Open(stream.addr);
(*funargs)[1] = SetWord(new CcInt(true,0));
}
~sideEffectInfo(){
((CcInt*) (*funargs)[1].addr)->DeleteIfAllowed();
qp->Close(stream.addr);
}
void* next(){
qp->Request(stream.addr, elem);
if(!qp->Received(stream.addr)) {
return 0;
}
(*funargs)[0] = elem;
((CcInt*)((*funargs)[1].addr))->Set(true,++count);
qp->Request(fun.addr,funres);
return elem.addr;
}
private:
Word stream;
Word fun;
size_t count;
ArgVectorPointer funargs;
Word funres;
Word elem;
};
int sideEffectVM(Word* args, Word& result,
int message, Word& local, Supplier s){
sideEffectInfo* li = (sideEffectInfo*) local.addr;
switch(message){
case OPEN: if(li) delete li;
local.addr = new sideEffectInfo(args[0], args[1]);
return 0;
case REQUEST: result.addr = li?li->next():0;
return result.addr?YIELD:CANCEL;
case CLOSE : if(li){
delete li;
local.addr = 0;
}
return 0;
}
return -1;
}
OperatorSpec sideEffectSpec(
"stream(X) x fun(X x int -> a) -> stream(X) , a in DATA",
"_ sideEffect[_] ",
"Evaluates a function for each element in the stream ignoring the result. "
"The second function arguemnt is set automatically to the number of "
"the current elements (counting starts with 1)",
"query plz feed sideEffect[ 1 echo[.Plz] ] count "
);
Operator sideEffectOp{
"sideEffect",
sideEffectSpec.getStr(),
sideEffectVM,
Operator::SimpleSelect,
sideEffectTM
};
/*
7 Creating the Algebra
*/
class StreamAlgebra : public Algebra
{
public:
StreamAlgebra() : Algebra()
{
AddOperator( &streamcount );
AddOperator( &streamprintstream );
AddOperator( &printstream2Op );
AddOperator( &streamtransformstream );
AddOperator( &projecttransformstream );
AddOperator( &namedtransformstream );
AddOperator( &streamfeed );
AddOperator( &streamuse );
AddOperator( &streamuse2 );
AddOperator( &streamaggregateS );
AddOperator( &streamfilter );
AddOperator( ensure_Info("ensure"), ensure_vms, ensure_sf, ensure_tm );
AddOperator( &echo );
AddOperator( realstreamInfo(), realstreamFun, realstreamTypeMap );
AddOperator( intstreamInfo(), intstreamValueMap, intstreamTypeMap );
AddOperator( &STREAMELEM );
AddOperator( &STREAMELEM2 );
AddOperator( &streamtail );
streamtail.SetUsesMemory();
AddOperator( &kinds);
AddOperator( &timeout);
AddOperator( &isOrdered);
AddOperator( &sbufferOp);
AddOperator(&intstream2);
AddOperator(&mergediffOp);
AddOperator(&mergesecOp);
AddOperator(&mergeunionOp);
AddOperator(&mergeOp);
AddOperator(&rdupOp);
AddOperator(&xthOp);
AddOperator(&minattrOp);
AddOperator(&maxattrOp);
AddOperator(&nthOp);
AddOperator(&avgattrOp);
AddOperator(&sumattrOp);
AddOperator(&consumeOp);
AddOperator(&tsOp);
AddOperator(&asOp);
AddOperator(&streamfunOp);
AddOperator(&progOp);
AddOperator(&delaySOp);
AddOperator(&syncOp);
AddOperator(&printStreamMessagesOp);
AddOperator(&containsOp);
AddOperator(&someOp);
AddOperator(&sideEffectOp);
#ifdef USE_PROGRESS
streamcount.EnableProgress();
streamtransformstream.EnableProgress();
namedtransformstream.EnableProgress();
streamfeed.EnableProgress();
streamfilter.EnableProgress();
timeout.EnableProgress();
nthOp.EnableProgress();
printStreamMessagesOp.EnableProgress();
#endif
std::srand(std::time(nullptr));
}
~StreamAlgebra() {};
};
/*
7 Initialization
Each algebra module needs an initialization function. The algebra manager
has a reference to this function if this algebra is included in the list
of required algebras, thus forcing the linker to include this module.
The algebra manager invokes this function to get a reference to the instance
of the algebra class and to provide references to the global nested list
container (used to store constructor, type, operator and object information)
and to the query processor.
The function has a C interface to make it possible to load the algebra
dynamically at runtime.
*/
extern "C"
Algebra*
InitializeStreamAlgebra( NestedList* nlRef, QueryProcessor* qpRef )
{
nl = nlRef;
qp = qpRef;
return (new StreamAlgebra());
}