Files
secondo/Algebras/CStream/receiveStream.cpp

695 lines
24 KiB
C++
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
/*
----
This file is part of SECONDO.
Copyright (C) 2004, University in Hagen, Department of Computer Science,
Database Systems for New Applications.
SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
----
//paragraph [1] Title: [{\Large \bf \begin{center}] [\end{center}}]
//characters [1] Type: [] []
//characters [2] Type: [] []
//[ae] [\"{a}]
//[oe] [\"{o}]
//[ue] [\"{u}]
//[ss] [{\ss}]
//[Ae] [\"{A}]
//[Oe] [\"{O}]
//[Ue] [\"{U}]
//[x] [$\times $]
//[->] [$\rightarrow $]
//[toc] [\tableofcontents]
[1] Implementation of operation receiveStream.
[toc]
1 Operation receiveStream implementation
*/
#include "Algebras/Relation-C++/RelationAlgebra.h"
#include "QueryProcessor.h"
#include "AlgebraManager.h"
#include "StandardTypes.h"
#include "Algebras/TupleIdentifier/TupleIdentifier.h"
#include "Algebras/FText/FTextAlgebra.h"
#include "RTuple.h"
#include "Symbols.h"
#include "ListUtils.h"
#include "Stream.h"
#include "TupleDescr.h"
#include "VTuple.h"
#include "VTHelpers.h"
#include "JTree.h"
#include "Algebras/IMEX/aisdecode.h"
#include <iostream>
#include <fstream>
#include <boost/algorithm/string.hpp>
#include "TestTuples.h"
extern NestedList* nl;
extern QueryProcessor* qp;
extern AlgebraManager* am;
namespace cstream {
using namespace std;
/*
1.1.0 LocalInfoBase
The LocalInfo-Classes implement the receiveStream operator gets the data.
At the moment, only JSON and AIS data sources are supported but this
modular approach would make wasy to implement other sources, like CSV or
XML files.
For implementing a new data source, the new LocalInfo<...> class mus only
offer a getNext() method. Also, the Value Mapping function would need
small changes.
*/
class LocalInfoBase {
public:
virtual ~LocalInfoBase() {};
virtual VTuple* getNext() = 0;
};
/*
1.1.1 LocalInfoJSON
Implements the use of JSON files as data sources.
The LocalInfoJSON class only splits a given file into single JSON-Objects.
The conversion is done by the JNode class, which builds a tree structure.
This decoupled approach allows the re-use of parsing an JSON-string for
Secondo.
*/
class LocalInfoJSON : public LocalInfoBase {
public:
LocalInfoJSON(string filename): filename(filename) {
fin.open(filename, ios::in);
}
~LocalInfoJSON() {}
VTuple* getNext() {
VTuple* res;
try {
if (!fin.is_open()) return NULL;
std::string jsonString = getNextJsonString();
LOG << ENDL << ENDL << jsonString << ENDL << ENDL;
if (jsonString == "") return NULL;
JNode* root = new JNode(jROOT, "", jsonString, NULL);
root->buildTree();
std::string tds = root->createTupleDescrString();
LOG << ENDL << tds << ENDL;
root->print();
TupleDescr* td = NULL;
try {
td = new TupleDescr(tds);
} catch (SecondoException& e) {
LOG << "LocalInfoJSON: getNext() - Error: "
<< e.msg() << ENDL;
return NULL;
}
Tuple* t = root->buildTupleTree();
#ifdef DEBUG_OUTPUT
t->Print(cout);
#endif
res = new VTuple(t, td);
root->deleteRec();
delete root;
root = NULL;
} catch (int e) {
LOG << "CStream::JsonGetNext - an exception # "
<< e << " # occurred." << ENDL;
res = NULL;
}
return res;
}
std::string getNextJsonString() {
std::string res = "";
int bracketsCount = 0;
bool inText = false;
bool doEscape = false;
char c;
while (!fin.eof() ) {
fin.get(c);
// Every json object has to start with {.
if ((c != '{') && (res.length()==0)) continue;
switch (c) {
case '\\': {
doEscape = !doEscape;
break;
}
case '"': {
if (!(doEscape)) inText = !inText;
doEscape = false;
break;
}
case '{': {
if (!doEscape && !inText) bracketsCount++;
doEscape = false;
break;
}
case '}': {
if (!doEscape && !inText) bracketsCount--;
doEscape = false;
break;
}
default: {
doEscape = false;
}
}
res += c;
if ((res.length() > 0) && (bracketsCount == 0)) break;
}
return res;
}
private:
std::string filename;
ifstream fin;
};
/*
1.1.2 LocalInfoAIS
Implements the use of AIS files as data sources.
Heavily builds on top of aisdecode.h.
*/
class LocalInfoAIS : public LocalInfoBase {
public:
LocalInfoAIS(string filename): filename(filename) {
ifstream f(filename.c_str());
if (f.good()) {
f.close();
aisd = new aisdecode::aisdecoder(filename);
} else {
f.close();
aisd = NULL;
}
for(int i=0;i<27;i++){
stdTupleTypes[i] = 0;
}
}
~LocalInfoAIS() {
if (aisd) {
delete aisd;
}
for( int i=0;i<27;i++) {
if(stdTupleTypes[i])
stdTupleTypes[i]->DeleteIfAllowed();
}
}
VTuple* getNext() {
// No filename provided or file does not exist.
if (aisd == NULL) return NULL;
aisdecode::MessageBase* msg = aisd->getNextMessage();
if (msg==0) return NULL;
std::string tupleDescr = getTupleDescription(msg);
TupleDescr* td = NULL;
try {
td = new TupleDescr(tupleDescr);
} catch (SecondoException& e) {
LOG << "LocalInfoAIS: getNext() - Error: " << e.msg() << ENDL;
return NULL;
}
Tuple* t = getTuple(td, msg);
VTuple* vt = new VTuple(t, td);
delete msg;
return vt;
}
Tuple* getTuple(TupleDescr* td, aisdecode::MessageBase* msg) {
int mt = msg->getType();
if(!stdTupleTypes[mt-1]){
stdTupleTypes[mt-1] = td->CreateTupleType();
}
TupleType* tt = stdTupleTypes[mt-1];
Tuple* t = new Tuple(tt);
switch(msg->getType()) {
case 1:
case 2:
case 3:{
LOG << "Type 123" << ENDL;
aisdecode::Message1_3* msg1_3 = static_cast<aisdecode::Message1_3*>(msg);
t->PutAttribute(0, new CcInt(true, msg1_3->messageType));
t->PutAttribute(1, new CcInt(true, msg1_3->repeatIndicator));
t->PutAttribute(2, new CcInt(true, msg1_3->mmsi));
t->PutAttribute(3, new CcInt(true, msg1_3->status));
t->PutAttribute(4, new CcInt(true, msg1_3->rot));
t->PutAttribute(5, new CcInt(true, msg1_3->sog));
t->PutAttribute(6, new CcInt(true, msg1_3->accuracy));
t->PutAttribute(7, new CcReal(true, msg1_3->longitude));
t->PutAttribute(8, new CcReal(true, msg1_3->latitude));
t->PutAttribute(9, new CcInt(true, msg1_3->cog));
t->PutAttribute(10, new CcInt(true, msg1_3->heading));
t->PutAttribute(11, new CcInt(true, msg1_3->second));
t->PutAttribute(12, new CcInt(true, msg1_3->maneuver));
t->PutAttribute(13, new CcInt(true, msg1_3->spare));
t->PutAttribute(14, new CcInt(true, msg1_3->raim));
t->PutAttribute(15, new CcInt(true, msg1_3->rstatus));
break;}
case 4:{
LOG << "Type 4" << ENDL;
aisdecode::Message4* msg4 = static_cast<aisdecode::Message4*>(msg);
t->PutAttribute(0, new CcInt(true, msg4->type));
t->PutAttribute(1, new CcInt(true, msg4->repeat));
t->PutAttribute(2, new CcInt(true, msg4->mmsi));
t->PutAttribute(3, new CcInt(true, msg4->year));
t->PutAttribute(4, new CcInt(true, msg4->month));
t->PutAttribute(5, new CcInt(true, msg4->day));
t->PutAttribute(6, new CcInt(true, msg4->hour));
t->PutAttribute(7, new CcInt(true, msg4->minute));
t->PutAttribute(8, new CcInt(true, msg4->second));
t->PutAttribute(9, new CcInt(true, msg4->fix));
t->PutAttribute(10, new CcReal(true, msg4->longitude));
t->PutAttribute(11, new CcReal(true, msg4->latitude));
t->PutAttribute(12, new CcInt(true, msg4->epfd));
t->PutAttribute(13, new CcInt(true, msg4->spare));
t->PutAttribute(14, new CcInt(true, msg4->raim));
t->PutAttribute(15, new CcInt(true, msg4->sotdma));
break;}
case 5:{
LOG << "Type 5" << ENDL;
aisdecode::Message5* msg5 = static_cast<aisdecode::Message5*>(msg);
t->PutAttribute(0, new CcInt(true, msg5->type));
t->PutAttribute(1, new CcInt(true, msg5->repeat));
t->PutAttribute(2, new CcInt(true, msg5->mmsi));
t->PutAttribute(3, new CcInt(true, msg5->ais_version));
t->PutAttribute(4, new CcInt(true, msg5->imo));
t->PutAttribute(5, new CcString(true, msg5->callSign));
t->PutAttribute(6, new CcString(true, msg5->vesselName));
t->PutAttribute(7, new CcInt(true, msg5->shipType));
t->PutAttribute(8, new CcInt(true, msg5->dimToBow));
t->PutAttribute(9, new CcInt(true, msg5->dimToStern));
t->PutAttribute(10, new CcInt(true, msg5->dimToPort));
t->PutAttribute(11, new CcInt(true, msg5->dimToStarboard));
t->PutAttribute(12, new CcInt(true, msg5->epfd));
t->PutAttribute(13, new CcInt(true, msg5->month));
t->PutAttribute(14, new CcInt(true, msg5->day));
t->PutAttribute(15, new CcInt(true, msg5->hour));
t->PutAttribute(16, new CcInt(true, msg5->minute));
t->PutAttribute(17, new CcInt(true, msg5->draught));
t->PutAttribute(18, new CcString(true, msg5->destination));
t->PutAttribute(19, new CcInt(true, msg5->dte));
break;}
case 9:{
LOG << "Type 9" << ENDL;
aisdecode::Message9* msg9 = static_cast<aisdecode::Message9*>(msg);
t->PutAttribute(0, new CcInt(true, msg9->type));
t->PutAttribute(1, new CcInt(true, msg9->repeat));
t->PutAttribute(2, new CcInt(true, msg9->mmsi));
t->PutAttribute(3, new CcInt(true, msg9->alt));
t->PutAttribute(4, new CcInt(true, msg9->sog));
t->PutAttribute(5, new CcInt(true, msg9->accuracy));
t->PutAttribute(6, new CcReal(true, msg9->longitude));
t->PutAttribute(7, new CcReal(true, msg9->latitude));
t->PutAttribute(8, new CcInt(true, msg9->cog));
t->PutAttribute(9, new CcInt(true, msg9->second));
t->PutAttribute(10, new CcInt(true, msg9->reserved));
t->PutAttribute(11, new CcInt(true, msg9->dte));
t->PutAttribute(12, new CcInt(true, msg9->assigned));
t->PutAttribute(13, new CcInt(true, msg9->raim));
t->PutAttribute(14, new CcInt(true, msg9->radio));
break;}
case 12:{
LOG << "Type 12" << ENDL;
aisdecode::Message12* msg12 = static_cast<aisdecode::Message12*>(msg);
t->PutAttribute(0, new CcString(true, msg12->omsg));
t->PutAttribute(1, new CcInt(true, msg12->type));
t->PutAttribute(2, new CcInt(true, msg12->repeat));
t->PutAttribute(3, new CcInt(true, msg12->source_mmsi));
t->PutAttribute(4, new CcInt(true, msg12->sequence_number));
t->PutAttribute(5, new CcInt(true, msg12->dest_mmsi));
t->PutAttribute(6, new CcInt(true, msg12->retransmit));
t->PutAttribute(7, new CcString(true, msg12->text));
break;}
case 14:{
LOG << "Type 14" << ENDL;
aisdecode::Message14* msg14 = static_cast<aisdecode::Message14*>(msg);
t->PutAttribute(0, new CcString(true, msg14->omsg));
t->PutAttribute(1, new CcInt(true, msg14->type));
t->PutAttribute(2, new CcInt(true, msg14->repeat));
t->PutAttribute(3, new CcInt(true, msg14->mmsi));
t->PutAttribute(4, new CcString(true, msg14->text));
break;}
case 18:{
LOG << "Type 18" << ENDL;
aisdecode::Message18* msg18 = static_cast<aisdecode::Message18*>(msg);
t->PutAttribute(0, new CcInt(true, msg18->type));
t->PutAttribute(1, new CcInt(true, msg18->repeat));
t->PutAttribute(2, new CcInt(true, msg18->mmsi));
t->PutAttribute(3, new CcInt(true, msg18->reserved1));
t->PutAttribute(4, new CcInt(true, msg18->sog));
t->PutAttribute(5, new CcInt(true, msg18->accuracy));
t->PutAttribute(6, new CcReal(true, msg18->longitude));
t->PutAttribute(7, new CcReal(true, msg18->latitude));
t->PutAttribute(8, new CcInt(true, msg18->cog));
t->PutAttribute(9, new CcInt(true, msg18->heading));
t->PutAttribute(10, new CcInt(true, msg18->second));
t->PutAttribute(11, new CcInt(true, msg18->reserved2));
t->PutAttribute(12, new CcInt(true, msg18->cs));
t->PutAttribute(13, new CcInt(true, msg18->display));
t->PutAttribute(14, new CcInt(true, msg18->dsc));
t->PutAttribute(15, new CcInt(true, msg18->band));
t->PutAttribute(16, new CcInt(true, msg18->msg22));
t->PutAttribute(17, new CcInt(true, msg18->assigned));
t->PutAttribute(18, new CcInt(true, msg18->raim));
t->PutAttribute(19, new CcInt(true, msg18->radio));
break;}
case 19:{
LOG << "Type 19" << ENDL;
aisdecode::Message19* msg19 = static_cast<aisdecode::Message19*>(msg);
t->PutAttribute(0, new CcInt(true, msg19->type));
t->PutAttribute(1, new CcInt(true, msg19->repeat));
t->PutAttribute(2, new CcInt(true, msg19->mmsi));
t->PutAttribute(3, new CcInt(true, msg19->reserved));
t->PutAttribute(4, new CcInt(true, msg19->sog));
t->PutAttribute(5, new CcInt(true, msg19->accuracy));
t->PutAttribute(6, new CcReal(true, msg19->longitude));
t->PutAttribute(7, new CcReal(true, msg19->latitude));
t->PutAttribute(8, new CcInt(true, msg19->cog));
t->PutAttribute(9, new CcInt(true, msg19->heading));
t->PutAttribute(10, new CcInt(true, msg19->second));
t->PutAttribute(11, new CcInt(true, msg19->reserved2));
t->PutAttribute(12, new CcString(true, msg19->name));
t->PutAttribute(13, new CcInt(true, msg19->shiptype));
t->PutAttribute(14, new CcInt(true, msg19->dimToBow));
t->PutAttribute(15, new CcInt(true, msg19->dimToStern));
t->PutAttribute(16, new CcInt(true, msg19->dimToPort));
t->PutAttribute(17, new CcInt(true, msg19->dimToStarboard));
t->PutAttribute(18, new CcInt(true, msg19->epfd));
t->PutAttribute(19, new CcInt(true, msg19->raim));
t->PutAttribute(20, new CcInt(true, msg19->dte));
t->PutAttribute(21, new CcInt(true, msg19->assigned));
break;}
case 24:{
LOG << "Type 24" << ENDL;
aisdecode::Message24* msg24 = static_cast<aisdecode::Message24*>(msg);
t->PutAttribute(0, new CcInt(true, msg24->type));
t->PutAttribute(1, new CcInt(true, msg24->repeat));
t->PutAttribute(2, new CcInt(true, msg24->mmsi));
t->PutAttribute(3, new CcInt(true, msg24->partno));
t->PutAttribute(4, new CcString(true, msg24->shipsname));
t->PutAttribute(5, new CcInt(true, msg24->shiptype));
t->PutAttribute(6, new CcInt(true, msg24->vendorid));
t->PutAttribute(7, new CcInt(true, msg24->model));
t->PutAttribute(8, new CcInt(true, msg24->serial));
t->PutAttribute(9, new CcString(true, msg24->callsign));
t->PutAttribute(10, new CcInt(true, msg24->dimToBow));
t->PutAttribute(11, new CcInt(true, msg24->dimToStern));
t->PutAttribute(12, new CcInt(true, msg24->dimToPort));
t->PutAttribute(13, new CcInt(true, msg24->dimToStarboard));
t->PutAttribute(14, new CcInt(true, msg24->mothership_mmsi));
break;}
}
return t;
}
std::string getTupleDescription(aisdecode::MessageBase* msg) {
switch(msg->getType()) {
case 1:
case 2:
case 3: return "((Type int)(RepeatIndicator int)(Mmsi int)"
"(Status int)(Rot int)(Sog int)(Accuracy int)(Longitude real)"
"(Latitude real)(Cog int)(Heading int)(Second int)(Maneuver int)"
"(Spare int)(Raim int)(Rstatus int))";
case 4: return "((Type int)(Repeat int)(Mmsi int)(Year int)(Month int)"
"(Day int)(Hour int)(Minute int)(Second int)(Fix int)(Longitude real)"
"(Latitude real)(Epfd int)(Spare int)(Raim int)(Sotdma int))";
case 5: return "((Type int)(Repeat int)(Mmsi int)(Ais_version int)(Imo int)"
"(CallSign string)(VesselName string)(ShipType int)(DimToBow int)"
"(DimToStern int)(DimToPort int)(DimToStarboard int)"
"(Epfd int)(Month int)(Day int)(Hour int)(Minute int)"
"(Draught int)(Destination string)(Dte int))";
case 9: return "((Type int)(Repeat int)(Mmsi int)(Alt int)(Sog int)"
"(Accuracy int)(Longitude real)(Latitude real)(Cog int)(Second int)"
"(Reserved int)(Dte int)(Assigned int)(Raim int)(Radio int))";
case 12: return "((Omsg string)(Type int)(Repeat int)(Source_mmsi int)"
"(Sequence_number int)(Dest_mmsi int)(Retransmit int)(Text string))";
case 14: return "((Type int)(Repeat int)(Source_mmsi int)"
"(Sequence_number int)(Dest_mmsi int)(Retransmit int)(Text string))";
case 18: return "((Type int)(Repeat int)(Mmsi int)(Reserved1 int)(Sog int)"
"(Accuracy int)(Longitude real)(Latitude real)(Cog int)(Heading int)"
"(Second int)(Reserved2 int)(Cs int)(Display int)(Dsc int)(Band int)"
"(Msg22 int)(Assigned int)(Raim int)(Radio int))";
case 19: return "((Type int)(Repeat int)(Mmsi int)(Reserved int)(Sog int)"
"(Accuracy int)(Longitude real)(Latitude real)(Cog int)(Heading int)"
"(Second int)(Reserved2 int)(Name string)(Shiptype int)(DimToBow int)"
"(DimToStern int)(DimToPort int)(DimToStarboard int)(Epfd int)"
"(Raim int)(Dte int)(Assigned int))";
case 24: return "((Type int)(Repeat int)(Mmsi int)(Partno int)"
"(Shipsname string)(Shiptype int)(Vendorid int)(Model int)(Serial int)"
"(Callsign string)(DimToBow int)(DimToStern int)(DimToPort int)"
"(DimToStarboard int)(Mothership_mmsi int))";
default: return "";
}
}
private:
std::string filename;
aisdecode::aisdecoder* aisd;
TupleType* stdTupleTypes[27];
};
class LocalInfoTestTuple : public LocalInfoBase {
public:
LocalInfoTestTuple(string whichTuple) {
tt = new TestTuples();
whichTuple = "1";
maxTuple = 5;
countTuple = 0;
if (whichTuple.find("*") != std::string::npos) {
maxTuple = atoi(whichTuple.substr(0,
whichTuple.find("*")).c_str());
whichTuple = whichTuple.substr(
whichTuple.find("*")+1, whichTuple.length());
}
}
~LocalInfoTestTuple() {}
VTuple* getNext() {
if (countTuple == maxTuple) return NULL;
countTuple++;
if (whichTuple == "1") return tt->Create1();
if (whichTuple == "1_1") return tt->Create1_1();
if (whichTuple == "2") return tt->Create2();
if (whichTuple == "2_1") return tt->Create2_1();
if (whichTuple == "3") return tt->Create3();
if (whichTuple == "3_1") return tt->Create3_1();
if (whichTuple == "4") return tt->Create4();
if (whichTuple == "4_1") return tt->Create4_1();
if (whichTuple == "5") return tt->Create5();
if (whichTuple == "5_1") return tt->Create5_1();
return tt->Create1();
}
private:
std::string whichTuple;
int countTuple;
int maxTuple;
TestTuples* tt;
};
/*
1.2 TypeMapping of operator ~receiveStream~
Expects two strings, first the filename, then the format (currently
AIS or JSON).
*/
ListExpr ReceiveStream_TM(ListExpr args) {
// the list is coded as ( (<type> <query part>) (<type> <query part>) )
#ifdef DEBUG_OUTPUT
VTHelpers::PrintList("ReceiveStream_TM", args, 2);
#endif
if (nl->ListLength(args) != 2)
return listutils::typeError("two argument expected");
// 1. argument: filename (FTEXT)
ListExpr arg1 = nl->First(nl->First(args));
if (!FText::checkType(arg1))
return listutils::typeError("file name as ftext expected");
// 2. argument: format (STRING)
ListExpr arg2 = nl->First(nl->Second(args));
if (!CcString::checkType(arg2))
return listutils::typeError("file format as string expected");
return nl->TwoElemList(listutils::basicSymbol<Stream<VTuple> >(),
nl->OneElemList(listutils::basicSymbol<VTuple>()));
}
/*
1.2 ValueMapping of operator ~receiveStream~
Plain and simple ValueMapping. All real work is done by the getNext()
function of the corresponding LocalInfo<> Class.
*/
int ReceiveStream_VM(Word* args, Word& result, int message,
Word& local, Supplier s) {
LOG << "ReceiveStream_VM: ";
LocalInfoBase* li = (LocalInfoBase*) local.addr;
switch(message) {
case OPEN: {
LOG << "OPEN" << ENDL;
FText* filenameFText = (FText*) args[0].addr;
CcString* datatype = (CcString*) args[1].addr;
if(! filenameFText ->IsDefined()) {
cout << "Error: Filename is undefined" << endl;
return 0;
}
if(! datatype ->IsDefined()) {
cout << "Error: datatype is undefined" << endl;
return 0;
}
std::string type = datatype->GetValue();
std::string filename = filenameFText->GetValue();
boost::to_upper(type);
if (li) {
delete li;
}
if (type == "AIS") {
LOG << "(AIS) File: " << filename << ENDL;
local.addr = new LocalInfoAIS( filename );
} else if (type == "JSON") {
LOG << "(JSON) File: " << filename << ENDL;
local.addr = new LocalInfoJSON( filename );
} else if (type == "TEST") {
LOG << "(TEST) TestTuple: " << filename << ENDL;
local.addr = new LocalInfoTestTuple( filename );
} // else if (type == "CSV") {
// LOG << "(JSON) File: " << filename->GetValue() << ENDL;
// local.addr = new LocalInfoCSV( filename );
// } <-- Add new allowed file types like this!
return 0;
}
case REQUEST: {
LOG << "REQUEST";
if (!li) return CANCEL;
VTuple* res = li->getNext();
if (res) {
LOG << " yield" << ENDL;
result = res;
return YIELD;
}
LOG << " cancel" << ENDL;
result.addr = 0;
return CANCEL;
}
case CLOSE: {
LOG << "CLOSE" << ENDL;
if (li) {
delete li;
local.addr = 0;
}
return 0;
}
}
return 0;
}
/*
5.8.3 Specification of operator ~receiveStream~
*/
const std::string ReceiveStream_Spec =
"( ( \"Signature\" \"Syntax\" \"Meaning\" \"Example\" ) "
"( <text>(text string)"
" -> (stream (vtuple))"
"</text--->"
"<text>receiveStream(_, _)</text--->"
"<text>Converts a file of JSON or AIS data to a stream of VTuples.</text--->"
"<text>receivestring(\'somefile.json\', \"json\")</text--->"
") )";
/*
5.8.3 Definition of operator ~receiveStream~
*/
Operator receiveStream_Op(
"receiveStream",
ReceiveStream_Spec,
ReceiveStream_VM,
Operator::SimpleSelect,
ReceiveStream_TM
);
} /* end of namespace */