Files
secondo/Algebras/Hadoop/HadoopAlgebra.cpp

6505 lines
190 KiB
C++
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
/*
----
This file is part of SECONDO.
Copyright (C) 2004-2008, University in Hagen, Faculty of Mathematics and
Computer Science, Database Systems for New Applications.
SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
----
//paragraph [1] Title: [{\Large \bf] [}]
//paragraph [10] Footnote: [{\footnote{] [}}]
//[TOC] [\tableofcontents]
//[newpage] [\newpage]
//[<] [$<$]
//[>] [$>$]
//[INSET] [$\in$]
[1] Implementation of HadoopParallelAlgebra
April 2010 Jiamin Lu
[TOC]
[newpage]
1 Abstract
HadoopParallelAlgebra implements all operators containing precast Hadoop jobs.
This algebra cannot be completely compiled, if the \$HADOOP\_HOME variable is
not correctly set.
This algebra includes follow operators:
* ~hadoopMap~.
* ~hadoopReduce~.
* ~hadoopReduce2~.
1 Includes, Globals
*/
#include "HadoopAlgebra.h"
using namespace std;
using namespace temporalalgebra;
/*
9 Data Type fList
During the parallel processing, we need to indicate Secondo objects
distributed in slave nodes of the cluster. There are two situations
need to be considered:
* Object's pieces are kept in Secondo databases, with a same name.
* Object's pieces are kept in a series of binary files, start with a
same name.
The fList is used to process both situations.
Assume a Secondo relation is divided to ~r~ * ~c~ a matrix relation,
with ~r~ rows and ~c~ columns. All part(cell) inside this metrix
are all either stored in slaves' Secondo databases,
or exported as data and type files, kept in slaves' file systems.
If they are kept in database system, it must be a ~n~ * 1 matrix,
~n~ is the number of the slaves,
as each slave database only has one part of the object with a same name.
If the data are kept in partition files, then each slave may contain
several rows of partition files.
For each fList, during the procedures with Hadoop jobs,
each map task process one row data in the matrix relation,
and divide it into columns of partition files after the internal processing.
Each reduce task process one column partition files,
and transpose it into one row of the output fList.
A fList contains following variables:
* objectName. The name of the distributed Secondo object. If data is
distributed as Secondo objects in slaves, then all objects
are named as this value. If data is distributed as partition files,
then all these partition file names start with this value.
* objectType. The schema of this distributed Secondo Object.
At present, it must be a relation.
* nodesList. Indicate the locations of slaves where the data are
distributed on.
It's used to be set by user manually, but now can be read from
the PARALLEL\_SECONDO\_SLAVES list when the fList is built up at the first time,
and then is kept independently in the fList.
While reading an exist fList, and its kept nodesList is different from the
current slave list, then it will be marked as unavailable.
The master node doesn't take part in the distribution of data.
* fileLocList. Indicate the location of the objects.
* [<] READ DB / [>] : Here indicate the object is distributed as Secondo
Objects in every slave Secondo databases.
* [<] file path [>] : Here indicate the file location of a partition file,
and it must be a absolute file path.
* null value : Here indicate the partition file is kept in the default file
path, which is defined in master or slave list.
* duplicateTimes. Indicate the duplication times for each partition file.
If data are distributed as Secondo objects, then this value must be 1.
* Available. Denotes whether the last operator which created this
list is successfully performed.
* Distributed. A boolean used to indicate whether data are distributed.
The *Distributed* variable only can be set by operators like ~spread~
which create a flist data, together with data files.
When a flist is undistributed, then it cannot be read data from it,
like what the operator ~collect~ does.
Update at 6th Feb. 2012
For building up the generic parallel Secondo,
a fList object should be used to describe three different kinds of objects:
* DGO : Distributed global object. Its value is kept on the master node only.
* DLO : Distributed local object. Its value is distributed on slaves,
as Secondo objects.
* DLF : Distributed local file list. Its value is distributed on slaves,
as disk files, only for relation.
Update at 19th Mar. 2012
Remove the DGO kind, since I decided to not create special flist object,
only for pointing out some objects that have already been created is a flist.
Besides, I also need to remove two useless attributes from the value list:
* available: when the objectKind field is set as UNDEF kind
or the distributed field is set as false, then available is false, or else
it is true.
* inDB: it can be replaced by the objectKind too.
*/
fList::fList(string _on, NList _tl, clusterInfo* _ci, NList _fll,
size_t _dp, bool _idd, fListKind _fkd, size_t _mrn, size_t _mcn,
NList _uemq):
subName(_on), objectType(_tl),
interCluster(new clusterInfo(*_ci)),
fileLocList(_fll),
dupTimes(_dp),
mrNum(_mrn), mcNum(_mcn),
isDistributed(_idd),
objKind(_fkd),
UEMapQuery(_uemq) //By default, it is an empty list
{
if (mrNum == 0 || mcNum == 0){
if (!verifyLocList()){
isDistributed = false;
}
}
}
fList::fList(fList& rhg):
subName(rhg.getSubName()),
objectType(rhg.objectType),
interCluster(new clusterInfo(*rhg.interCluster)),
fileLocList(rhg.getLocList()),
dupTimes(rhg.getDupTimes()),
mrNum(rhg.getMtxRowNum()),
mcNum(rhg.getMtxColNum()),
isDistributed(rhg.isDistributed),
objKind(rhg.objKind)
{}
/*
9.1 fList::In Function
The ~In~ function accepts following parameters
* A Secondo object name.
This is a string value express the name of an exist Secondo object.
We use it to get the ~objectName~ and its type expression.
In some cases, the object may not exist in the current database,
then it must has been exported into the file system,
and its type file must be kept in the local *parallel* directory.
If neither the object or the type file exists,
then set the ~correct~ as FALSE.
* A nodesList.
This is a list of string values, each specifies a IP address of
a node in the cluster. And the first one is viewed as the master
node by default.
* A fileLocMatrix
This is a nested list that composed by integer numbers,
which denotes the matrix of cell files
E.g., it may looks like this:
---- ( (1 ( 1 2 3 4 5))
(2 ( 1 2 3 4 5))
(3 ( 1 2 3 4 5))
(4 ( 1 2 3 4 5)) )
----
The above example shows that a Secondo objects is divided into
a 4x5 matrix file, and is distributed to a cluster with 4 nodes.
Each node, including the master node, contains five cell files.
* A duplicate times
This is a integer number used to tell how many duplications of a
cell file are kept inside the cluster.
At present, we adopt a simple chained declustering mechanism to
backup the duplications of the cell files.
Besides the primary node that is denoted in the fileLoc matrix,
it will be copied to (~dupTimes~ - 1) nodes that are listed after
the primary node within the nodesList.
Update at 26/12/2011
The ~nodesList~ is set up at the first time when the fList is built,
by creating a clusterInfo object, and doesn't need to be manually indicated.
After reading, the node list is kept inside the fList,
in case it need to be reloaded into another database.
Therefore it also can be manually indicated,
but the given nodelist must be a subset of the current node list,
so as to keep a fList object while the cluster scale increases.
In clusters like ours, that each node contains two hard disks,
and contains two independent miniSecondo databases,
these databases will be viewed as different slaves inside the cluster.
For the fileLocMatrix, it's possible that one slave may contains
several rows of files, some partition files may don't exist.
All partition files belong to one row must be stored at one slave,
and also in one file path, which is indicated as the last text of each row.
The row number of these partition files have nothing to do with the
slave nodes that store these files.
Hence it may looks like:
----( (2 (1 2 3 4 5) '')
(2 (1 2 3 4 5) '')
(1 (1 2 5) '\/mnt\/diskb')
(4 (1 2 3 4 5) '') )
----
The above example also shows a 4x5 matrix relation,
distributed on a cluster at least has 4 nodes.
Each row data has been partitioned into at most 5 pieces.
The 2th node has two rows partition files, while the 3th node doesn't have
any one of it.
And in the 1th node, the third and fourth column partition files don't exist.
The first row partition files are kept in the 2th node,
while the third row partition files are kept in the 1th node.
All files are kept in slaves' default parallel location,
except the third row, which are kept in a specific path of 1th node.
Update at 01/09/12
As some relation may cannot produce partitions for all rows,
we left an empty row inside the fileLocMatrix,
to denote a row without any partition files.
Hence now the fileLocMatrix should looks like:
----( (1 (1 2 3 4 5) '')
(2 (1 2 3 4 5) '')
(1 () '')
(2 (1 2 5) '\/mnt\/diskb')
(1 (1 2 3 4 5) '') )
----
Here the example indicates a 5x5 matrix relation,
distributed on a cluster with 2 slaves.
The third row is an empty row, it's column list is empty,
since there is no partition files produced for this row.
Update at 01/13/12
Since ~flist~ describes the distribution of tuples,
the type of a flist should be changed from simply flist
to flist(tuple(....)).
At the same time, there is no necessary to indicate an exist
object or type file anymore, since the type is given by users.
The anonymous of flist objects are checked by comparing the
type files if they exist.
The type file keeps the schema as tuple relation,
since it may be required by file-relevant operators.
Updated when ~spread2~ operator is created.
The type for DLF kind flist is set as
----
flist(stream(tuple(....)))
----
The type for DLO kind flist with relation type object is set as
----
flist(rel(tuple(....)))
----
So we can distinguish these two different types.
Update 23th Mar. 2012
I disable the ~In~ function of the fList,
since all the fileLocation is set by operators.
But it may still be needed, in case we want to reload
a fList to another cluster.
Update 18th Apr. 2012
An empty row in the fileLocList is expressed as an empty list directly,
as there is no slave keep the data of this row. Afterwards,
a fileLocMatrix containing empty rows looks like:
----( (1 (1 2 3 4 5) '')
(2 (1 2 3 4 5) '')
()
(2 (1 2 5) '\/mnt\/diskb')
(1 (1 2 3 4 5) '') )
----
Here this relation is divided into 5 rows, one row is empty, and the left 4 rows
are kept in two slaves equally.
*/
Word fList::In(const ListExpr typeInfo, const ListExpr instance,
const int errorPos, ListExpr& errorInfo, bool& correct)
{
correct = false;
return SetWord(Address(0));
}
ListExpr fList::Out(ListExpr typeInfo, Word value)
{
if (value.addr)
{
fList* fl = static_cast<fList*>(value.addr);
NList outList;
outList.append(NList(fl->getSubName(), true, false));
outList.append(fl->getNodeList());
outList.append(fl->getLocList());
outList.append(NList(fl->getMtxRowNum()));
outList.append(NList(fl->getMtxColNum()));
outList.append(NList(fl->getDupTimes()));
outList.append(NList(fl->isDistributed, false));
outList.append(NList(fl->objKind));
outList.append(fl->getUEMapQuery());
return outList.listExpr();
}
else
return nl->SymbolAtom("undefined");
}
Word fList::Create(const ListExpr typeInfo)
{
return SetWord(
new fList("", NList(),new clusterInfo(), NList(), 1));
}
void fList::Delete(const ListExpr typeInfo, Word& w)
{
fList* data = (fList*)w.addr;
if (data)
{
string objName = data->getSubName();
string typeFilePath = getLocalFilePath("", objName, "_type");
if (FileSystem::FileOrFolderExists(typeFilePath)){
FileSystem::DeleteFileOrFolder(typeFilePath);
}
delete data;
}
w.addr = 0;
}
void fList::Close(const ListExpr typeInfo, Word& w)
{
delete (fList*)w.addr;
w.addr = 0;
}
Word fList::Clone(const ListExpr typeInfo, const Word& w)
{
return SetWord(new fList(*(fList*)w.addr));
}
bool fList::Save(SmiRecord& valueRecord, size_t& offset,
const ListExpr typeInfo, Word& w)
{
bool ok = true;
ListExpr valueList = Out(typeInfo, w);
valueList = nl->OneElemList(valueList);
string valueStr;
nl->WriteToString(valueStr, valueList);
int valueLength = valueStr.length();
ok = ok && valueRecord.Write(&valueLength, sizeof(int), offset);
offset += sizeof(int);
ok = ok && valueRecord.Write(valueStr.data(), valueLength, offset);
offset += valueLength;
return ok;
}
bool fList::Open(SmiRecord& valueRecord,
size_t& offset,
const ListExpr typeInfo,
Word& value)
{
size_t valueLen;
string valueStr = "";
ListExpr valueList = 0;
char *buf = 0;
ListExpr errorInfo = nl->OneElemList(nl->SymbolAtom("ERRORS"));
bool correct;
bool ok = true;
ok &= (valueRecord.Read(&valueLen, sizeof(int), offset) == sizeof(int));
if (!ok) return false;
offset += sizeof(int);
buf = new char[valueLen];
ok &= (valueRecord.Read(buf, valueLen, offset) == valueLen);
offset += valueLen;
if (ok){
valueStr.assign(buf, valueLen);
nl->ReadFromString(valueStr, valueList);
value = RestoreFromList(typeInfo, nl->First(valueList),
1, errorInfo, correct);
if (errorInfo != 0)
nl->Destroy(errorInfo);
nl->Destroy(valueList);
}
delete []buf;
return ok;
}
Word fList::RestoreFromList(
const ListExpr typeInfo, const ListExpr instance,
const int errorPos, ListExpr& errorInfo, bool& correct )
{
NList il = NList(instance);
string objName = il.first().str();
NList typeList = NList(AntiNumericType(typeInfo));
NList nodeList = il.second();
NList locList = il.third();
size_t maxRNum = il.fourth().intval();
size_t maxCNum = il.fifth().intval();
size_t dupTimes = il.sixth().intval();
bool distributed = il.seventh().boolval();
fListKind kind = (fListKind)il.eigth().intval();
NList ueMapQuery;
if (il.length() == 9)
ueMapQuery = il.nineth();
clusterInfo *ci = new clusterInfo();
fList* fl = 0;
if (ci->covers(nodeList))
{
fl = new fList(objName, typeList, ci, locList, dupTimes,
distributed, kind, maxRNum, maxCNum, ueMapQuery);
return SetWord(fl);
}
else{
correct = false;
return SetWord(Address(0));
}
}
bool CheckFList(ListExpr type, ListExpr& errorInfo)
{
if ( (nl->ListLength(type) == 2)
&&(nl->IsEqual(nl->First(type), fList::BasicType()))){
return true;
}
else{
cmsg.otherError("FList Data Type Check Fails!");
return false;
}
}
/*
The ~In~ function only checks the type of the fileLocList,
but doesn't check the availability of the matrix relation.
This function checks whether the value of the flist is available,
by checking following conditions:
* each slave index must be less than nodesNum
* each column number must be a positive number
* duplicate number is less than nodesNum
* data path is set as [<]READ DB\/[>] while the data are not in database,
or the other way round.
Check the available of a file location list, also get the maximum row
and column number.
*/
bool fList::verifyLocList()
{
if (isAvailable() && (mrNum > 0 || mcNum > 0)){
return true;
}
if (!fileLocList.isEmpty())
{
mrNum = fileLocList.length();
mcNum = 0;
NList fll = fileLocList;
while (!fll.isEmpty())
{
NList aRow = fll.first();
if (!aRow.isEmpty())
{
int nodeNum = aRow.first().intval();
if (nodeNum >= (int)interCluster->getClusterSize())
{
cerr << "Improper data server number: " << nodeNum << endl;
return false;
}
NList cfList = aRow.second();
while (!cfList.isEmpty())
{
NList aPF = cfList.first(); //A partition file suffix
int partNum = aPF.intval();
if (partNum < 1)
{
cerr << "Improper column number: " << partNum << endl;
return false;
}
mcNum = (partNum > (int)mcNum) ? partNum : mcNum;
cfList.rest();
}
string dataLoc = aRow.third().str();
/*
If the fileLocList is not empty, then it must belong to DLF kind
If the path value is ~dbLoc~, or is not an absolute path
then it returns false
It is impossible to check the available of these paths,
since they exist on remote machines
*/
if (dataLoc.length() > 0)
{
if ( dataLoc.find('/') != 0
&& dataLoc.compare(dbLoc) != 0){
cerr << "Improper file path" << dataLoc << endl;
return false;
}
}
}
fll.rest();
}
if ((isInDB() && dupTimes > 1)
|| (dupTimes >= interCluster->getClusterSize()) )
{
cerr << "Improper duplication times: " << dupTimes
<< (isInDB() ? " , when data are kept in databases " : "")
<< endl;
return false;
}
return true;
}
return true;
}
size_t fList::getPartitionFileLoc(
size_t row, vector<string>& locations)
{
if (!isAvailable()){
cerr << "not available"
<< " with " << objKind << "," << (isDistributed?"T":"F")
<< endl;
return 0;
}
if ( row > mrNum ){
cerr << "The row (" << row << ":" << mrNum << ") "
<< " is illegal" << endl;
return 0;
}
ListExpr rowLoc = nl->Nth(row, fileLocList.listExpr());
if (nl->IsEmpty(rowLoc)){
return 0; //The current row is empty.
}
locations.resize(0);
stringstream ss;
int ssIndex = nl->IntValue(nl->First(rowLoc));
for (size_t i = 0; i < dupTimes; i++){
string sIPAddr = interCluster->getIP(ssIndex + i, true);
string dataLoc = nl->Text2String(nl->Third(rowLoc));
if (objKind == DLO){
dataLoc = dbLoc;
}
else if ((dataLoc.length() == 0) || (i > 0)){
// duplicated files are kept at remote node's default path
// Only output the remote folder, not complete file path.
dataLoc = interCluster->getRemotePath(ssIndex + i,
false, true, false);
}
string remotePath = sIPAddr + ":" + dataLoc;
locations.push_back(remotePath);
}
return dupTimes;
}
ListExpr fList::getColumnList(size_t row)
{
if ( isAvailable() && row <= mrNum){
if (!fileLocList.elem(row).isEmpty()){
return nl->Second(nl->Nth(row, fileLocList.listExpr()));
}
}
return nl->TheEmptyList();
}
void fList::appendFileLocList(NList elem)
{
if (isDistributed){
//cannot append new loc information to distributed flist
cerr << "Error! Cannot append new locations to "
"a distributed flist."
<< endl;
return;
}
fileLocList.append(elem);
}
struct fListInfo: ConstructorInfo
{
fListInfo()
{
name = "flist";
signature = " ANY -> FLIST ";
typeExample = "(flist(rel(tuple((PLZ int)(Ort string)))))";
listRep = "( objName fileLocList dupTimes inDB )";
valueExample =
"( \"plz\" ( (1 (1 2) '') (2 (1 2) '') (1 () '') ) 2 FALSE) ";
remarks = "Describe distributed data over computer clusters.";
}
};
struct fListFunctions: ConstructorFunctions<fList>
{
fListFunctions()
{
in = fList::In;
out = fList::Out;
create = fList::Create;
deletion = fList::Delete;
close = fList::Close;
clone = fList::Clone;
kindCheck = CheckFList;
save = fList::Save;
open = fList::Open;
}
};
fListInfo fli;
fListFunctions flf;
TypeConstructor flTC(fli, flf);
/*
5 Operator ~spread~
This operator accepts a tuple stream,
distributes tuples into a matrix relation based on their values of
a given attribute and returns a flist.
The map of the ~spread~ operator is:
----
stream(tuple(a1 ... ai ... aj ... an))
x fileName x filePath x [dupTime]
x ai x [scale] x [keepAI]
x [aj] x [scale2] x [keepAJ]
-> flist
----
All paratition files are kept as disk files in slave nodes,
and their file names are built up as: fileName\_row\_column,
row in [1,Scale],column in[1,Scale2].
Both fileName and filePath parameters are indispensable.
But the filePath can be set as an empty string,
and its default value listed in the SecondoConfig.ini is then used.
For the purpose of fault-tolerance, each partition file is
duplicated on ~dupTime~ continuous slave nodes,
the default value of ~dupTime~ is 1.
All duplicated files are kepts in nodes' default paths.
By default, a tuple stream is divided into ~sn~ * 1 partition files,
based on the value of the indispensable parameter ~ai~.
The ~sn~ is the number of slave nodes indicated in the
PARALLEL\_SECONDO\_SLAVES.
The partition attribute ~ai~ will be removed after the operation, except
the ~keepAI~ is set as true.
The ~sn~ can also be replaced by the optional parameter ~scale~ parameter,
and files of each row are distributed into a slave node,
based on the order denoted in the PARALLEL\_SECONDO\_SLAVES.
On each row, the data can be further divided into several column partition
files, if the second key-attribute ~aj~ is given.
The ~aj~ must be different from ~ai~, to avoid producing empty partition files.
The number of columns is decided by the number of values of ~aj~,
and also can be indicated by the optional parameter ~scale2~.
~aj~ also will be removed by default after the operation, except the ~keepAJ~
is set as true.
*/
struct SpreadInfo : OperatorInfo {
SpreadInfo() : OperatorInfo()
{
name = "spread";
signature = "stream(tuple(a1 ... ai ... aj ... an)) "
" x string x text x [int] "
" x ai x [int] x [bool] "
" x [aj] x [int] x [bool] "
" -> flist(tuple(a1 ... ai ... aj ... an))";
syntax = "stream(tuple(a1 ... ai ... aj ... an)) "
" x fileName x filePath x [dupTime] "
" x ai x [scale] x [keepAI] "
" x [aj] x [scale2] x [keepAJ] "
" -> flist(tuple(a1 ... ai ... aj ... an))";
meaning = "This operator accepts a tuple stream, "
"distributes its tuples into a matrix relation "
"based on their values of a given attribute "
"and returns a flist.";
}
};
/*
5.1 Type Mapping
----
stream(tuple(a1 ... ai ... aj ... an))
x string x text x [int]
x ai x [int] x [bool]
x [aj] x [int] x [bool]
-> flist(tuple(a1 ... ai ... aj ... an))
----
During the type mapping function, as we use several optional parameters,
hence it's better to divide these parameters into lists.
The specification of this operator is:
----
_ op[ list;list;list]
----
The first list denotes the fileName and filePath, and the optional ~dupTime~.
The second list denotes the first keyAttribute, together with its
optional scale.
The third list denotes the optional second keyAttribute, together with
its optional scale.
The type mapping function produces the text type file for the result files,
and duplicate it to every node's default pathlisted in
PARALLEL\_SECONDO\_SLAVES\/MASTER.
21th Mar. 2012
Set the file name and path as optional parameters too,
they can be set by rules, without always set by users.
Also the type of the output flist becomes flist(stream(tuple(T))),
used to distinguish with the DLO flists for tuple relations,
which have the type of flist(rel(tuple(T)))
Now the map becomes:
----
stream(tuple(a1 ... ai ... aj ... an))
x string x text x [int]
x ai x [int] x [bool]
x [aj] x [int] x [bool]
-> flist( stream ( tuple (a1 ... ai ... aj ... an)))
----
*/
ListExpr SpreadTypeMap(ListExpr args){
try{
NList l(args);
string err[] = {
// 0
"ERROR! Operator expects 4 lists arguments.",
"ERROR! Operator expects (stream(tuple(a1, a2, ..., an)))"
"x ( [string] x [text] x [int] ) "
"x ( ai x [int] x [bool] ) "
"x ( [aj] x [int] x [bool] )",
"ERROR! Infeasible evaluation in TM for attribute: ",
"ERROR! Unavailable file name: ",
"ERROR! Operator cannot find the dividing attribute: ",
// 5
"ERROR! Two keyAttributes must be different from each other.",
"ERROR! The result stream tuple type cannot be empty.",
"ERROR! Cannot create homonymous flists. ",
"ERROR! Cannot open file at ",
"ERROR! PARALLEL_SECONDO_SLAVES/MASTER is not set up. ",
// 10
"ERROR! Remote copy fails to path: ",
"ERROR! This Secondo database is not listed inside "
"PARALLEL_SECONDO_SLAVES/MASTER "
};
bool keepAI = false, keepAJ = false;
if (l.length() != 4)
return l.typeError(err[0]);
NList pType, pValue;
//First list, stream(tuple())
string fileName = "", filePath = "";
NList attrList;
if (!l.first().first().checkStreamTuple(attrList)){
return l.typeError(err[1]);
}
NList inStream = l.first().second();
//Second list, ([string] [text] [int] )
NList bpList = l.second(); //basic parameters
pType = bpList.first();
pValue = bpList.second();
if (pType.length() > 3){
return l.typeError(err[1]);
}
int len = pType.length();
for (int i = 1 ; i <= len ; i++)
{
NList pp = pType.elem(i);
NList pv = pValue.elem(i);
if (pp.isSymbol(CcString::BasicType()))
{
//Set the file name
ListExpr fnList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), fnList)){
return l.typeError(err[2] + "fileName");
}
fileName = NList(fnList).str();
}
else if (pp.isSymbol(FText::BasicType()))
{
//Set the file path
ListExpr fpList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), fpList)){
return l.typeError(err[2] + "filePath");
}
filePath = NList(fpList).str();
}
else if (!pp.isSymbol(CcInt::BasicType())){
return l.typeError(err[1]);
}
}
//Third list, (keyAttr1 [int] [bool])
NList scList1 = l.third(); //scale list 1
pType = scList1.first();
pValue= scList1.second();
if (pType.length() < 1 || pType.length() > 3){
return l.typeError(err[1]);
}
if (!pType.first().isSymbol()){
return l.typeError(err[1]);
}
string keyAI = pType.first().convertToString();
ListExpr attrType;
int attrIndex1 =
listutils::findAttribute(attrList.listExpr(), keyAI, attrType);
if (attrIndex1 < 1){
return l.typeError(err[4] + keyAI);
}
if (pType.length() == 2){
if (pType.second().isSymbol(CcBool::BasicType())){
keepAI = pValue.second().boolval();
}
else if (!pType.second().isSymbol(CcInt::BasicType())){
return l.typeError(err[1]);
}
}
else if (pType.length() == 3){
if (pType.second().isSymbol(CcInt::BasicType())
&& pType.third().isSymbol(CcBool::BasicType())){
keepAI = pValue.third().boolval();
}
else{
return l.typeError(err[1]);
}
}
//Fourth list, ([keyAttr2] [int] [bool])
NList scList2 = l.fourth(); //scale list 1
pType = scList2.first();
pValue= scList2.second();
string keyAJ = "";
int attrIndex2 = -1;
if (pType.length() > 3){
return l.typeError(err[1]);
}
if (pType.length() > 0){
if (!pType.first().isSymbol()){
return l.typeError(err[1]);
}
keyAJ = pType.first().convertToString();
attrIndex2 =
listutils::findAttribute(attrList.listExpr(), keyAJ, attrType);
if (attrIndex2 < 1){
return l.typeError(err[4] + keyAJ);
}
else if (attrIndex2 == attrIndex1){
return l.typeError(err[5]);
}
if (pType.length() == 2){
if (pType.second().isSymbol(CcBool::BasicType())){
keepAJ = pValue.second().boolval();
}
else if (!pType.second().isSymbol(CcInt::BasicType())){
return l.typeError(err[1]);
}
}
else if (pType.length() == 3){
if (pType.second().isSymbol(CcInt::BasicType())
&& pType.third().isSymbol(CcBool::BasicType())){
keepAJ = pValue.third().boolval();
}
else{
return l.typeError(err[1]);
}
}
}
NList newAttrList;
if (keepAI && keepAJ){
newAttrList = attrList;
}
else{
NList rest = attrList;
while (!rest.isEmpty()){
NList elem = rest.first();
if ( ((elem.first().str() != keyAI) || (keepAI))
&& ((elem.first().str() != keyAJ) || (keepAJ)) ){
newAttrList.append(elem);
}
rest.rest();
}
}
if (newAttrList.length() == 0){
return l.typeError(err[6]);
}
//Create the type file
if (fileName.length() == 0)
fileName = fList::tempName(false);
NList resultList =
NList(NList(fList::BasicType(),
NList(NList(Stream<Tuple>::BasicType()),
NList(NList(Tuple::BasicType()), newAttrList))));
filePath = getLocalFilePath(filePath,
(fileName + "_type"), "", true);
if (FileSystem::FileOrFolderExists(filePath)){
ListExpr exeType;
bool ok = false;
if (nl->ReadFromFile(filePath, exeType)){
if (listutils::isTupleStream(exeType)){
if (nl->Equal(exeType, nl->Second(resultList.listExpr()))){
ok = true;
}
}
}
if (!ok)
return l.typeError(err[7] + filePath);
}
else{
ListExpr expList = nl->Second(resultList.listExpr());
if (!nl->WriteToFile(filePath, expList)){
return l.typeError(err[8] + filePath);
}
}
//Duplicate the type to master and all slave nodes
clusterInfo* ci = new clusterInfo();
if (!ci->isOK()){
return l.typeError(err[9]);
}
if (ci->getLocalNode() < 0){
return l.typeError(err[11]);
}
string masterPath;
if (!ci->isLocalTheMaster()){
masterPath = ci->getRemotePath(0);
if ( 0 != system(
(scpCommand + filePath + " " + masterPath).c_str())){
return l.typeError(err[10] + masterPath);
}
}
for (size_t i = 1; i <= ci->getSlaveSize(); i++){
//Copy the type file to every slave
string rPath = ci->getRemotePath(i, false);
if ( 0 != system(
(scpCommand + filePath + " " + rPath).c_str())){
return l.typeError(err[10] + rPath);
}
}
return NList(NList(Symbol::APPEND()),
NList(NList(attrIndex1),
NList(attrIndex2),
NList(fileName,true, false)),
resultList).listExpr();
} catch(...){
return listutils::typeError("invalid input");
}
}
/*
5.2 Value Mapping
Each partition inside the flist produced by ~spread~ operator
is also composed by two files, the type file and the data file.
All partition files in a same row, i.e. kept in a same slave node,
share a same type file.
Both kinds files are as same as the files created by ~fconsume~ or
~fdistribute~ files,
so that they can be read by using ~ffeed~ operator as normal.
The reason is that, during the parallel processing,
the slave nodes don't contain the flist object,
but only read files from their local or neighbors' disks.
The master node deonted by PARALLEL\_SECONDO\_MASTER must contain the type file,
in order to avoid dirty data by checking these type files' names on the
master node.
Every slave node has one type file. If one slave node has several rows partition
files, then all rows share a same type file.
where to put the data file? Partition files are produced at the disk where the
spread operation is executed,
and then is copied to target after the production is finished.
*/
int SpreadValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
SpreadLocalInfo *lif = 0;
if ( message <= CLOSE )
{
//Get the parameters
result = qp->ResultStorage(s);
Supplier bspList = args[1].addr,
partList1 = args[2].addr,
partList2 = args[3].addr;
string fileName = "", filePath = "";
int keyIdxI = -1, keyIdxJ = -1;
int scaleI = 0, scaleJ = 0;
bool keepAI = false, keepAJ = false;
size_t dupTimes = 1;
keyIdxI = ((CcInt*)args[4].addr)->GetValue() - 1;
keyIdxJ = ((CcInt*)args[5].addr)->GetValue() - 1;
int blLen = qp->GetNoSons(bspList);
for (int i = 0; i < blLen; i++)
{
ListExpr pp = qp->GetType(qp->GetSupplierSon(bspList,i));
if (nl->IsEqual(pp, CcString::BasicType())){
//File Name is set
fileName = ((CcString*)qp->Request(
qp->GetSupplierSon(bspList, i)).addr)->GetValue();
}
else if (nl->IsEqual(pp, FText::BasicType())){
//File Path is set
filePath = ((FText*)qp->Request(
qp->GetSupplierSon(bspList, i)).addr)->GetValue();
}
else if (nl->IsEqual(pp, CcInt::BasicType())){
//duplicate time is set
dupTimes = ((CcInt*)qp->Request(
qp->GetSupplier(bspList,i)).addr)->GetValue();
}
}
if (fileName.length() == 0){
//The file name is not set
fileName = ((CcString*)args[6].addr)->GetValue();
}
int plLen1 = qp->GetNoSons(partList1);
if (plLen1 == 2){
ListExpr argType = qp->GetType(qp->GetSupplier(partList1,1));
if (nl->IsEqual(argType, CcBool::BasicType())){
keepAI = ((CcBool*)qp->Request(
qp->GetSupplier(partList1, 1)).addr)->GetValue();
}
else{
scaleI = ((CcInt*)qp->Request(
qp->GetSupplier(partList1, 1)).addr)->GetValue();
}
}
else if (plLen1 == 3){
scaleI = ((CcInt*)qp->Request(
qp->GetSupplier(partList1, 1)).addr)->GetValue();
keepAI = ((CcBool*)qp->Request(
qp->GetSupplier(partList1, 2)).addr)->GetValue();
}
int plLen2 = qp->GetNoSons(partList2);
if (plLen2 == 2){
ListExpr argType = qp->GetType(qp->GetSupplier(partList2,1));
if (nl->IsEqual(argType, CcBool::BasicType())){
keepAJ = ((CcBool*)qp->Request(
qp->GetSupplier(partList2, 1)).addr)->GetValue();
}
else{
scaleJ = ((CcInt*)qp->Request(
qp->GetSupplier(partList2, 1)).addr)->GetValue();
}
}
else if (plLen2 == 3){
scaleJ = ((CcInt*)qp->Request(
qp->GetSupplier(partList2, 1)).addr)->GetValue();
keepAJ = ((CcBool*)qp->Request(
qp->GetSupplier(partList2, 2)).addr)->GetValue();
}
lif = (SpreadLocalInfo*)local.addr;
if (lif) delete lif;
lif = new SpreadLocalInfo(fileName, filePath, dupTimes,
keyIdxI, scaleI, keepAI, keyIdxJ, scaleJ, keepAJ);
if (!lif->isAvailable()){ return 0; }
local.setAddr(lif);
Word wTuple(Address(0));
qp->Open(args[0].addr);
qp->Request(args[0].addr, wTuple);
while (qp->Received(args[0].addr))
{
if (!lif->insertTuple(wTuple)){
cerr << "Inserting tuple to files fail. " << endl;
break;
}
qp->Request(args[0].addr, wTuple);
}
if (lif->closeAllPartFiles()){
result.addr = new fList(*(lif->getResultList()));
}
else{
cerr << "Closing partition files fails" << endl;
}
delete lif;
local.setAddr(0);
}
//TODO add the progress estimation in the future
// else if ( message == REQUESTPROGRESS )
// else if ( message == CLOSEPROGRESS )
return 0;
}
SpreadLocalInfo::SpreadLocalInfo(
string fileName, string filePath, int _dp,
int _ai1, int _rn, bool _kai,
int _ai2, int _cn, bool _kaj):
partFileName(fileName),
attrIndex1(_ai1), attrIndex2(_ai2),
rowAmount(_rn),colAmount(_cn),
keepA1(_kai), keepA2(_kaj), done(false), tupleCounter(0),
dupTimes(_dp)
{
partFilePath = getLocalFilePath(filePath, fileName, "", false);
// Read the schema from the type file created in type mapping
string typeFilePath = getLocalFilePath(
filePath, fileName, "_type");
ListExpr resultTypeList;
if (!nl->ReadFromFile(typeFilePath, resultTypeList)){
cerr << "Reading result schema from the type file fails. "
<< endl;
return;
}
ci = new clusterInfo();
if (ci->isOK())
{
if (rowAmount == 0){
//By default, the row number of a flist is the number of slaves
rowAmount = ci->getSlaveSize();
}
//This operator only creates DLF kind distributed relations
resultList = new fList(fileName,
NList(resultTypeList), ci, NList(),
dupTimes, false, DLF, rowAmount, colAmount);
SecondoCatalog* sc = SecondoSystem::GetCatalog();
resultTypeList = sc->NumericType(nl->Second(resultTypeList));
exportTupleType = new TupleType(resultTypeList);
}
}
bool SpreadLocalInfo::insertTuple(Word wTuple)
{
// Insert a tuple into a proper data file.
// row and column number are calculated based on
// tuple's key attributes value.
// If the file has not been created, then create it,
// and insert the file pointer to the matrix
// Or else, find the file pointer from the matrix,
Tuple *tuple = static_cast<Tuple*>(wTuple.addr);
size_t row = hashValue(tuple, attrIndex1, rowAmount) + 1;
size_t column = 1;
if ( attrIndex2 >= 0 ){
column = hashValue(tuple, attrIndex2, colAmount) + 1;
}
fileInfo* fp = 0;
map<size_t, rowFile*>::iterator mit = matrixRel.find(row);
if (mit != matrixRel.end()){
rowFile::iterator rit = mit->second->find(column);
if (rit != mit->second->end()){
fp = rit->second;
}else{
//create a new partition file.
fp = new fileInfo(column, partFilePath, partFileName,
exportTupleType->GetNoAttributes(), row);
mit->second->insert(pair<size_t, fileInfo*>(column, fp));
}
}else
{
// Create a new rowFile
fp = new fileInfo(column, partFilePath, partFileName,
exportTupleType->GetNoAttributes(), row);
rowFile *newRow = new rowFile();
newRow->insert(pair<size_t, fileInfo*>(column, fp));
matrixRel.insert(pair<size_t, rowFile*>(row, newRow));
}
bool ok = openFile(fp);
if (ok)
{
if (!fp->writeTuple(tuple, tupleCounter, exportTupleType,
attrIndex1, keepA1, attrIndex2, keepA2)){
cerr << "Block file " << fp->getFilePath()
<< " write failes." << endl;
ok = false;
}
else{
tupleCounter++;
tuple->DeleteIfAllowed();
}
}
return ok;
}
bool SpreadLocalInfo::openFile(fileInfo *fp){
//Control the amount of opening file handles
if (fp->isFileOpen()){
return true;
}
if (openFileList.size() >= MAX_OPENFILE_NUM)
{
//sort fileInfos according to their last tuples' indices
sort(openFileList.begin(), openFileList.end(), compFileInfo);
//The last one of the vector is the idler
bool poped = false;
//It's possible that fileInfos kept in the stack,
//are closed from other functions.
while(!poped && openFileList.size() > 0)
{
fileInfo* oldestFile = openFileList.back();
if (oldestFile->isFileOpen())
{
oldestFile->closeFile();
poped = true;
}
openFileList.pop_back();
}
}
bool ok = fp->openFile();
if (ok){
openFileList.push_back(fp);
}
return ok;
}
bool SpreadLocalInfo::closeAllPartFiles()
{
//traverse the whole matrix,
//to add the last description list on all part files.
//Then close and duplicate them.
size_t lastRow = 0;
map<size_t, rowFile*>::iterator mit = matrixRel.begin();
while (mit != matrixRel.end()){
size_t row = mit->first;
if (row > (lastRow + 1)){
//Insert empty rows
NList emptyRowList = NList();
for (size_t erow = (lastRow + 1); erow < row; erow++)
{
resultList->appendFileLocList(emptyRowList);
}
}
lastRow = row;
rowFile::iterator rit = mit->second->begin();
NList columnList;
string filePaths = "";
while ( rit!= mit->second->end()){
size_t column = rit->first;
columnList.append(NList((int)column));
fileInfo* fp = rit->second;
if (openFile(fp))
{
fp->writeLastDscr();
fp->closeFile();
}
else
{
cerr << "Part file " << fp->getFilePath()
<< " Cannot be correctly opened, "
"when writing the last description list. " << endl;
return false;
}
filePaths += (fp->getFilePath() + " ");
rit++;
}
bool *copyList = new bool[ci->getClusterSize()];
memset(copyList, 0, ci->getClusterSize());
size_t startNode = row;
for (size_t i = 0; i < dupTimes; i++){
size_t dupNode = ci->getInterIndex(
(startNode + i), false, true);
copyList[dupNode] = true;
}
for(size_t i = 0; i < ci->getClusterSize(); i++){
if (copyList[i]){
string remotePath =
ci->getRemotePath(i, false, true, true);
int copyTime = MAX_COPYTIMES;
while (copyTime-- > 0)
{
if (system(
(scpCommand + filePaths + " " + remotePath).c_str())
!= 0 ){
cerr << "Warning! Duplicate files "
<< filePaths << " to " << endl
<< remotePath << " fails. " << endl;
//<< strerror(error) << endl;
}
else
break;
}
if (copyTime <= 0){
cerr << "Error! Duplicate remote files fail." << endl;
return false;
}
}
}
//add the first duplicate location to the fileLocList
string remoteLocalPath =
ci->getRemotePath(row, false, true, false);
NList rowList = NList(
NList((int)ci->getInterIndex(row, false, true)),
columnList,
NList(remoteLocalPath, true, true));
resultList->appendFileLocList(rowList);
mit++;
}
resultList->setDistributed();
done = true;
return true;
}
size_t SpreadLocalInfo::hashValue(
Tuple *t, int attrIndex, int scale){
size_t hashValue =
((Attribute*)t->GetAttribute(attrIndex))->HashValue();
if (scale > 0){
hashValue %= scale;
}
return hashValue;
}
Operator spreadOp(SpreadInfo(), SpreadValueMap, SpreadTypeMap);
/*
5 Operator ~spreadFiles~
This operator reads a set of sub-files, and spread them to slave Data Servers.
It is prepared to cooperate with the operators like ~divide\_osm~,
spreading a set of divided files to slave Data Servers.
Its signature is:
----
FileName: string x Path: text
x [Size: int]
-> bool
----
It detects the files through the FileName and the Path argument.
All files must be named like: FileName\_No.
The ~No~ is an integer, starting from 0,
By default an empty path indicates the \$SECONDO\_BUILD\_DIR\/bin,
so that the implementers of the operators like ~divide\_osm~ don't need to
consider about the special path setting in Parallel SECONDO.
The Size is an optional argument, by default it is the cluster size,
i.e. the number of slave Data Servers.
*/
struct SpreadFilesInfo : OperatorInfo {
SpreadFilesInfo() : OperatorInfo()
{
name = "spreadFiles";
signature = "string x text x [int] -> bool";
syntax = "FileName x Path x [Size] -> Success";
meaning = "This operator reads a set of sub-files, "
"and spread them to slave Data Servers.";
}
};
/*
5.1 Type Mapping
This operator returns a boolean result,
which is true if all sub-files are copied to their respective target.
*/
ListExpr SpreadFilesTypeMap(ListExpr args){
NList l(args);
string err[] = {
//0
"ERROR!! Operator expects two or three arguments.",
"ERROR!! Operator expects string x text x [int] as the input",
""
};
if (l.length() < 2 || l.length() > 3){
return l.typeError(err[0]);
}
NList pType, pValue;
//FileName
if (!l.first().first().isSymbol(CcString::BasicType()))
return l.typeError(err[1]);
//Path
if (!l.second().first().isSymbol(FText::BasicType()))
return l.typeError(err[1]);
//[Size]
if (l.length() == 3){
if (!l.third().first().isSymbol(CcInt::BasicType()))
return l.typeError(err[1]);
}
return NList(NList(CcBool::BasicType())).listExpr();
}
/*
5.2 Value Mapping
Some checking are done in the value mappting stage,
also only give the warning message.
If the given Size is different from the current cluster size,
then a warning message will be prompted.
If the file postfix doesn't coherence, several files are missing,
the user will also be warned.
If the Size is larger than the cluster size,
then the operator stops at the last file within the cluster range.
Each sub-file is renamed after being copied to the destination Data Server,
by removing its numerical postfix.
Therefore, all sub-files in the Data Servers share the same file.
Files are all copied to Mini-Secondo\/bin directory,
for the same reason that uses \$SECONDO\_BUILD\_DIR\/bin as the default path.
*/
int SpreadFilesValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
result = qp->ResultStorage(s);
string fileName, filePath;
size_t size;
fileName = ((CcString*)args[0].addr)->GetValue();
filePath = ((FText*)args[1].addr)->GetValue();
clusterInfo* ci = new clusterInfo();
if (qp->GetNoSons(s) == 3){
size = (size_t)((CcInt*)args[2].addr)->GetValue();
} else {
size = (size_t)ci->getSlaveSize();
}
if (size > ci->getSlaveSize()){
cerr << "Warning!! The Size parameter is larger than "
"the current cluster size. " << endl;
size = ci->getSlaveSize();
cerr << "Only " << size << " files will be spread. " << endl;
}
else if (size < ci->getSlaveSize()){
cerr << "Warning!! The Size parameter is less than "
"the current cluster size. " << endl;
}
if (fileName.length() == 0){
cerr << "Error!! The file name cannot be set empty. " << endl;
((CcBool*)(result.addr))->Set(true, false);
}
if (filePath.length() == 0)
filePath = FileSystem::GetCurrentFolder();
else if (filePath.find_first_of("/") != 0){
cerr << "Error!! An absolute path is required for the Path argument."
<< endl;
((CcBool*)(result.addr))->Set(true, false);
return 0;
}
const int PipeWidth = 10;
pthread_t threadID[PipeWidth];
SPF_LocalInfo *sli = new SPF_LocalInfo();
pthread_mutex_init(&CLI_mutex, NULL);
//Each thread transfer one file, and mark the result
SPF_Thread* sts[size];
for (int fi = 0; (size_t)fi < size;)
{
string file = filePath + "/" + fileName + "_" + int2string(fi);
if ((size_t)fi >= size)
break;
//Files should be distributed on slaves only
string rmsBin = ci->getMSECPath(fi + 1, true, false);
FileSystem::AppendItem(rmsBin, "bin/"+fileName);
for (int ti = 0; ti < PipeWidth; ti++)
{
if (!sli->getTokenPass(ti) || pthread_kill(threadID[ti], 0))
{
sli->setTokenPass(ti, true);
sts[fi] = new SPF_Thread(sli, ti, fi, file, rmsBin);
pthread_create(&threadID[ti], NULL, SPF_Thread::tCopyFile, sts[fi]);
fi++;
break;
}
}
}
for (int ti = 0; ti < PipeWidth;ti++)
{
if (sli->getTokenPass(ti)){
pthread_join(threadID[ti], NULL);
}
}
for (int fi = 0; (size_t)fi < size; fi++)
{
if (!sts[fi]->getResult()){
((CcBool*)(result.addr))->Set(true, false);
}
}
((CcBool*)(result.addr))->Set(true, true);
return 0;
}
void* SPF_Thread::tCopyFile(void* ptr)
{
SPF_Thread* st = (SPF_Thread*)ptr;
string local = st->source;
if (!FileSystem::FileOrFolderExists(local)){
pthread_mutex_lock(&CLI_mutex);
cerr << "Warning!! Cannot locate the file " << local << endl;
pthread_mutex_unlock(&CLI_mutex);
st->setResult(true);
return NULL;
} else if (FileSystem::IsDirectory(local)){
pthread_mutex_lock(&CLI_mutex);
cerr << "Error!! File " << local
<< " should not be a directory." << endl;
pthread_mutex_unlock(&CLI_mutex);
st->setResult(false);
return NULL;
}
int copyTimes = MAX_COPYTIMES;
bool ok = false;
pthread_mutex_lock(&CLI_mutex);
pthread_mutex_unlock(&CLI_mutex);
while (copyTimes-- > 0){
if (0 == system((scpCommand + local + " " + st->dest).c_str())){
ok = true;
break;
}
}
st->setResult(ok);
pthread_mutex_lock(&CLI_mutex);
if (!ok){
cerr << "Error!! Cannot transfer the file " << local << endl;
}
pthread_mutex_unlock(&CLI_mutex);
st->releaseToken();
return NULL;
}
Operator spreadFilesOp(
SpreadFilesInfo(), SpreadFilesValueMap, SpreadFilesTypeMap);
/*
5 Operator ~collect~
This operator is used to collect the data from the partition files denoted
by the given flist.
----
flist(tuple) x [row] x [column] -> stream(tuple)
----
*/
struct CollectInfo : OperatorInfo {
CollectInfo() : OperatorInfo()
{
name = "collect";
signature = "flist(tuple) x [int] x [int] -> stream(tuple)";
syntax = "flist(tuple) x [row] x [column] -> stream(tuple)";
meaning = "This operator is used to collect the data "
"from the partition files denoted by the given flist.";
}
};
/*
5.1 Type Mapping
First ensure the distributed data in flist is a rel(tuple) type.
----
flist(tuple) x [int] x [int] -> stream(tuple)
----
If only one optional parameter is given, then it's viewed as a row number.
The optional parameters only accept non-negative integer numbers.
Any operators create new flist objects,
cannot be used before the ~collect~ operator,
or else the creation will be done twice since we use the
GetNLArgValueInTM function in query processor,
*/
ListExpr CollectTypeMap(ListExpr args)
{
NList l(args);
string err[] = {
//0
"ERROR! Operator expects flist x [int] x [int]. ",
"ERROR! Unavailable optional parameters.",
"ERROR! Operator expects row and column numbers "
"are non-negative values.",
};
try{
NList pType, pValue;
if (l.length() != 2)
return l.typeError(err[0]);
//First flist
pType = l.first().first();
pValue = l.first().second();
if (!isFListStreamDescription(pType)){
return l.typeError(err[0]);
}
NList tupleType = pType.second().second();
//Optional parameters
pType = l.second().first();
pValue = l.second().second();
if (pType.length() > 2){
return l.typeError(err[0]);
}
else if (pType.length() > 0){
if (!pType.first().isSymbol(CcInt::BasicType())){
return l.typeError(err[0]);
}
ListExpr opVal;
if (!qp->GetNLArgValueInTM(pValue.first().listExpr(), opVal)){
return l.typeError(err[1]);
}else{
int rowNum = NList(opVal).intval();
if (rowNum < 0){
return l.typeError(err[2]);
}
}
if (pType.length() > 1){
if (!pType.first().isSymbol(CcInt::BasicType())){
return l.typeError(err[0]);
}
if (!qp->GetNLArgValueInTM(pValue.second().listExpr(), opVal)){
return l.typeError(err[1]);
}else{
int columnNum = NList(opVal).intval();
if (columnNum < 0){
return l.typeError(err[2]);
}
}
}
}
NList streamType =
NList(NList(Symbol::STREAM()),
NList(tupleType));
return streamType.listExpr();
} catch(...){
return listutils::typeError(err[0]);
}
}
/*
5.2 Value Mapping
By default, it reads all partition files denoted in the given flist, and
returns the tuples in a stream.
If the optional parameters are given, then this operator only reads
part partition files.
For any partition files listed a flist, both it's row and column numbers are
non-zero positive integer numbers.
If the row number is 0, then it denotes a complete row partition files, while
a complete column part files are denoted when the column number is 0. E.g,
* collect[1] or collect[1,0] means to collect all partitions files
in the first row,
* collect[0,2] means to collect all partition files in the second column,
* collect[1,2] means to collect one partition file in the 1th row
and 2th column,
* collect[0,0] means to collect all files inside the matrix.
*/
int CollectValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
CollectLocalInfo* cli = 0;
switch(message){
case OPEN: {
fList* partitionFileList = (fList*)args[0].addr;
NList currentCluster = clusterInfo().toNestedList();
if (!partitionFileList->isCollectable(currentCluster)){
cerr << "This flist is not collectable" << endl;
return CANCEL;
}
size_t row = 0, column = 0;
Supplier optList = args[1].addr;
int optLen = qp->GetNoSons(optList);
if (1 == optLen){
row = ((CcInt*)
qp->Request(qp->GetSupplier(optList,0)).addr)->GetValue();
}else if (2 == optLen){
row = ((CcInt*)
qp->Request(qp->GetSupplier(optList,0)).addr)->GetValue();
column = ((CcInt*)
qp->Request(qp->GetSupplier(optList,1)).addr)->GetValue();
}
cli = (CollectLocalInfo*)local.addr;
if (cli){
delete cli;
cli = 0;
}
cli = new CollectLocalInfo(partitionFileList, row, column);
#ifdef SEQCOPY
if (cli->fetchAllPartFiles()){
local.setAddr(cli);
}
else{
delete cli;
cli = 0;
local.setAddr(Address(0));
}
#endif
local.setAddr(cli);
return 0;
}
case REQUEST: {
if (0 == local.addr)
return CANCEL;
cli = (CollectLocalInfo*)local.addr;
#ifdef PIPECOPY
result.setAddr(cli->getNextTuple2());
#else
result.setAddr(cli->getNextTuple());
#endif
if ( 0 == result.addr)
{
return CANCEL;
}
else
{
return YIELD;
}
}
case CLOSE: {
cli = (CollectLocalInfo*)local.addr;
if (!cli)
return CANCEL;
else{
delete cli;
cli = 0;
local.setAddr(Address(0));
}
return 0; //must return
}
}
return 0;
}
#ifdef PIPECOPY
pthread_mutex_t CollectLocalInfo::CLI_OWN_Mutex;
#endif
CollectLocalInfo::CollectLocalInfo(fList* _fl, size_t _r, size_t _c):
fileList(_fl), row(_r), column(_c), inputFile(0)
{
NList typeList = fileList->getInterTypeList();
//Get the resultType
SecondoCatalog* sc = SecondoSystem::GetCatalog();
ListExpr resultTypeList =
sc->NumericType(typeList.second().listExpr());
resultType = new TupleType(resultTypeList);
#ifdef PIPECOPY
//Assign the tasks here.
size_t rBegin = ( row == 0 ) ? 1 : row,
rEnd = ( row == 0 ) ? fileList->getMtxRowNum() : row;
fileStatus = new bool[(rEnd - rBegin + 1)];
memset(tokenPass, false, PipeWidth);
pthread_mutex_init(&CLI_OWN_Mutex,NULL);
for (size_t ri = rBegin; ri <= rEnd; ri++)
{
ListExpr columnList = fileList->getColumnList(ri);
CLI_Thread* ct = new CLI_Thread(this, fileList->getSubName(),
ri, columnList);
int cand = fileList->getPartitionFileLoc(ri, *ct->remotePaths);
if (cand > 0)
{
if (column > 0)
{
//Read one column only
ListExpr rest = columnList;
bool findColumn = false;
while(!nl->IsEmpty(rest))
{
int elem = nl->IntValue(nl->First(rest));
if ((int)column == elem){
findColumn = true;
break;
}
rest = nl->Rest(rest);
}
if (findColumn){
ct->allColumns->push_back((int)column);
}
else{
cerr << "The partition file (" << ri << "," << column << ") "
"is unavailable. " << endl;
fileStatus[ri - 1] = false;
break;
}
}
else
{
//Read all columns
ListExpr rest = columnList;
while(!nl->IsEmpty(rest))
{
ct->allColumns->push_back(nl->IntValue(nl->First(rest)));
rest = nl->Rest(rest);
}
}
fileTasks.push_back(ct);
}
else
{
delete ct;
}
}
pthread_create(&faf_TID, NULL, fetchAllPartFiles2, this);
fIdx = 0;
#endif
}
#ifdef SEQCOPY
bool CollectLocalInfo::fetchAllPartFiles()
{
//According to the accepted row and column number,
//fetch data files and fullfill their names to the partFiles vector
size_t rBegin = ( row == 0 ) ? 1 : row,
rEnd = ( row == 0 ) ? fileList->getMtxRowNum() : row;
clusterInfo* cluster = new clusterInfo();
string localIP = cluster->getLocalIP();
for ( size_t ri = rBegin; ri <= rEnd; ri++)
{
//Files of a same row are always kept together
vector<string> remotePaths;
size_t cand =
fileList->getPartitionFileLoc(ri, remotePaths);
if (cand > 0)
{
NList newColList = fileList->getColumnList(ri);
if (column != 0)
{
//find one file of each row
//check the column number is exist in the column list
NList rest = newColList;
bool find = false;
while (!rest.isEmpty()){
int elem = rest.first().intval();
if ( (int)column == elem){
find = true;
break;
}
rest.rest();
}
if (find){
newColList = NList();
newColList.append(NList((int)column));
}
else{
cerr << "The partition ("
<< ri << "," << column <<") is unavailable. " << endl;
return false;
}
}
NList rest = newColList;
while (!rest.isEmpty()){
size_t ci = rest.first().intval();
string fileName = fileList->getSubName()
+ "_" + int2string(ri) + "_" + int2string(ci);
string localPath = getLocalFilePath("", fileName, "", true);
vector<string>::iterator pit = remotePaths.begin();
while (pit != remotePaths.end()){
string rPath = *pit;
FileSystem::AppendItem(rPath, fileName);
string remoteIP, remoteLocalPath;
istringstream iss(rPath);
getline(iss, remoteIP, ':');
getline(iss, remoteLocalPath, ':');
bool found = false;
if (remoteIP.compare(localIP) == 0)
{
localPath = remoteLocalPath;
found = true;
}
else
{
int copyTime = MAX_COPYTIMES;
while (copyTime-- > 0){
if (0 != system(
(scpCommand + rPath + " " + localPath).c_str()))
{
cerr << "Warning! Copy remote file from " << rPath
<< " doesn't work yet." << endl;
}else
{
break;
}
}
if (copyTime <= 0){
cerr << "ERROR! Copy remote file from " << rPath
<< " fails" << endl;
return false;
}
else{
found = true;
}
}
if (found)
{
partFiles.push_back(localPath);
break;
}
pit++;
}
rest.rest();
}
}
}
return true;
}
#endif
#ifdef PIPECOPY
/*
With pipeline copy method, one thread starts when the valueMapping function
starts, to fetch all files over the cluster.
In this thread, it starts one independent thread for each row of the matrix,
each file is listed into ~partFiles~ when it is prepared.
When the first file in the ~partFiles~ is set, the operator starts to
read the tuples from it.
However, as the nestedList class in Secondo is not safe for thread processing,
all preparation work for each row have to be done inside the main process,
instead of done by every thread.
*/
void* CollectLocalInfo::fetchAllPartFiles2(void* ptr)
{
CollectLocalInfo* cli = (CollectLocalInfo*)ptr;
size_t row = cli->getRow();
size_t rBegin = ( row == 0 ) ? 1 : row,
rEnd = ( row == 0 ) ? cli->getFileList()->getMtxRowNum() : row;
pthread_t threadID[PipeWidth];
for (vector<CLI_Thread*>::iterator task = cli->fileTasks.begin();
task != cli->fileTasks.end();)
{
//For each task(row), start an independent thread to copy it from remote
//computer, if it does not exist in the present one.
for (int token = 0; token < PipeWidth; token++)
{
if (task == cli->fileTasks.end())
break;
if (!cli->tokenPass[token] || pthread_kill(threadID[token], 0))
{
cli->tokenPass[token] = true;
(*task)->setToken(token);
pthread_create(&threadID[token], NULL, tCopyFile, (*task));
task++;
}
}
}
for (int token = 0; token < PipeWidth; token++)
{
//Wait until all copy threads finishes
if (cli->tokenPass[token]){
pthread_join(threadID[token], NULL);
}
}
for (size_t ri = 0; ri <(rEnd - rBegin + 1); ri++){
if (!cli->fileStatus[ri]){
//It is possible that some rows are empty,
//hence only the warning message instead of error message is given.
cerr << "Warning! Row " << (rBegin + ri) << " is not prepared" << endl;
}
}
return NULL;
}
void* CollectLocalInfo::tCopyFile(void* ptr)
{
//Copy thread function
CLI_Thread *ct = (CLI_Thread*)ptr;
int token = ct->token;
CollectLocalInfo* li = ct->cli;
vector<string>* remotePaths = ct->remotePaths;
int ri = ct->row;
string localIP = clusterInfo().getLocalIP();
string subName = ct->subName;
vector<int>* allColumns = ct->allColumns;
//Copy files of the current row, either copy one column or the whole row
string fileName = subName + "_" + int2string(ri)
+ "_" + (li->column > 0 ? int2string(li->column) : "*");
//Get only the file path, without attaching any file name after it.
string localPath = getLocalFilePath("", "", "", false);
vector<string>::iterator pit = remotePaths->begin();
while (pit != remotePaths->end())
{
string rPath = *pit;
string remoteIP, remoteLocalPath;
istringstream iss(rPath);
getline(iss, remoteIP, ':');
getline(iss, remoteLocalPath, ':');
bool getFile = false;
if (remoteIP.compare(localIP) == 0)
{
localPath = remoteLocalPath;
getFile = true;
}
else
{
FileSystem::AppendItem(rPath, fileName);
//Copy from remote
int copyTime = MAX_COPYTIMES;
while (copyTime-- > 0){
if (0 == system((scpCommand + rPath + " " + localPath).c_str())){
getFile = true;
break;
}
}
if (!getFile){
pthread_mutex_lock(&CLI_OWN_Mutex);
cerr << "ERROR! Copy remote file from " << rPath << " fail." << endl;
pthread_mutex_unlock(&CLI_OWN_Mutex);
}
}
//Check all file is there
bool findFile = true;
if (getFile)
{
for (vector<int>::iterator ci = allColumns->begin();
ci != allColumns->end(); ci++)
{
fileName = subName + "_" + int2string(ri)
+ "_" + int2string(*ci);
string file = localPath;
FileSystem::AppendItem(file, fileName);
if (!FileSystem::FileOrFolderExists(file)){
findFile = false;
break;
}
else{
pthread_mutex_lock(&CLI_OWN_Mutex);
li->partFiles.push_back(file);
pthread_mutex_unlock(&CLI_OWN_Mutex);
}
}
}
if (findFile){
pthread_mutex_lock(&CLI_OWN_Mutex);
li->fileStatus[ri - 1] = true;
li->tokenPass[token] = false;
pthread_mutex_unlock(&CLI_OWN_Mutex);
break;
}
pit++;
}
return NULL;
}
#endif
#ifdef SEQCOPY
bool CollectLocalInfo::partFileOpened()
{
if (0 == inputFile){
if (partFiles.size() != 0){
string partFileName = partFiles.back();
//open the file to inputFiles
inputFile = new ifstream(partFileName.c_str(), ios::binary);
if (!inputFile->good()){
inputFile = 0;
return false;
}
//Read the tail description list
u_int32_t descSize;
size_t fileLength;
inputFile->seekg(0, ios::end);
fileLength = inputFile->tellg();
inputFile->seekg(
(fileLength - sizeof(descSize)), ios::beg);
inputFile->read((char*)&descSize, sizeof(descSize));
char descStr[descSize];
inputFile->seekg(
(fileLength - (descSize + sizeof(descSize))), ios::beg);
inputFile->read(descStr, descSize);
inputFile->seekg(0, ios::beg);
NList descList = NList(binDecode(string(descStr)));
if (descList.isEmpty())
{
cerr << "\nERROR! Tail description list read in "
<< partFileName << "fail." << endl;
return false;
}
return true;
}
else{
return false; //No more file in the partFiles list
}
}
else{
return true; //Exist an opened file
}
}
Tuple* CollectLocalInfo::getNextTuple()
{
Tuple* t = 0;
//Read one tuple from the present opened file
//if there is no more data in the current file,
//then remove the file from the partFiles list,
//and open the new one.
while (partFileOpened()){
u_int32_t blockSize = 0;
inputFile->read( reinterpret_cast<char*>(&blockSize),
sizeof(blockSize));
if (blockSize > 0)
{
//read and return the tuple
blockSize -= sizeof(blockSize);
char *tupleBlock = new char[blockSize];
inputFile->read(tupleBlock, blockSize);
t = new Tuple(resultType);
t->ReadFromBin(tupleBlock, blockSize);
delete[] tupleBlock;
break;
}
else
{
//close the opened file,
//pop the file name from the file list.
inputFile->close();
delete inputFile;
inputFile = 0;
partFiles.pop_back();
}
}
return t;
}
#endif
#ifdef PIPECOPY
Tuple* CollectLocalInfo::getNextTuple2()
{
Tuple* t = 0;
if (inputFile != 0)
{
t = readTupleFromFile(inputFile, resultType, 1);
if ( t == 0 ){
inputFile->close();
delete inputFile;
inputFile = 0;
}
else
return t;
}
string inputFileName = "";
while(true)
{
if (inputFileName.length() != 0){
inputFile = new ifstream(inputFileName.c_str(), ios::binary);
if (!inputFile->good()){
cerr << "ERROR! Read file " << inputFileName << " fail.\n\n";
inputFile = 0;
return 0;
}
//Read the description list
u_int32_t descSize;
size_t fileLength;
inputFile->seekg(0, ios::end);
fileLength = inputFile->tellg();
inputFile->seekg((fileLength - sizeof(descSize)), ios::beg);
inputFile->read((char*)&descSize, sizeof(descSize));
char descStr[descSize];
inputFile->seekg((fileLength - (descSize + sizeof(descSize))),
ios::beg);
inputFile->read(descStr, descSize);
inputFile->seekg(0, ios::beg);
NList descList = NList(binDecode(string(descStr)));
if (descList.isEmpty()){
cerr << "ERROR! Format error with fetched file "
<< inputFileName << endl;
inputFile->close();
delete inputFile;
inputFile = 0;
return 0;
}
t = readTupleFromFile(inputFile, resultType, 1);
if ( t == 0 ){
cerr << "Waring! File " << inputFileName
<< " is empty! " << endl;
inputFile->close();
delete inputFile;
inputFile = 0;
inputFileName = "";
}
else
return t;
}
else if (partFiles.size() > 0 && fIdx < partFiles.size()){
inputFileName = partFiles[fIdx];
fIdx++;
}
if (pthread_kill(faf_TID, 0)
&& (fIdx >= partFiles.size())
&& inputFileName.length() == 0){
break;
}
}
return 0;
}
#endif
Operator collectOp(CollectInfo(), CollectValueMap, CollectTypeMap);
/*
4. Type Operator ~PARA~
*/
struct ParaInfo : OperatorInfo
{
ParaInfo()
{
name = "para";
signature =
"( flist(T) ) -> T \n"
"T -> T, T in DELIVERABLE";
syntax = "type operator";
meaning = "Extract the data type from a flist object";
}
};
ListExpr ParaTypeMapping( ListExpr args)
{
NList l(args);
string tpeErr = "Eexpect one flist or DELIVERABLE input";
if (l.length() != 1)
return l.typeError(tpeErr);
NList inType;
if (l.first().isAtom())
{
//non flist input
if (!listutils::isKind(
l.first().listExpr(), Kind::DELIVERABLE())){
return l.typeError(tpeErr);
}
inType = l.first();
}
else
{
if (!l.first().first().isSymbol(fList::BasicType()))
return l.typeError(tpeErr);
inType = l.first().second();
}
return inType.listExpr();
}
int ParaValueMapping(Word* args, Word& result,
int message, Word& local, Supplier s){
cerr << "\nOps... It is not allowed to use para operator in "
"any directly evaluable queries."
<< endl;
return 0;
}
Operator paraOp(ParaInfo(), ParaValueMapping, ParaTypeMapping);
struct TParaInfo : OperatorInfo
{
TParaInfo()
{
name = "TPARA";
signature =
"( flist(ANY) ) -> ANY";
syntax = "type operator";
meaning = "Extract the data type from the first flist object";
}
};
ListExpr TParaTypeMapping( ListExpr args)
{
try{
NList l(args);
if (l.length() < 1){
return l.typeError("Expect at least one argument.");
}
NList ffListType = l.first();
if (ffListType.isAtom()){
return ffListType.listExpr();
}
else
{
if (ffListType.first().isSymbol(fList::BasicType())){
return ffListType.second().listExpr();
}
else{
return ffListType.listExpr();
}
}
} catch(...){
return listutils::typeError("invalid input");
}
}
struct TPara2Info : OperatorInfo
{
TPara2Info()
{
name = "TPARA2";
signature =
"( ANY x flist(ANY) ) -> ANY";
syntax = "type operator";
meaning = "Extract the data type from the second flist object";
}
};
ListExpr TPara2TypeMapping( ListExpr args)
{
try{
NList l(args);
if (l.length() < 2)
return l.typeError("Expect at least two arguments.");
NList sfListType = l.second();
if (sfListType.isAtom()){
return sfListType.listExpr();
}
else
{
if (sfListType.first().isSymbol(fList::BasicType())){
return sfListType.second().listExpr();
}
else{
return sfListType.listExpr();
}
}
} catch(...){
return listutils::typeError("invalid input");
}
}
/*
3 Operator ~pffeed~
Created on 14th Jul, 2012
Jiamin Lu
This operator reads a set of data files from the cluster,
and returns the tuples from them. Files are copied to the current
node by pipeline. It maps
----
stream(tuple(aR:int, aC:int, aD:int, ...))
x aR x aC x aD
x fileName x [filePath] x [attemptTimes]
x [T1] x [T2]
\to stream(tuple())
----
The input tuple stream contains at least 3 attributes:
* aR: The files must be contained inside a file-matrix,
and this attribute tells their row numbers.
* aC: It tells the files' column numbers.
* aD: It tells the files' first destination data server ID in the DS-catalog.
The following 3 symbols tells the names of above 3 attributes.
Also the fileName of the data files are given as the fourth parameter,
following with the optional file path.
All files must start with an identical fileName, and kept in the same
file path if the filePath parameter is set.
By default, they are kept in the pathes described in the DS-catalog.
All files can be copied from attemptTimes data servers,
if the copy fails in its first destination. The default value of the
attemptTimes is 1.
If the type file is not found in the current node, it can be fetched
from T1 or T2 nodes, if these two optional parameters are set.
*/
struct pffeed1Info : OperatorInfo
{
pffeed1Info()
{
name = "pffeed";
signature =
"stream(tuple(aR:int, aC:int, aD:int, ...)) "
"x aR x aC x aD x string x [text] x [int] "
"x [int] x [int] -> stream(tuple())";
meaning = "Reads a set of data files from the cluster,"
"and returns the tuples from them.";
}
};
ListExpr pffeedTypeMap(ListExpr args, bool noFlob)
{
try{
NList l(args);
string lenErr = "ERROR! Operator pffeed expects 3 argument lists. ";
string inSTErr = "ERROR! Operator pffeed expects a "
"stream(tuple()) as input";
string firstPLErr = "ERROR! Operator pffeed expects first list as "
"aR x aC x aD x fileName x [filePath] x [attemptTimes]";
string rcdErr = "ERROR! aR, aC and aD do not exist or "
"are not int attributes.";
string secondPLErr = "ERROR! Operator pffeed expects second list as"
" [int] x [int]";
string atrErr = "ERROR! Required attribute is not found in the "
"given tuple stream";
string tpeErr = "ERROR! The type file is not found.";
string ifeErr = "ERROR! Parameter cannot be evaluated: ";
string ntfErr = "ERROR! No exist type file: ";
string ntrErr = "ERROR! A tuple relation type list is "
"NOT contained in file: ";
string nslErr = "ERROR! The slave list file does not exist."
"Is $PARALLEL_SECONDO_SLAVES correctly set up ?";
string norErr = "ERROR! Remote node for type file is out of range.";
string ctfErr = "ERROR! Copy remote type file fail.";
string iatErr = "ERROR! Infeasible attempt times.";
string fileName, filePath;
NList pType, pValue;
int attTimes = 1;
if (l.length() != 3)
return l.typeError(lenErr);
//1th stream(tuple())
pType = l.first().first();
NList attrsList;
if (!pType.checkStreamTuple(attrsList))
return l.typeError(inSTErr);
//2th aR x aC x aD x fileName x [filePath] x [attemptTimes]
pType = l.second().first();
pValue = l.second().second();
string attrNam[3];
int attrPos[3];
for (int i = 1; i <= 3; i++)
{
if (!pType.elem(i).isSymbol())
return l.typeError(firstPLErr);
attrNam[(i-1)] = pType.elem(i).convertToString();
ListExpr attrType;
attrPos[(i-1)] = listutils::findAttribute(attrsList.listExpr(),
attrNam[(i-1)], attrType);
if (attrPos[(i-1)] < 1)
return l.typeError(rcdErr);
if (!NList(attrType).isSymbol(CcInt::BasicType()))
return l.typeError(rcdErr);
}
if (!pType.fourth().isSymbol(CcString::BasicType()))
return l.typeError(firstPLErr);
ListExpr fnList;
if (!QueryProcessor::GetNLArgValueInTM(pValue.fourth().listExpr(),fnList))
return l.typeError(ifeErr + "fileName");
fileName = NList(fnList).str();
// text, int, or text x int
Cardinal opIdx = 5;
while (opIdx <= pType.length())
{
if (pType.elem(opIdx).isSymbol(FText::BasicType()))
{
ListExpr fpList;
if (!QueryProcessor::GetNLArgValueInTM(
pValue.elem(opIdx).listExpr(), fpList))
return l.typeError(ifeErr + "filePath");
filePath = NList(fpList).str();
}
else if (pType.elem(opIdx).isSymbol(CcInt::BasicType()))
{
ListExpr atList;
if (!QueryProcessor::GetNLArgValueInTM(
pValue.elem(opIdx).listExpr(), atList))
return l.typeError(ifeErr + "attemptTimes");
attTimes = NList(atList).intval();
if (attTimes < 1)
return l.typeError(iatErr);
}
else
return l.typeError(firstPLErr);
opIdx++;
}
filePath = getLocalFilePath(filePath, fileName, "_type");
//3th [int] x [int]
pType = l.third().first();
pValue = l.third().second();
int typeNode[2] = { -1, -1};
Cardinal pIdx = 0;
while ( pIdx < pType.length())
{
if (!pType.elem(pIdx + 1).isSymbol(CcInt::BasicType()))
return l.typeError(secondPLErr);
ListExpr tnList;
if (!QueryProcessor::GetNLArgValueInTM(pValue.elem(pIdx + 1).listExpr(),
tnList))
return l.typeError(ifeErr + "type node");
typeNode[pIdx] = NList(tnList).intval();
pIdx++;
}
bool delTypeFile = false;
bool typePrepared = false;
for (int i = 0; i < 2; i++)
{
if (typeNode[i] >= 0)
{
clusterInfo *ci = new clusterInfo();
if (!ci->isOK())
return l.typeError(nslErr);
if (typeNode[i] > (int)ci->getClusterSize())
{
ci->print();
return l.typeError(norErr);
}
string rPath = ci->getRemotePath(typeNode[i], true, false, true,
true, (fileName + "_type"));
filePath = FileSystem::MakeTemp(filePath);
// cerr << "Copy the type file " << filePath
// << " from <-" << "\t" << rPath << endl;
int atimes = MAX_COPYTIMES;
int rc = 0;
while (atimes-- > 0){
rc = system((scpCommand + rPath + " " + filePath).c_str());
if (0 == rc){
delTypeFile = true;
typePrepared = true;
break;
} else {
WinUnix::sleep(1);
}
}
if (typePrepared)
break;
}
}
if (!typePrepared)
return l.typeError(ctfErr);
ListExpr relType;
if (!nl->ReadFromFile(filePath, relType))
return l.typeError(ntfErr + filePath);
if (delTypeFile){
FileSystem::DeleteFileOrFolder(filePath);
}
NList resultType;
string ostStr, nstStr;
//ost: old stream type, type from the type file, no DS\_IDX
//nst: new stream type, for pffeed3, with DS\_IDX
if (noFlob)
{
//Ignore the incomplete term and add the DS\_IDX attribute
int count = 0;
ListExpr realRelType = rmTermNL(relType, "incomplete", count);
if (!(listutils::isRelDescription(realRelType)
|| listutils::isTupleStream(realRelType)))
return l.typeError(ntrErr + filePath);
NList osType = NList(NList(Symbol::STREAM()),
NList(NList(realRelType).second()));
ostStr = osType.convertToString();
NList resultAttrList = NList(nl->Second(nl->Second(realRelType)));
resultType =
NList(NList(Symbol::STREAM()),
NList(NList(Tuple::BasicType()), resultAttrList));
nstStr = resultType.convertToString();
assert(nstStr.compare(ostStr) == 0);
}
else
{
if (!(listutils::isRelDescription(relType)
|| listutils::isTupleStream(relType)))
return l.typeError(ntrErr + filePath);
resultType =
NList(NList(Symbol::STREAM()),
NList(NList(relType).second()));
nstStr = ostStr = resultType.convertToString();
}
//Remove the type file name
filePath = filePath.substr(0, filePath.find_last_of("/"));
return NList(NList(Symbol::APPEND()),
NList(NList(attrPos[0]),
NList(attrPos[1]),
NList(attrPos[2]),
NList(filePath, true, true),
NList(attTimes),
NList(ostStr, true, true)),
resultType).listExpr();
} catch(...){
return listutils::typeError("invalid input");
}
}
int pffeedValueMap(Word* args, Word& result,
int message, Word& local, Supplier s, int mode)
{
PFFeedLocalInfo* pli = 0;
switch(message)
{
case OPEN: {
//Initialize the local info,
//start the thread to copy the files
int rp, cp, dp;
rp = ((CcInt*)args[3].addr)->GetValue() -1;
cp = ((CcInt*)args[4].addr)->GetValue() -1;
dp = ((CcInt*)args[5].addr)->GetValue() -1;
string fileName, filePath;
fileName = ((CcString*)qp->Request(
qp->GetSupplierSon(args[1].addr, 3)).addr)->GetValue();
filePath = ((FText*)args[6].addr)->GetValue();
int attTimes = ((CcInt*)args[7].addr)->GetValue();
pli = (PFFeedLocalInfo*)local.addr;
if (pli) delete pli;
assert((mode == 1) || (mode == 2) || (mode == 3));
bool noFlob = mode < 3 ? false : true;
pli = new PFFeedLocalInfo(s, args[0],
rp, cp, dp, fileName, filePath, attTimes, noFlob);
local.setAddr(pli);
return 0;
}
case REQUEST: {
pli = static_cast<PFFeedLocalInfo*>(local.addr);
if (!pli)
return CANCEL;
Tuple* tuple = pli->getNextTuple(mode);
if (tuple)
{
result.setAddr(tuple);
return YIELD;
}
return CANCEL;
}
case CLOSE: {
pli = static_cast<PFFeedLocalInfo*>(local.addr);
if (pli)
delete pli;
local.setAddr(0);
return 0;
}
}
return 0;
}
ListExpr pffeed1TypeMap(ListExpr args){
return pffeedTypeMap(args, false);
}
int pffeed1ValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
return pffeedValueMap(args, result, message, local, s, 1);
}
Operator pffeedOp(pffeed1Info(), pffeed1ValueMap, pffeed1TypeMap);
PFFeedLocalInfo::PFFeedLocalInfo(Supplier s, Word inputStream,
int rp, int cp, int dp, string _fn, string _fp, int _at, bool _nf)
: noFlob(_nf), fileName(_fn), localFilePath(_fp), attTimes(_at)
{
ListExpr streamTypeList = qp->GetType(s);
resultType = new TupleType(SecondoSystem::GetCatalog()
->NumericType(nl->Second(streamTypeList)));
inputType = 0;
if (noFlob)
{
//set the inputType
string ostStr = ((FText*)qp->Request(
qp->GetSupplierSon(s, 8)).addr)->GetValue();
ListExpr inputTypeList;
nl->ReadFromString(ostStr, inputTypeList);
inputType = new TupleType(SecondoSystem::GetCatalog()
->NumericType(nl->Second(inputTypeList)));
}
interCluster = new clusterInfo();
PLI_FAF_Thread* ft = new PLI_FAF_Thread(
this, inputStream, rp, cp, dp);
pthread_create(&faf_TID, NULL, fetchAllFiles, ft);
fIdx = 0;
curFileName = "";
curFilePt = 0;
curPrdIndex = -1;
}
void* PFFeedLocalInfo::fetchAllFiles(void* ptr)
{
PLI_FAF_Thread* pft = (PLI_FAF_Thread*)ptr;
PFFeedLocalInfo* pli = pft->pli;
Word inputStream = pft->inputStream;
int rp = pft->attrPos[0],
cp = pft->attrPos[1],
dp = pft->attrPos[2];
pthread_t threadID[PipeWidth];
memset(pli->tokenPass, false, PipeWidth);
pthread_mutex_init(&CLI_mutex, NULL);
Word tupleWord;
qp->Open(inputStream.addr);
qp->Request(inputStream.addr, tupleWord);
while(qp->Received(inputStream.addr))
{
Tuple * nextTuple = (Tuple*)tupleWord.addr;
int row = ((CcInt*)nextTuple->GetAttribute(rp))->GetValue(),
column = ((CcInt*)nextTuple->GetAttribute(cp))->GetValue(),
dest = ((CcInt*)nextTuple->GetAttribute(dp))->GetValue();
//Copy files over the cluster
for (int token = 0; token < PipeWidth; token++)
{
if (!pli->tokenPass[token] || pthread_kill(threadID[token],0))
{
pli->tokenPass[token] = true;
PLI_CF_Thread* ct =
new PLI_CF_Thread(pli, row, column, dest,token);
pthread_create(&threadID[token], NULL, tCopyFile, ct);
nextTuple->DeleteIfAllowed();
qp->Request(inputStream.addr, tupleWord);
break;
}
}
}
qp->Close(inputStream.addr);
for (int token = 0; token < PipeWidth; token++)
{
if (pli->tokenPass[token]){
pthread_join(threadID[token], NULL);
}
}
return NULL;
}
void* PFFeedLocalInfo::tCopyFile(void* ptr)
{
PLI_CF_Thread* ct = (PLI_CF_Thread*)ptr;
PFFeedLocalInfo* pli = (PFFeedLocalInfo*)ct->pli;
int row = ct->row, column = ct->column,
dest = ct->dest, token = ct->token;
int localNodeID = pli->interCluster->getLocalNode();
string localIP = pli->interCluster->getLocalIP();
int cand = pli->getAttemptTimes();
string fileName = pli->getFilePrefixName() + "_" + int2string(row)
+ "_" + int2string(column);
string filePath = pli->getLocalFilePath();
bool fileFound = false;
if ( (row < 0) || (column < 0) || (dest < 0))
{
pthread_mutex_lock(&CLI_mutex);
cerr << "Cannot get partition file with triple: "
<< "(" << row << "," << column << "," << dest << ")" << endl;
pthread_mutex_unlock(&CLI_mutex);
}
string lPath = filePath;
for (int att = 0; att < cand; att++)
{
//Copy the file from one node,
//traverse the next one if the copy fails
int nodeID = dest + att;
bool api = (nodeID !=
getRoundRobinIndex(row, pli->interCluster->getSlaveSize()))
? true : false;
//Attach Producer IP
string producerIP = pli->interCluster->getIP(row, true);
string targetIP = pli->interCluster->getIP(nodeID, true);
//remote and local file path
string rPath = pli->interCluster->getRemotePath(
nodeID, true, true, true, true,
fileName, api, producerIP);
FileSystem::AppendItem(lPath, fileName);
if (localIP.compare(targetIP) == 0){
if ( localNodeID != nodeID )
lPath = rPath.substr(rPath.find(":") + 1);
}
else{
//Copy file from the remote node
int copyTimes = MAX_COPYTIMES;
while (copyTimes-- > 0){
if (0 == system((scpCommand + rPath + " " + lPath).c_str())){
break;
}
else{
WinUnix::sleep(1);
}
}
}
if (FileSystem::FileOrFolderExists(lPath)){
fileFound = !FileSystem::IsDirectory(lPath);
}
if (fileFound){
break;
}
else{
pthread_mutex_lock(&CLI_mutex);
cerr << "Warning! Cannot fetch file at : "
<< lPath << " from " << rPath << endl;
pthread_mutex_unlock(&CLI_mutex);
}
}
pthread_mutex_lock(&CLI_mutex);
if (!fileFound){
cerr << "ERROR! Get file " << fileName << " from node " << dest
<< " in thread " << token << " fails! " << endl;
}
else{
int prdIndex = pli->interCluster->getInterIndex(row, true, true);
pli->partFiles.push_back(make_pair(lPath, prdIndex));
pli->tokenPass[token] = false;
}
pthread_mutex_unlock(&CLI_mutex);
return NULL;
}
Tuple* PFFeedLocalInfo::getNextTuple(int mode)
{
Tuple* t = 0;
if (curFilePt != 0)
{
t = readTupleFromFile(curFilePt, resultType, mode, curFileName);
if (t == 0){
//Read the next file
curFilePt->close();
delete curFilePt;
curFilePt = 0;
curFileName = "";
curPrdIndex = -1;
}
else
return t;
}
while (true)
{
if (curFileName.length() != 0){
curFilePt = new ifstream(curFileName.c_str(), ios::binary);
if (!curFilePt->good()){
cerr << "ERROR! Read file " << curFileName << " fail.\n\n";
curFilePt = 0;
return 0;
}
//Read the description list
u_int32_t descSize;
size_t fileLength;
curFilePt->seekg(0, ios::end);
fileLength = curFilePt->tellg();
curFilePt->seekg((fileLength - sizeof(descSize)), ios::beg);
curFilePt->read((char*)&descSize, sizeof(descSize));
char descStr[descSize];
curFilePt->seekg((fileLength - (descSize + sizeof(descSize))),
ios::beg);
curFilePt->read(descStr, descSize);
curFilePt->seekg(0, ios::beg);
NList descList = NList(binDecode(string(descStr)));
if (descList.isEmpty()){
cerr << "ERROR! Format error with fetched file "
<< curFileName << endl;
curFilePt->close();
delete curFilePt;
curFilePt = 0;
return 0;
}
t = readTupleFromFile(curFilePt, resultType, mode, curFileName);
if ( t == 0 ){
cerr << "Waring! File " << curFileName
<< " is empty! " << endl;
curFilePt->close();
delete curFilePt;
curFilePt = 0;
return 0;
}
return t;
}
else if (partFiles.size() > 0 && fIdx < partFiles.size()){
curFileName = partFiles[fIdx].first;
curPrdIndex = partFiles[fIdx].second;
fIdx++;
}
if (pthread_kill(faf_TID, 0)
&& (fIdx >= partFiles.size())
&& curFileName.length() == 0){
break;
}
}
return 0;
}
/*
4 Operator ~pffeed2~
It works like the combination of ~pffeed~ and ~ffeed2~,
fetching all required data files simultaneously and reading the tuples first.
The Flob data are read only when they are really needed.
However, both the tuple and Flob data are stored and transferred together
inside the same data file.
*/
struct pffeed2Info : OperatorInfo
{
pffeed2Info()
{
name = "pffeed2";
signature =
"stream(tuple(aR:int, aC:int, aD:int, ...)) "
"x aR x aC x aD x string x [text] x [int] "
"x [int] x [int] -> stream(tuple())";
meaning = "Reads a set of data files from the cluster,"
"and returns the tuples without reading the Flob data at first.";
}
};
ListExpr pffeed2TypeMap(ListExpr args){
return pffeedTypeMap(args, false);
}
int pffeed2ValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
return pffeedValueMap(args, result, message, local, s, 2);
}
Operator pffeed2Op(pffeed2Info(), pffeed2ValueMap, pffeed2TypeMap);
/*
4 Operator ~pffeed3~
It works like the combination of ~pffeed~ and ~ffeed3~,
fetching all required data files simultaneously and reading the tuples only.
The Flob data are remotely read by the ~fetchFlob~ operator later when they
are needed after the filter step.
The tuple and Flob data are stored and transferred separately in different files.
*/
struct pffeed3Info : OperatorInfo
{
pffeed3Info()
{
name = "pffeed3";
signature =
"stream(tuple(aR:int, aC:int, aD:int, ...)) "
"x aR x aC x aD x string x [text] x [int] "
"x [int] x [int] -> stream(tuple())";
meaning = "Reads a set of data files from the cluster,"
"and returns the tuples without Flob data.";
}
};
ListExpr pffeed3TypeMap(ListExpr args){
return pffeedTypeMap(args, true);
}
int pffeed3ValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
return pffeedValueMap(args, result, message, local, s, 3);
}
Operator pffeed3Op(pffeed3Info(), pffeed3ValueMap, pffeed3TypeMap);
/*
4 Operator ~hadoopMap~
Create DGO DLO kind flist after being queried
within the map step of the embedded Hadoop job.
Update at 22th Mar. 2012
Jiamin
Remove the DGO kind, but it can create either DLO or DLF kind flist.
The operator name is changed to ~hadoopMap~,
which means it creates a new flist only through the map step of
the preset Hadoop job.
The map step in MR works similar as a ~filter~ operation,
hence this operator only accepts one flist as the input,
but in its internal function, several other flists can be used too.
It is not necessary to indicate name to created sub-objects or sub-files,
which can be set by the system.
However, for DLF kind flist, it is better to keep the tridition.
Therefore, file name and file path is prepared as optional argument.
Also, an optional symbol is prepared, used to indicate the kind of the
result flist.
By default, it is DLO, but also can be set as DLF.
I didn't use an integer or a bool for this value,
since it is possible for us to create more kinds for the flist.
*/
struct hadoopMapInfo : OperatorInfo
{
hadoopMapInfo()
{
name = "hadoopMap";
signature =
"flist(T) x [Name:string] x [Path:text] "
"x [(DLO):DLF] x [Executed:bool] "
"x ( map T T1 ) -> flist(T1)";
meaning = "Create DLO or DLF kind flist after the map step";
}
};
/*
4.1 Type Mapping of ~hadoopMap~
This operator maps
----
T x string -> flist
----
The T is the data created by the prepositive query.
If this query is only an exist Secondo object name,
then this operator creates a DGO flist data.
Or else if the query contains at least one ~para~ operator,
then this operator creates a DLO flist data,
and creates local Secondo objects in slave databases through
delivering the prepositive query by a generic Hadoop job.
The string is the object name created on every slave database,
and it must be start with a capital character.
If the object name is given as an empty string,
then it creates a DGO kind flist by default.
Update at 21th Mar. 2012
Jiamin
Now this operator maps
----
flist(T) x [string] x [text] x [(DLO):DLF]
x ( map ( T -> T1) ) -> flist(T1)
----
Update at 14th May. 2012 by Jiamin
Adds the optional parameter mapTaskNum, now the operator maps
----
flist(T) x [string] x [text] x [(DLO):DLF] x [mapTaskNum]
x ( map ( T -> T1) ) -> flist(T1)
----
If the produced flist kind is DLO, then the mapTaskNum must be less than
the scale of slaves, as each slave database can only keep one sub-object.
Update at 21th Aug. 2012 by Jiamin
Adds an optional parameter executed:bool, to indicate whether the
Hadoop job practically process. If it is false, then this operator creates
an intermediate flist object. An intermediate flist contains an umempty
UEMapQuery (UnExecuted Map Query), describing the process in the Map stage.
It is composed by four elements(MapQuery DLO\_List DLF\_List mapTaskNum),
both hadoopReduce and hadoopReduce2 operators set their Map tasks based on
this UEMapQuery.
By default this value is true. Now this operator maps
----
flist(T) x [string] x [text] x [(DLO):DLF] x [int] x [bool]
x ( map ( T -> T1) ) -> flist(T1)
----
*/
ListExpr hadoopMapTypeMap(ListExpr args){
try{
NList l(args);
string lenErr = "ERROR! Operator hadoopMap expects 3 argument lists. ";
string typErr = "ERROR! Operator hadoopMap expects "
"flist(T) x [string] x [text] x [(DLO):DLF] x [int] x [bool] "
"x (map T T1))";
string ifaErr = "ERROR! Infeasible evaluation in TM of attribute:";
string mtnErr = "ERROR! Expect a positive map task number. ";
string umnErr = "ERROR! It is not allowed to produce a DLO kind flist, "
"with a row number that is larger than the slave scale.";
string nprErr = "ERROR! Operator hadoopMap expects "
"creating a new DLO or DLF kind flist.";
string onmErr = "ERROR! Operator hadoopMap expects the created "
"object name starts with upper case. ";
string uafErr = "ERROR!! The internal function is unavailable.";
string hnmErr = "ERROR! Exists homonymous flist type file in: ";
string fwtErr = "ERROR! Failed writing type into file: ";
string expErr = "ERROR! Improper output type for DLF flist";
string udnErr = "ERROR! Long database name is set.";
string uewErr = "ERROR! An unexecuted flist must be DLF type.";
string objName, filePath, qStr;
fListKind kind = DLO;
bool executed = true;
if (l.length() != 3)
return l.typeError(lenErr);
//The input must be a flist type data.
NList inputType = l.first().first();
if (inputType.isSymbol("typeerror") || inputType.length() != 2 ){
return l.typeError(typErr);
}
if (!inputType.first().isSymbol(fList::BasicType())){
return l.typeError(typErr);
}
//Optional parameters
int len = l.second().first().length();
clusterInfo ci;
size_t mapTaskNum = ci.getSlaveSize();
if (len > 0)
{
if ( len > 3 )
return l.typeError(typErr);
for (int i = 1; i <= len; i++)
{
NList pp = l.second().first().elem(i);
NList pv = l.second().second().elem(i);
if (pp.isSymbol(CcString::BasicType()))
{
//ObjectName defined.
ListExpr fnList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), fnList)){
return l.typeError(ifaErr + "objName");
}
objName = NList(fnList).str();
}
else if (pp.isSymbol(FText::BasicType()))
{
//FilePath defined
ListExpr fpList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), fpList)){
return l.typeError(ifaErr + "filePath");
}
filePath = NList(fpList).str();
}
else if (pp.isSymbol(CcInt::BasicType()))
{
//mapTaskNum
ListExpr mtnList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), mtnList)){
return l.typeError(ifaErr + "mapTaskNum");
}
mapTaskNum = NList(mtnList).intval();
}
else if (pp.isSymbol("DLF")){
kind = DLF;
}
else if (pp.isSymbol("DLO")){
kind = DLO;
}
else if (pp.isSymbol(CcBool::BasicType())){
ListExpr etList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), etList)){
return l.typeError(ifaErr + "executed");
}
executed = NList(etList).boolval();
}
else{
return l.typeError(typErr);
}
}
}
if (!executed && (kind != DLF)){
return l.typeError(uewErr);
}
if (mapTaskNum <= 0){
return l.typeError(mtnErr);
} else if ((kind == DLO) && (mapTaskNum > ci.getSlaveSize())){
return l.typeError(umnErr);
}
//Encapsulated object type
NList coType = l.third().first().third(); //Create Type
NList coQuery = l.third().second().third(); //Create Query
NList coBType; //Create Base Type
if (coType.isAtom()){
coBType = coType;
}
else{
coBType = coType.first();
}
if (coBType.isEqual("typeerror"))
return l.typeError(uafErr);
qStr = coQuery.convertToString();
//Query Parameter
string coParaName = l.third().second().second().first().str();
//If it is a DLF, then the output type must be a tuple stream
if (kind == DLF && !listutils::isTupleStream(coType.listExpr())){
return l.typeError(expErr);
}
NList resultType = NList(NList(fList::BasicType()), NList(coType));
if (objName.length() == 0)
objName = fList::tempName(false);
else
{
char f = objName[0];
if (f<'A' || f>'Z'){
return l.typeError(onmErr);
}
//If the name of sub-objects or sub-files are denoted by users,
//Then it must be kept in a text type file,
//in case of the homonymous problem
//also this file must be kept in the default file path.
string filePath =
getLocalFilePath("", (objName + "_type"),"", true);
if (FileSystem::FileOrFolderExists(filePath)){
ListExpr exeType;
bool ok = false;
if (nl->ReadFromFile(filePath, exeType)){
//TODO Need to be more compatible with file-related operators
if (nl->Equal(nl->Second(exeType),
resultType.second().second().listExpr())){
ok = true;
}
}
if (!ok)
return l.typeError(hnmErr + filePath);
}
else{
ListExpr expList = nl->Second(resultType.listExpr());
if (nl->IsAtom(expList)){
expList = nl->OneElemList(expList);
}
if (!nl->WriteToFile(filePath, expList)){
return l.typeError(fwtErr + filePath);
}
}
}
//Check the length of the database name
string dbName = fList::tempName(true);
if (dbName.length() >= 16){
//16 is the maximum length that a database name can be
return l.typeError(udnErr);
}
return NList(NList(Symbol::APPEND()),
NList(NList(qStr, true, true),
NList(objName, true, false),
NList((int)kind),
NList(coParaName, true, false),
NList(dbName, true, false)),
resultType).listExpr();
} catch(...){
return listutils::typeError("invalid input");
}
}
int hadoopMapValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
if ( message <= CLOSE )
{
//Appended parameters
//Query string
string CreateQuery =
((FText*)args[3].addr)->GetValue();
//Get the object name.
string CreateObjectName =
((CcString*)args[4].addr)->GetValue();
//Get the export file type
fListKind kind =
(fListKind)((CcInt*)args[5].addr)->GetValue();
//Get the object name of the input flist
string inParaName =
((CcString*)args[6].addr)->GetValue();
//Get the database name
string dbName =
((CcString*)args[7].addr)->GetValue();
//Optional parameters
string CreateFilePath = "";
Supplier oppList = args[1].addr;
int opLen = qp->GetNoSons(oppList);
int mapTaskNum = -1;
bool executed = true;
for (int i = 0; i < opLen; i++)
{
ListExpr pp = qp->GetType(qp->GetSupplierSon(oppList,i));
if (nl->IsEqual(pp, FText::BasicType())){
CreateFilePath = ((FText*)
qp->Request(qp->GetSupplier(oppList,i)).addr)->GetValue();
}
else if (nl->IsEqual(pp, CcInt::BasicType())){
mapTaskNum = ((CcInt*)
qp->Request(qp->GetSupplier(oppList, i)).addr)->GetValue();
}
else if (nl->IsEqual(pp, CcBool::BasicType())){
executed =
((CcBool*)qp->Request(qp->GetSupplier(oppList, i)).addr)->GetValue();
}
}
//Result type
clusterInfo *ci = new clusterInfo();
ListExpr resultType = qp->GetType(s);
fList* resultFList = new fList(CreateObjectName, NList(resultType),
ci, NList(), 1, false, kind);
fList* inputFList = (fList*)args[0].addr;
int dupTimes = inputFList->getDupTimes();
//Parameters required by the Hadoop job are:
ListExpr CreateQueryList;
nl->ReadFromString(CreateQuery, CreateQueryList);
vector<string> flistParaList;
vector<fList*> flistObjList;
flistParaList.push_back(inParaName); //The input parameter
flistObjList.push_back(inputFList);
vector<string> DLF_NameList, DLF_fileLocList;
vector<string> DLO_NameList, DLO_locList;
//Scan para operations in the internal function list
//If the ~para~ operator contains a DELIEVERABLE object,
//then extract its value in nested-list format,
//and embed inside the query.
//Or else, replace the para operation with the flist name,
//then left to the next function.
bool ok = true;
CreateQueryList = replaceParaOp(CreateQueryList, flistParaList,
flistObjList, ok);
if (!ok){
cerr << "Replace para operation fails" << endl;
}
else{
CreateQueryList = replaceSecObj(CreateQueryList, ok);
if (ok)
{
//Replace parameter value according to their flist value
for (size_t i = 0; i < flistParaList.size(); i++)
{
int argIndex = (i == 0 ? 1 : 0);
CreateQueryList =
replaceDLOF(CreateQueryList,
flistParaList[i], flistObjList[i],
DLF_NameList, DLF_fileLocList,
DLO_NameList, DLO_locList, false, ok, argIndex);
if (!ok){
cerr << "Replace DLF flist fails" << endl;
break;
}
}
}
}
NList dlfNameList, dlfLocList;
NList dloNameList, dloLocList;
ListExpr sidList;
if (!ok){
cerr << "Preparing Hadoop job parameters fails." << endl;
}
else
{
for (size_t i = 0; i < DLF_NameList.size(); i++)
{
dlfNameList.append(NList(DLF_NameList[i], true, false));
ListExpr locList;
nl->ReadFromString(DLF_fileLocList[i], locList);
dlfLocList.append(NList(locList));
}
for (size_t i = 0; i < DLO_NameList.size(); i++)
{
dloNameList.append(NList(DLO_NameList[i], true, false));
ListExpr locList;
nl->ReadFromString(DLO_locList[i], locList);
dloLocList.append(NList(locList));
}
if (mapTaskNum < 0)
mapTaskNum = ci->getSlaveSize();
if (executed)
{
//Call the Hadoop job
stringstream queryStr;
queryStr
<< "hadoop jar ParallelSecondo.jar "
"ParallelSecondo.PS_HadoopMap \\\n"
<< dbName << " " << CreateObjectName << " "
<< " \"" << tranStr(nl->ToString(CreateQueryList),
"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dlfNameList.convertToString(),
"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dlfLocList.convertToString(),
"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dloNameList.convertToString(),
"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dloLocList.convertToString(),
"\"", "\\\"") << "\" \\\n"
<< dupTimes << " " << kind << " "
<< " \"" << CreateFilePath << "\" "
<< mapTaskNum << endl;
int rtn = -1;
cout << queryStr.str() << endl;
rtn = system(queryStr.str().c_str());
ok = (rtn == 0);
if (ok)
{
FILE *fs;
char buf[MAX_STRINGSIZE];
fs = popen("hadoop dfs -cat OUTPUT/part*", "r");
if (NULL != fs)
{
stringstream ss;
while (!feof(fs) && fgets(buf,sizeof(buf),fs))
{
ss << buf;
}
string locListStr = ss.str();
locListStr = locListStr.substr(locListStr.find_first_of(' '));
nl->ReadFromString(locListStr, sidList);
}
else
ok = false;
}
if (ok)
{
NList fileLocList;
if (kind == DLO)
CreateFilePath = dbLoc;
//Create file location list
ListExpr rest = sidList;
int lastRow = 0;
NList emptyRow = NList();
map<int, int> resultMap;
while(!nl->IsEmpty(rest))
{
ListExpr rowInfo = nl->First(rest);
int rowNum = nl->IntValue(nl->First(rowInfo));
int slaveIdx = nl->IntValue(nl->Second(rowInfo));
resultMap.insert(pair<int, int>(rowNum, slaveIdx));
rest = nl->Rest(rest);
}
//Sort according to the row number automatically
map<int,int>::iterator it;
for (it = resultMap.begin(); it != resultMap.end(); it++)
{
int rowNum = it->first;
int slaveIdx = it->second;
//Insert empty rows
for (lastRow++; lastRow < rowNum; lastRow++)
{
if (fileLocList.isEmpty())
{
fileLocList.makeHead(emptyRow);
}
else
{
fileLocList.append(emptyRow);
}
}
//Insert the current row
NList newRow = NList(NList(slaveIdx), NList(1).enclose(), NList(
CreateFilePath, true, true));
if (fileLocList.isEmpty())
{
fileLocList.makeHead(newRow);
}
else
{
fileLocList.append(newRow);
}
}
resultFList = new fList(CreateObjectName, NList(resultType), ci,
fileLocList, 1, true, kind);
}
}
else
{//executed == false
//Create an intermediate flist
NList UEMapQuery(
NList(CreateQueryList),
NList(dlfNameList, dlfLocList),
NList(dloNameList, dloLocList),
NList(mapTaskNum));
resultFList = new fList(CreateObjectName, NList(resultType),
ci, NList(), 1, false, kind, 0, 0, UEMapQuery);
}
}
result.setAddr(resultFList);
}
//TODO add the progress estimation in the future
// else if ( message == REQUESTPROGRESS )
// else if ( message == CLOSEPROGRESS )
return 0;
}
Operator hadoopMapOP(
hadoopMapInfo(), hadoopMapValueMap, hadoopMapTypeMap);
/*
4 Operator ~hadoopMap2~
Created in 7th Aug. 2014
Cheng \& Jiamin
This operator maps
----
flist(T1) x flist(T2) x [Name: string] x [Path: text] x [(DLO):DLF] x [Executed: bool]
x (map (T1 x T2 -> T3)) -> flist(T3)
----
Basically it invokes Map tasks in a Hadoop job,
each task reads the same row from the both sides that are also stored together.
The task is assigned to the data server where both rows are stored,
hence it can perform the task without retrieving remote data over the network.
The number of the map tasks is decided by the row number of the inputs.
In the type mapping, we need check:
* both side inputs have the same row number
* each row from both sides should be stored on the same slave server
This operator basically reads the input and set the parameters for the template Hadoop job,
then starts the job at last.
The template Hadoop is encapsulated in the ParallelSecondo package, in Java.
It contains both the Map and Reduce stage, while the UDF is set in the Map tasks only.
If this operation is set un-executed, then all the input parameters are then
encapsulated into the output un-executed flist, which will then be merged into
the up-coming Reduce operators, including both hadoopReduce and hadoopReduce2.
By default, the execute argument is set TRUE.
Since we want each task to read the inputs locally,
hence both flists should make sure that each row from both sides should stored together.
Therefore, both inputs should be generated by the ~spread~ operator only.
After any Reduce operations, it cannot be garanteed in such a way since we depend on
Hadoop to distribute the tasks and the output results.
The example query listed in the example query has been validated by Jiamin.
13 Jan, 2015
*/
struct hadoopMap2Info: OperatorInfo {
hadoopMap2Info() {
name = "hadoopMap2";
signature = "flist(T1) x flist(T2) x [Name:string] x [Path:text] "
"x [(DLO):DLF] x [Executed:bool] "
"x ( map (T1 T2) T3 ) -> flist(T3)";
meaning = "Create DLO or DLF kind flist after the map step";
}
};
ListExpr hadoopMap2TypeMap(ListExpr args) {
try {
NList l(args);
string lenErr = "ERROR! Operator hadoopMap expects 4 argument lists. ";
string typErr =
"ERROR! Operator hadoopMap expects "
"flist(T1) x flist(T2) x [string] x [text]"
"x [(DLO):DLF] x [int] x [bool] x (map (T1 T2) T3))";
string ifaErr = "ERROR! Infeasible evaluation in TM of attribute:";
string mtnErr = "ERROR! Expect a positive map task number. ";
string umnErr = "ERROR! It is not allowed to produce a DLO kind flist, "
"with a row number that is larger than the slave scale.";
string nprErr = "ERROR! Operator hadoopMap expects "
"creating a new DLO or DLF kind flist.";
string onmErr = "ERROR! Operator hadoopMap expects the created "
"object name starts with upper case. ";
string uafErr = "ERROR!! The internal function is unavailable.";
string hnmErr = "ERROR! Exists homonymous flist type file in: ";
string fwtErr = "ERROR! Failed writing type into file: ";
string expErr = "ERROR! Improper output type for DLF flist";
string udnErr = "ERROR! Long database name is set.";
string uewErr = "ERROR! An unexecuted flist must be DLF type.";
string objName, filePath, qStr;
fListKind kind = DLO;
bool executed = true;
//they are four arguments, i.e.
//@flist(T1)
//@flist(T2)
//@[Name:string] x [Path:text]x [(DLO):DLF] x [Executed:bool]
//@(map (T1 T2) T3 )
if (l.length() != 4)
return l.typeError(lenErr);
int argIndex = 0;
//The first two flist arguments, i.e. @flist(T1) @flist(T2)
NList flistType[2];
while (argIndex < 2) {
flistType[argIndex] = l.elem(argIndex + 1).first();
if (flistType[argIndex].isSymbol("typeerror")
|| flistType[argIndex].length() != 2) {
return l.typeError(typErr);
}
if (!flistType[argIndex].first().isSymbol(fList::BasicType())) {
return l.typeError(typErr);
}
if (!listutils::isRelDescription(
flistType[argIndex].second().listExpr())
&& !listutils::isTupleStream(
flistType[argIndex].second().listExpr())) {
return l.typeError(typErr);
}
argIndex++;
}
//Optional parameters,
//i.e. @[Name:string] x [Path:text] x [(DLO):DLF] x [Executed:bool]
NList OptinalList = l.third();
int numbersOfPara = OptinalList.first().length();
clusterInfo ci;
// size_t mapTaskNum = ci.getSlaveSize();
if (numbersOfPara > 0) {
if (numbersOfPara > 4)
return l.typeError(typErr);
for (int i = 1; i <= numbersOfPara; i++) {
NList paraType = OptinalList.first().elem(i);
NList paraValue = OptinalList.second().elem(i);
if (paraType.isSymbol(CcString::BasicType())) {
//ObjectName defined.
ListExpr fnList;
if (!QueryProcessor::GetNLArgValueInTM(paraValue.listExpr(),fnList)) {
return l.typeError(ifaErr + "objName");
}
objName = NList(fnList).str();
} else if (paraType.isSymbol(FText::BasicType())) {
//FilePath defined
ListExpr fpList;
if (!QueryProcessor::GetNLArgValueInTM(paraValue.listExpr(),fpList)) {
return l.typeError(ifaErr + "filePath");
}
filePath = NList(fpList).str();
} else if (paraType.isSymbol("DLF")) {
kind = DLF;
} else if (paraType.isSymbol("DLO")) {
kind = DLO;
} else if (paraType.isSymbol(CcBool::BasicType())) {
ListExpr etList;
if (!QueryProcessor::GetNLArgValueInTM(paraValue.listExpr(),etList)) {
return l.typeError(ifaErr + "executed");
}
executed = NList(etList).boolval();
} else {
return l.typeError(typErr);
}
}
}
if (!executed && (kind != DLF)) {
return l.typeError(uewErr);
}
//Encapsulated object type @(map (T1 T2) T3 )
NList fourthPara = l.fourth();
NList coType = fourthPara.first().fourth(); //Create Type
NList coQuery = fourthPara.second().fourth(); //Create Query
NList coBType; //Create Base Type
if (coType.isAtom()) {
coBType = coType;
} else {
coBType = coType.first();
}
if (coBType.isEqual("typeerror"))
return l.typeError(uafErr);
qStr = coQuery.convertToString();
//Query Parameter
string coParaName1 = fourthPara.second().second().first().str();
string coParaName2 = fourthPara.second().third().first().str();
//If it is a DLF, then the output type must be a tuple stream
if (kind == DLF && !listutils::isTupleStream(coType.listExpr())) {
return l.typeError(expErr);
}
NList resultType = NList(NList(fList::BasicType()), NList(coType));
if (objName.length() == 0)
objName = fList::tempName(false);
else {
char f = objName[0];
if (f < 'A' || f > 'Z') {
return l.typeError(onmErr);
}
//If the name of sub-objects or sub-files are denoted by users,
//Then it must be kept in a text type file,
//in case of the homonymous problem
//also this file must be kept in the default file path.
string filePath = getLocalFilePath("", (objName + "_type"), "",
true);
if (FileSystem::FileOrFolderExists(filePath)) {
ListExpr exeType;
bool ok = false;
if (nl->ReadFromFile(filePath, exeType)) {
//TODO Need to be more compatible with file-related operators
if (nl->Equal(nl->Second(exeType),
resultType.second().second().listExpr())) {
ok = true;
}
}
if (!ok)
return l.typeError(hnmErr + filePath);
} else {
ListExpr expList = nl->Second(resultType.listExpr());
if (nl->IsAtom(expList)) {
expList = nl->OneElemList(expList);
}
if (!nl->WriteToFile(filePath, expList)) {
return l.typeError(fwtErr + filePath);
}
}
}
//Check the length of the database name
string dbName = fList::tempName(true);
if (dbName.length() >= 16) {
//16 is the maximum length that a database name can be
return l.typeError(udnErr);
}
return NList(NList(Symbol::APPEND()),
NList(NList(qStr, true, true),
NList(objName, true, false),
NList((int) kind),
NList(coParaName1, true, false),
NList(coParaName2, true, false),
NList(dbName, true, false)), resultType).listExpr();
} catch (...) {
return listutils::typeError("invalid input");
}
}
int hadoopMap2ValueMap(Word* args, Word& result, int message, Word& local,
Supplier s) {
if (message <= CLOSE) {
//Appended parameters
//Query string
string CreateQuery = ((FText*) args[4].addr)->GetValue();
//Get the object name.
string CreateObjectName = ((CcString*) args[5].addr)->GetValue();
//Get the export file type
fListKind kind = (fListKind) ((CcInt*) args[6].addr)->GetValue();
//Get the object name of the input flist
string inParaName1 = ((CcString*) args[7].addr)->GetValue();
//Get the object name of the input flist
string inParaName2 = ((CcString*) args[8].addr)->GetValue();
//Get the database name
string dbName = ((CcString*) args[9].addr)->GetValue();
//Optional parameters
string CreateFilePath = "";
Supplier oppList = args[2].addr;
int opLen = qp->GetNoSons(oppList);
int mapTaskNum = -1;
bool executed = true;
for (int i = 0; i < opLen; i++) {
ListExpr pp = qp->GetType(qp->GetSupplierSon(oppList, i));
if (nl->IsEqual(pp, FText::BasicType())) {
CreateFilePath = ((FText*) qp->Request(
qp->GetSupplier(oppList, i)).addr)->GetValue();
} else if (nl->IsEqual(pp, CcBool::BasicType())) {
executed =((CcBool*)
qp->Request(qp->GetSupplier(oppList, i)).addr)->GetValue();
}
}
//Result type
clusterInfo *ci = new clusterInfo();
ListExpr resultType = qp->GetType(s);
fList* resultFList = new fList(CreateObjectName, NList(resultType), ci,
NList(), 1, false, kind);
fList* inputFList1 = (fList*) args[0].addr;
fList* inputFList2 = (fList*) args[1].addr;
NList localList1 = inputFList1->getLocList();
NList localList2 = inputFList2->getLocList();
int numOfNodes = localList1.length();
if (numOfNodes != (int)localList2.length()) {
cerr << "Two Flist haven't distributed on the same nodes" << endl;
return 0;
}
int index = numOfNodes ;
while (index-- > 0) {
if (localList1.elem(index + 1).first()
!= localList1.elem(index + 1).first()) {
cerr << "Row:"<< index + 1 <<
" from both input flists are not stored on the same data server\n";
return 0;
}
}
mapTaskNum = numOfNodes ;
int dupTimes = min(inputFList1->getDupTimes(),inputFList2->getDupTimes() );
//Parameters required by the Hadoop job are:
ListExpr CreateQueryList;
nl->ReadFromString(CreateQuery, CreateQueryList);
Para_FList para_flists;
para_flists.push_back(make_pair(inParaName1,inputFList1));
para_flists.push_back(make_pair(inParaName2,inputFList2));
vector<string> DLF_NameVec, DLF_fileLocVec;
vector<string> DLO_NameVec, DLO_locVec;
/*
Scan para operations in the internal function list.
If the ~para~ operator contains a DELIEVERABLE object,
then extract its value as a nested-list and embed inside the query.
Otherwise, replace the para operation with the flist name, then left it to the next function.
*/
bool ok = true;
CreateQueryList = replaceParaOp(CreateQueryList,para_flists , ok);
if (!ok) {
cerr << "Replace para operation fails" << endl;
return 0;
} else {
CreateQueryList = replaceSecObj(CreateQueryList, ok);
if (ok) {
//Replace parameter value according to their flist value
for (size_t i = 0; i < para_flists.size(); i++) {
int argIndex = ((i == 0 || i == 1) ? (i + 1) : 0);
CreateQueryList = replaceDLOF(CreateQueryList,
para_flists[i].first, para_flists[i].second, DLF_NameVec,
DLF_fileLocVec, DLO_NameVec, DLO_locVec, false,
ok, argIndex);
if (!ok) {
cerr << "Replace DLF flist fails" << endl;
break;
}
}
}
}
NList dlfNameList, dlfLocList;
NList dloNameList, dloLocList;
ListExpr sidList;
if (!ok) {
cerr << "Preparing Hadoop job parameters fails." << endl;
} else {
for (size_t i = 0; i < DLF_NameVec.size(); i++) {
dlfNameList.append(NList(DLF_NameVec[i], true, false));
ListExpr locList;
nl->ReadFromString(DLF_fileLocVec[i], locList);
dlfLocList.append(NList(locList));
}
for (size_t i = 0; i < DLO_NameVec.size(); i++) {
dloNameList.append(NList(DLO_NameVec[i], true, false));
ListExpr locList;
nl->ReadFromString(DLO_locVec[i], locList);
dloLocList.append(NList(locList));
}
if (executed) {
//Call the Hadoop job
stringstream queryStr;
queryStr << "hadoop jar ParallelSecondo.jar "
"ParallelSecondo.PS_HadoopMap2 \\\n" << dbName << " "
<< CreateObjectName << " " << " \""
<< tranStr(nl->ToString(CreateQueryList), "\"", "\\\"")
<< "\" \\\n" << " \""
<< tranStr(dlfNameList.convertToString(), "\"", "\\\"")
<< "\" \\\n" << " \""
<< tranStr(dlfLocList.convertToString(), "\"", "\\\"")
<< "\" \\\n" << " \""
<< tranStr(dloNameList.convertToString(), "\"", "\\\"")
<< "\" \\\n" << " \""
<< tranStr(dloLocList.convertToString(), "\"", "\\\"")
<< "\" \\\n" << dupTimes << " " << kind << " " << " \""
<< CreateFilePath << "\" " << mapTaskNum <<endl;
int rtn = -1;
cerr << queryStr.str() << endl;
//return 0;
rtn = system(queryStr.str().c_str());
cerr << "the process result is: " << rtn << endl;
ok = (rtn == 0);
if (ok) {
FILE *fs;
char buf[MAX_STRINGSIZE];
fs = popen("hadoop dfs -cat OUTPUT/part*", "r");
if (NULL != fs) {
stringstream ss;
while (!feof(fs) && fgets(buf, sizeof(buf), fs)) {
ss << buf;
}
string locListStr = ss.str();
locListStr = locListStr.substr(
locListStr.find_first_of(' '));
nl->ReadFromString(locListStr, sidList);
} else
ok = false;
}
if (ok) {
NList fileLocList;
if (kind == DLO)
CreateFilePath = dbLoc;
//Create file location list
ListExpr rest = sidList;
int lastRow = 0;
NList emptyRow = NList();
map<int, int> resultMap;
while (!nl->IsEmpty(rest)) {
ListExpr rowInfo = nl->First(rest);
int rowNum = nl->IntValue(nl->First(rowInfo));
int slaveIdx = nl->IntValue(nl->Second(rowInfo));
resultMap.insert(pair<int, int>(rowNum, slaveIdx));
rest = nl->Rest(rest);
}
//Sort according to the row number automatically
map<int, int>::iterator it;
for (it = resultMap.begin(); it != resultMap.end(); it++) {
int rowNum = it->first;
int slaveIdx = it->second;
//Insert empty rows
for (lastRow++; lastRow < rowNum; lastRow++) {
if (fileLocList.isEmpty()) {
fileLocList.makeHead(emptyRow);
} else {
fileLocList.append(emptyRow);
}
}
//Insert the current row
NList newRow = NList(NList(slaveIdx),
NList(1).enclose(),
NList(CreateFilePath, true, true));
if (fileLocList.isEmpty()) {
fileLocList.makeHead(newRow);
} else {
fileLocList.append(newRow);
}
}
resultFList = new fList(CreateObjectName, NList(resultType),
ci, fileLocList, 1, true, kind);
}
} else { //executed == false
//Create an intermediate flist
NList UEMapQuery(NList(CreateQueryList),
NList(dlfNameList, dlfLocList),
NList(dloNameList, dloLocList), NList(mapTaskNum));
resultFList = new fList(CreateObjectName, NList(resultType), ci,
NList(), 1, false, kind, 0, 0, UEMapQuery);
}
}
result.setAddr(resultFList);
}
//TODO add the progress estimation in the future
// else if ( message == REQUESTPROGRESS )
// else if ( message == CLOSEPROGRESS )
return 0;
}
Operator hadoopMap2OP(
hadoopMap2Info(), hadoopMap2ValueMap, hadoopMap2TypeMap);
/*
6 Operator ~hadoopMapAll~
Create at 10th Jul. 2013
Jiamin
This operator works very similar as the ~hadoopMap~,
except it doesn't ask for an input flist object.
Instead it starts a map task on every slave Data Server,
and process the argument function in it.
It would be quite difficult to re-use the codes in ~hadoopMap~ here,
therefore, it is better to completely create this operator,
although some codes can be copied.
The operator returns a stream of tuples,
telling in which DS the map function successes.
This relation can be used later in the ~createFList~ operator
to create the corresponding flist objects.
In the specification file, the function is not indicated as an argument function,
since there is no parameter given for it.
We can call it as sub-query.
All required parameters for the sub-query are set either by DELIVERABLE data,
or by other flist objects with the ~para~ operator.
This operator only processes the given function and returns the location relation.
Therefore, it doesn't accept any parameters for flist construction.
Besides, it works on all slave Data Servers, also it must be executed.
Therefore it only accepts the sub-query.
*/
struct hadoopMapAllInfo : OperatorInfo
{
hadoopMapAllInfo()
{
name = "hadoopMapAll";
signature =
"T -> stream(tuple(Row:int, DS:int, Column:int, Path:text))";
meaning = "Process the sub-query as map tasks on all slave DSs, "
"and return in which sDSs the task finishes.";
}
};
/*
5.1 Type Mapping
Its signature is:
----
T -> stream(tuple(Row:int, DS:int, Column:int, Path:text))
----
T should not be a stream, array, relation.
*/
ListExpr hadoopMapAllTypeMap(ListExpr args)
{
if (!nl->HasLength(args, 1)){
return listutils::typeError("Expected one argument only.");
}
ListExpr arg1 = nl->First(args);
ListExpr type = nl->First(arg1);
ListExpr subQuery = nl->Second(arg1);
if (!listutils::isDATA(type)){
return listutils::typeError("The sub-query must be in kind DATA");
}
NList rtnAttrList = NList(
NList(NList("Row", false, false), NList(CcInt::BasicType())),
NList(NList("DS", false, false), NList(CcInt::BasicType())),
NList(NList("Column", false, false), NList(CcInt::BasicType())),
NList(NList("Path", false, false), NList(FText::BasicType())),
NList(NList("Success", false, false), NList(CcBool::BasicType())),
NList(NList("Result", false, false), NList(type)));
NList locStreamType = NList().tupleStreamOf(rtnAttrList);
string dbName = fList::tempName(true);
return NList(NList(Symbol::APPEND()),
NList(NList(nl->ToString(subQuery), true, true),
NList(dbName, true, false)),
locStreamType).listExpr();
}
/*
5.2 Value Mapping
*/
int hadoopMapAllValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
HadoopMapAllLocalInfo* hmaLI;
switch(message)
{
case OPEN: {
//set the result locations
hmaLI = (HadoopMapAllLocalInfo*)local.addr;
if (hmaLI) delete hmaLI;
hmaLI = new HadoopMapAllLocalInfo();
Word res;
qp->Request(args[1].addr, res);
string subQuery = ((FText*)res.addr)->GetValue();
cerr << "The task query is: " << subQuery << endl;
qp->Request(args[2].addr, res);
string dbName = ((CcString*)res.addr)->GetValue();
//execute the hadoop job
ListExpr subQueryList;
nl->ReadFromString(subQuery,subQueryList);
//Scan the subQuery to find out all possible flist objects.
vector<string> flistParaList;
vector<fList*> flistObjList;
vector<string> DLF_NameList, DLF_fileLocList;
vector<string> DLO_NameList, DLO_locList;
bool ok = true;
subQueryList = replaceParaOp(
subQueryList, flistParaList, flistObjList, ok);
if (!ok){
cerr << "Replacing flist objects fail. " << endl;
return 0;
}
else {
subQueryList = replaceSecObj(subQueryList, ok);
if (ok)
{
for (size_t i = 0; i < flistParaList.size(); i++)
{
//This operator doesn't accept input flist.
subQueryList = replaceDLOF(subQueryList,
flistParaList[i], flistObjList[i],
DLF_NameList, DLF_fileLocList,
DLO_NameList, DLO_locList, false, ok);
if (!ok){
cerr << "Replacing DLF flist objects fail. " << endl;
return 0;
}
}
}
}
//Prepare the parameters for the Hadoop job
NList dlfNameList, dlfLocList;
NList dloNameList, dloLocList;
ListExpr sidList;
if (!ok){
cerr << "Preparing Hadoop job parameters fails" << endl;
return CANCEL;
}
else
{
for (size_t i = 0; i < DLF_NameList.size(); i++)
{
dlfNameList.append(NList(DLF_NameList[i], true, false));
ListExpr locList;
nl->ReadFromString(DLF_fileLocList[i], locList);
dlfLocList.append(NList(locList));
}
for (size_t i = 0; i < DLO_NameList.size(); i++)
{
dloNameList.append(NList(DLO_NameList[i], true, false));
ListExpr locList;
nl->ReadFromString(DLO_locList[i], locList);
dloLocList.append(NList(locList));
}
stringstream queryStr;
queryStr << "hadoop jar ParallelSecondo.jar "
"ParallelSecondo.PS_HadoopMapAll \\\n"
<< dbName << " "
<< " \"" << tranStr(nl->ToString(subQueryList),
"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dlfNameList.convertToString(),
"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dlfLocList.convertToString(),
"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dloNameList.convertToString(),
"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dloLocList.convertToString(),
"\"", "\\\"") << "\" " << endl;
int rtn = -1;
cout << queryStr.str() << endl;
rtn = system(queryStr.str().c_str());
ok = (rtn == 0);
if (ok)
{
FILE *fs;
char buf[MAX_STRINGSIZE];
fs = popen("hadoop dfs -cat OUTPUT/part*", "r");
if ( NULL != fs )
{
stringstream ss;
while(!feof(fs) && fgets(buf, sizeof(buf), fs))
{
ss << buf ;
}
string locListStr = ss.str();
locListStr = locListStr.substr(locListStr.find_first_of(' '));
nl->ReadFromString(locListStr, sidList);
}
else
{
cerr << "Read Hadoop job results fails" << endl;
ok = false;
}
}
else
{
cerr << "The Hadoop job fails" << endl;
}
if (ok)
{
ListExpr rest = sidList;
while(!nl->IsEmpty(rest))
{
ListExpr row = nl->First(rest);
int rowNum = nl->IntValue(nl->First(row));
int slaveIdx = nl->IntValue(nl->Second(row));
bool succ = nl->BoolValue(nl->Third(row));
hmaLI->addLoc(rowNum, slaveIdx, succ, nl->Fourth(row));
rest = nl->Rest(rest);
}
}
}
hmaLI->makeScan();
local.setAddr(hmaLI);
return 0;
}
case REQUEST: {
if (!local.addr)
return CANCEL;
else
hmaLI = (HadoopMapAllLocalInfo*)local.addr;
if (!hmaLI->isEnd())
{
HMA_taskResult loc = hmaLI->getItem();
ListExpr resultTupleList = GetTupleResultType(s);
TupleType *resultType = new TupleType(nl->Second(resultTupleList));
Tuple* resultTuple = new Tuple(resultType);
resultTuple->PutAttribute(0, new CcInt(loc.row));
resultTuple->PutAttribute(1, new CcInt(loc.slave));
resultTuple->PutAttribute(2, new CcInt(1));
resultTuple->PutAttribute(3, new FText(true, ""));
resultTuple->PutAttribute(4, new CcBool(true, loc.succ));
int algebraId, typeId;
ListExpr errorInfo;
bool correct;
algebraId = resultType->GetAttributeType(5).algId;
typeId = resultType->GetAttributeType(5).typeId;
Word qrAttr = (am->InObj(algebraId, typeId)
( nl->TwoElemList(nl->IntAtom(algebraId), nl->IntAtom(typeId)),
loc.result,
5, /* attrbute no*/
errorInfo,
correct));
if (correct)
{
resultTuple->PutAttribute(5, (Attribute*)qrAttr.addr);
result.setAddr(resultTuple);
return YIELD;
}
else
{
cerr << "Error!! The subQuery result is wrong: "
<< nl->ToString(loc.result) << endl;
resultTuple->DeleteIfAllowed();
result.setAddr(0);
return CANCEL;
}
}
return CANCEL;
}
case CLOSE: {
hmaLI = static_cast<HadoopMapAllLocalInfo*>(local.addr);
if (hmaLI)
delete hmaLI;
local.addr = 0;
return 0;
}
// case REQUESTPROGRESS:{}
// case CLOSEPROGRESS:{}
}
//TODO add the progress estimation in the future
// else if ( message == REQUESTPROGRESS )
// else if ( message == CLOSEPROGRESS )
return 0;
}
Operator hadoopMapAllOP(
hadoopMapAllInfo(), hadoopMapAllValueMap, hadoopMapAllTypeMap);
/*
4 Operator ~hadoopMap2~
Created in 7th Aug. 2014
Cheng \& Jiamin
This operator maps
----
flist(T1) x flist(T2) x [Name: string] x [Path: text] x [(DLO):DLF] x [Execute: bool]
x (map (T1 x T2 -> T3)) -> flist(T3)
----
Basically it invokes Map tasks in a Hadoop job,
each task reads the same row from the both sides that are also stored together.
The task is assigned to the data server where both rows are stored,
hence it can perform the task without retrieving remote data over the network.
The number of the map tasks is decided by the row number of the inputs.
In the type mapping, we need check:
* both side inputs have the same row number
* each row from both sides should be stored on the same slave server
This operator basically reads the input and set the parameters for the template Hadoop job,
then starts the job at last.
The template Hadoop is encapsulated in the ParallelSecondo package, in Java.
It contains both the Map and Reduce stage, while the UDF is set in the Map tasks only.
If this operation is set un-executed, then all the input parameters are then
encapsulated into the output un-executed flist, which will then be mergered into
the up-coming Reduce operators, including both hadoopReduce and hadoopReduce2.
By default, the execute argument is set TRUE.
Since we want each task to read the inputs locally,
hence both flists should make sure that each row from both sides should stored together.
Therefore, both inputs should be generated by the ~spread~ operator only.
After any Reduce operations, it cannot be garanteed in such a way since we depend on
Hadoop to distribute the tasks and the output results.
*/
/*
5 Operator ~hadoopReduce~
Create at 11th Apr. 2012
Jiamin
As the hadoop operator needs us to change the QueryProcessor, which
may takes a while.
I started to create another new operator, named ~hadoopReduce~.
This is mainly used to process the reduce step of the precast Hadoop job.
It also creates both DLO and DLF kind flist.
This one provides the unary operation, while the binary operator will be
implemented later.
Update 22th Aug. 2012 by Jiamin
Make it possible to accept intermediate flist object as the input parameter.
All flist objects used in its argument functions cannot be in intermediate
status.
*/
struct hadoopReduceInfo : OperatorInfo
{
hadoopReduceInfo()
{
name = "hadoopReduce";
signature =
"flist(T) x partAttr x [Name:string] x [Path:text] "
"x [DLO:(DLF)] x [ReduceTaskNum:int] "
"x [M1:(M2:M3)]"
"x ( map T T1 ) -> flist(T1)";
meaning = "Create DLO or DLF kind flist after the reduce step";
}
};
/*
5.1 Type Mapping of ~hadoopReduce~
For unary operation, the mapping is:
----
flist(T) x partAttribute
x [string] x [text] x [(DLO):DLF] x [int]
x ( map ( T -> T1) )
\to flist(T1)
T is rel(tuple) or stream(tuple)
----
Comparing with the ~hadoopMap~ operator, an additional argument partAttribute
is required to redistribute the input flist over the cluster.
Therefore, the input flist must be a DLF kind, or a DLO kind contains relation
type.
The syntax is:
----
flist(T) x partAttribute
x [objectName: string] x [objectPath: text] x [(DLO):DLF] x
[reduceTaskNum: int]
x ( interFunc: map ( T \to T1) )
\to flist(T1)
T is rel(tuple) or stream(tuple)
----
The reduceTaskNum is used to denote the scale of the parallelism.
The number of map tasks is denoted by the input flist.
If the operator creates a DLO kind flist, then the reduceTaskNum
cannot be larger than the amount of data servers,
since one data server can only keep at most one sub-object of a DLO flist.
However, if the created flist belong to DLF kind, then the reduceTaskNum can be
set to arbitrary values.
It is an optional parameter, with the default value as the cluster scale.
Update 8th Jan. 2014
Jiamin Lu
Add one more optional Mode parameter to this operator with three values:
M1, M2 and M3.
They identify the three modes used in the PSFS operators to shuffle data
between the Map and Reduce stages.
In M1 and M2, map result are distributed with ~fdistribute~ operator,
exporting both tuple and Flob data into the same data file.
Consequently, the reduce tasks read the data with ~pffeed~ and ~pffeed2~ operators.
In M2, only the tuples are read while the Flob data are left in the copied data files,
in order to reduce the unnecessary disk IO overhead.
In M3, data are exported with the ~fdistribute3~ operator, then read by the ~pffeed3~ operator.
In this case, tuple and Flob data are stored in separated files, and only the tuple files
are shuffled before the Reduce stage. Flob data are fetched only when they are really needed.
The default value for this operator is M1. Now this operator maps:
----
flist(T) x partAttribute
x [objectName: string] x [objectPath: text]
x [(DLO):DLF] x [reduceTaskNum: int]
x [M1:(M2:M3)]
x ( interFunc: map ( T \to T1) )
\to flist(T1)
T is rel(tuple) or stream(tuple)
----
*/
ListExpr hadoopReduceTypeMap(ListExpr args){
try{
NList l(args);
string lenErr = "ERROR! Operator hadoopReduce expects 3 argument lists. ";
string typErr = "ERROR! Operator hadoopReduce expects "
"flist(T) x partAttr x [string] x [text] "
"x [DLO:(DLF)] x [int] x [M1:(M2:M3))] x (map T T1))";
string ifsErr = "ERROR! Operator hadoopReduce expects the input flist "
"contains either a tuple stream or a tuple relation. ";
string upaErr = "ERROR! The partition attribute "
"is not found in the input flist";
string ernErr = "ERROR! The reduceTaskNum cannot be larger than the"
" cluster scale while producing a DLO flist object.";
string onmErr = "ERROR! Operator hadoopMap expects the created "
"object name starts with upper case. ";
string ifaErr = "ERROR! Infeasible evaluation in TM of attribute:";
string nprErr = "ERROR! Operator hadoopReduce expects "
"creating a new DLO or DLF kind flist.";
string uafErr = "ERROR!! The internal function is unavailable.";
string expErr = "ERROR! Improper output type for DLF flist";
string fwtErr = "ERROR! Failed writing type into file: ";
string hnmErr = "ERROR! Exists homonymous flist type file in: ";
string udnErr = "ERROR! Long database name is set.";
string objName, filePath, qStr;
fListKind kind = DLO;
int psfsMode = 1; //Its value can only be 1, 2 or 3.
clusterInfo ci;
int reduceTaskNum = ci.getSlaveSize();
if (l.length() != 3)
return l.typeError(lenErr);
//The first flist type data.
NList inputType = l.first().first();
if (inputType.isSymbol("typeerror") || inputType.length() != 2 ){
return l.typeError(typErr);
}
if (!inputType.first().isSymbol(fList::BasicType())){
return l.typeError(typErr);
}
if (!listutils::isRelDescription(inputType.second().listExpr()) &&
!listutils::isTupleStream(inputType.second().listExpr())){
return l.typeError(ifsErr);
}
//The second list: partAttribute x [string] x [text] x [(DLO):DLF]
int len = l.second().first().length();
string PAName = "";
int PAIndex = 0;
ListExpr PAType;
if (len < 1 || len > 5 )
return l.typeError(typErr);
else
{
//The first argument in this list must be the partition attribute name
NList pType = l.second().first();
NList pValue = l.second().second();
PAName = pType.first().str();
ListExpr AttrList = inputType.second().second().second().listExpr();
PAIndex = listutils::findAttribute(AttrList, PAName, PAType);
if (PAIndex <= 0)
return l.typeError(upaErr);
if (len > 1)
{
for (int i = 2; i <= len; i++)
{
NList pp = pType.elem(i);
NList pv = pValue.elem(i);
if (pp.isSymbol(CcString::BasicType()))
{
//ObjectName defined.
ListExpr fnList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), fnList)){
return l.typeError(ifaErr + "objName");
}
objName = NList(fnList).str();
}
else if (pp.isSymbol(FText::BasicType()))
{
//FilePath defined
ListExpr fpList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), fpList)){
return l.typeError(ifaErr + "filePath");
}
filePath = NList(fpList).str();
}
else if (pp.isSymbol(CcInt::BasicType())){
ListExpr rnList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), rnList)){
return l.typeError(ifaErr + "objName");
}
reduceTaskNum = NList(rnList).intval();
}
else if (pp.isSymbol("DLF")){
kind = DLF;
}
else if (pp.isSymbol("DLO")){
kind = DLO;
}
else if (pp.isSymbol("M1")){
psfsMode = 1;
}
else if (pp.isSymbol("M2")){
psfsMode = 2;
}
else if (pp.isSymbol("M3")){
psfsMode = 3;
}
else{
return l.typeError(nprErr);
}
}
}
}
if (kind == DLO && reduceTaskNum > (int)ci.getSlaveSize()){
return l.typeError(ernErr);
}
//The third map list
NList mapType = l.third().first();
NList mapValue = l.third().second();
NList coType = mapType.third(); //Create Type
NList coQuery = mapValue.third(); //Create Query
NList coBType; //Create Base Type
if (coType.isAtom()){
coBType = coType;
}
else{
coBType = coType.first();
}
if (coBType.isEqual("typeerror"))
return l.typeError(uafErr);
qStr = coQuery.convertToString();
//Query Parameter
string coParaName = mapValue.second().first().str();
//Check the type for output flist
//If it is a DLF, then the output type must be a tuple stream
if (kind == DLF && !listutils::isTupleStream(coType.listExpr())){
return l.typeError(expErr);
}
NList resultType = NList(NList(fList::BasicType()), NList(coType));
if (objName.length() == 0)
objName = fList::tempName(false);
else
{
char f = objName[0];
if (f<'A' || f>'Z'){
return l.typeError(onmErr);
}
//If the name of sub-objects or sub-files are denoted by users,
//Then it must be kept in a text type file,
//in case of the homonymous problem
//also this file must be kept in the default file path.
string filePath =
getLocalFilePath("", (objName + "_type"),"", true);
if (FileSystem::FileOrFolderExists(filePath)){
ListExpr exeType;
bool ok = false;
if (nl->ReadFromFile(filePath, exeType)){
//TODO Need to be more compatible with file-related operators
if (nl->Equal(nl->Second(exeType),
resultType.second().second().listExpr())){
ok = true;
}
}
if (!ok)
return l.typeError(hnmErr + filePath);
}
else{
ListExpr expList = nl->Second(resultType.listExpr());
if (nl->IsAtom(expList)){
expList = nl->OneElemList(expList);
}
if (!nl->WriteToFile(filePath, expList)){
return l.typeError(fwtErr + filePath);
}
}
}
//Check the length of the database name
string dbName = fList::tempName(true);
if (dbName.length() >= 16){
//16 is the maximum length that a database name can be
return l.typeError(udnErr);
}
return NList(NList(Symbol::APPEND()),
NList(NList(qStr, true, true),
NList(objName, true, false),
NList((int)kind),
NList(coParaName, true, false),
NList(dbName, true, false),
NList(psfsMode)),
resultType).listExpr();
} catch(...){
return listutils::typeError("invalid input");
}
}
int hadoopReduceValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
if ( message <= CLOSE )
{
//Appended parameters
//Query string
string CreateQuery =
((FText*)args[3].addr)->GetValue();
//Get the object name.
string CreateObjectName =
((CcString*)args[4].addr)->GetValue();
//Get the export file type
fListKind kind =
(fListKind)((CcInt*)args[5].addr)->GetValue();
//Get the object name of the input flist
string inParaName =
((CcString*)args[6].addr)->GetValue();
//Get the database name
string dbName =
((CcString*)args[7].addr)->GetValue();
//Get the PSFS mode
int psfsMode =
((CcInt*)args[8].addr)->GetValue();
clusterInfo *ci = new clusterInfo();
//Get the partition attribute name, since the partition is done by mappers
Supplier oppList = args[1].addr;
string PAName =
nl->SymbolValue(qp->GetType(qp->GetSupplierSon(oppList, 0)));
//Optional parameters
string CreateFilePath = "";
int reduceTaskNum = ci->getSlaveSize();
int opLen = qp->GetNoSons(oppList);
for (int i = 1; i < opLen; i++)
{
//Only the file path need to be get
ListExpr pp = qp->GetType(qp->GetSupplierSon(oppList,i));
if (nl->IsEqual(pp, FText::BasicType())){
CreateFilePath = ((FText*)
qp->Request(qp->GetSupplier(oppList,i)).addr)->GetValue();
}
else if(nl->IsEqual(pp, CcInt::BasicType())){
reduceTaskNum = ((CcInt*)
qp->Request(qp->GetSupplier(oppList,i)).addr)->GetValue();
}
}
//Result type
ListExpr resultType = qp->GetType(s);
fList* resultFList = new fList(CreateObjectName, NList(resultType),
ci, NList(), 1, false, kind);
fList* inputFList = (fList*)args[0].addr;
int dupTimes = inputFList->getDupTimes();
//Prepare the reduce query
ListExpr CreateQueryList;
nl->ReadFromString(CreateQuery, CreateQueryList);
vector<string> flistParaList;
vector<fList*> flistObjList;
flistParaList.push_back(inParaName); //The input parameter
flistObjList.push_back(inputFList);
vector<string> DLF_NameList, DLF_fileLocList;
vector<string> DLO_NameList, DLO_locList;
bool ok = true;
CreateQueryList = replaceParaOp(CreateQueryList, flistParaList,
flistObjList, ok);
if (!ok){
cerr << "Replace para operation fails" << endl;
}
else{
//Replace parameter value according to their flist value
CreateQueryList = replaceSecObj(CreateQueryList, ok);
if (ok)
{
for (size_t i = 0; i < flistParaList.size(); i++)
{
int argIndex = (i == 0 ? 1 : 0);
CreateQueryList =
replaceDLOF(CreateQueryList,
flistParaList[i], flistObjList[i],
DLF_NameList, DLF_fileLocList,
DLO_NameList, DLO_locList, true, ok, argIndex);
if (!ok)
break;
}
}
}
NList dlfNameList, dlfLocList;
NList dloNameList, dloLocList;
vector<pair<int, pair<int, int> > > hpResult;
if (!ok){
cerr << "Reading flist data fails." << endl;
}
else
{
for (size_t i = 0; i < DLF_NameList.size(); i++)
{
dlfNameList.append(NList(DLF_NameList[i], true, false));
ListExpr locList;
nl->ReadFromString(DLF_fileLocList[i], locList);
dlfLocList.append(NList(locList));
}
for (size_t i = 0; i < DLO_NameList.size(); i++)
{
dloNameList.append(NList(DLO_NameList[i], true, false));
ListExpr locList;
nl->ReadFromString(DLO_locList[i], locList);
dloLocList.append(NList(locList));
}
//The recognition of the function argument
string inputRcg = "";
if (inputFList->getKind() == DLO)
inputRcg = inputFList->getSubName();
else
inputRcg = "<DLFMark:Arg1:" + inputFList->getSubName() + "/>";
bool mapStage = false;
//Name and Location lists for DLF and DLO flist in the map stage
NList mq, mfn, mfl, mon, mol;
int mapTaskNum;
if (!inputFList->getUEMapQuery().isEmpty()){
mapStage = true;
mq = inputFList->getUEMapQuery().first();
mfn = inputFList->getUEMapQuery().second().first();
mfl = inputFList->getUEMapQuery().second().second();
mon = inputFList->getUEMapQuery().third().first();
mol = inputFList->getUEMapQuery().third().second();
mapTaskNum = inputFList->getUEMapQuery().fourth().intval();
}
//Call the Hadoop job
stringstream queryStr;
queryStr
<< "hadoop jar ParallelSecondo.jar ParallelSecondo.PS_HadoopReduce \\\n"
<< dbName << " " << CreateObjectName << " "
<< " \"" << tranStr(nl->ToString(CreateQueryList),"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dlfNameList.convertToString(),"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dlfLocList.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dloNameList.convertToString(),"\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dloLocList.convertToString(), "\"", "\\\"") << "\" \\\n"
<< dupTimes << " " << kind << " " << " \"" << CreateFilePath << "\" "
<< " \"" << inputRcg << "\" ";
if (mapStage)
{
//Describe the map tasks
queryStr
<< " \"" << tranStr(mq.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(mfn.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(mfl.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(mon.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(mol.convertToString(), "\"", "\\\"") << "\" \\\n"
<< mapTaskNum << " ";
}
//The input flist sub-object, which will be partitioned in the map step
queryStr << " \"" << PAName << "\" "<< reduceTaskNum
<< " " << psfsMode
<< endl;
int rtn = -1;
// cout << queryStr.str() << endl;
rtn = system(queryStr.str().c_str());
ok = (rtn == 0);
if (ok)
{
FILE *fs;
char buf[MAX_STRINGSIZE];
fs = popen("hadoop dfs -cat OUTPUT/part*", "r");
if (NULL != fs)
{
while(fgets(buf, sizeof(buf), fs))
{
stringstream ss;
ss << buf;
stringstream iss(ss.str());
int rowIndex, columnIndex, slaveIndex;
iss >> rowIndex >> columnIndex >> slaveIndex;
hpResult.push_back(pair<int, pair<int, int> >
(rowIndex, pair<int, int>(columnIndex, slaveIndex)));
}
}
else
ok = false;
pclose(fs);
}
}
if (ok)
{
NList fileLocList;
if (kind == DLO)
CreateFilePath = dbLoc;
//Create file location list
vector<pair<int, pair<int, int> > >::iterator rit;
int exRowIndex = 1; //expected row number
for ( rit = hpResult.begin(); rit != hpResult.end(); rit++)
{
int rowIndex = rit->first;
for (; exRowIndex <= rowIndex; exRowIndex++)
{
NList newRow = NList();
if (rowIndex == exRowIndex){
int columnIndex = rit->second.first;
int slaveIndex = rit->second.second;
newRow = NList(NList(slaveIndex), NList(columnIndex).enclose(),
NList(CreateFilePath, true, true));
}
if (fileLocList.isEmpty()){
fileLocList.makeHead(newRow);
}
else{
fileLocList.append(newRow);
}
}
}
resultFList = new fList(CreateObjectName, NList(resultType),
ci, fileLocList,
1, //dup time
true, //is distributed
kind); //kind
}
result.setAddr(resultFList);
}
//TODO add the progress estimation in the future
// else if ( message == REQUESTPROGRESS )
// else if ( message == CLOSEPROGRESS )
return 0;
}
Operator hadoopReduceOP(
hadoopReduceInfo(), hadoopReduceValueMap, hadoopReduceTypeMap);
/*
5 Operator ~hadoopReduce2~
Create at 18th Apr. 2012
Jiamin
This is also mainly used to process the reduce step of the precast Hadoop job.
It provides the binary operation, and creates a flist belong to either
DLO or DLF kind.
*/
struct hadoopReduce2Info : OperatorInfo
{
hadoopReduce2Info()
{
name = "hadoopReduce2";
signature =
"flist(T1) x flist(T2) x partAttr1 x partAttr2 "
"x [Name:string] x [Path:text] x [DLO:(DLF)] "
"x [ReduceTaskNum:int] x [isHDJ:bool] "
"x [M1:(M2:M3)]"
"x ( map (T1 T2) T3 ) -> flist(T3)";
meaning = "Create DLO or DLF kind flist after the binary reduce step";
}
};
/*
5.1 Type Mapping of ~hadoopReduce2~
For unary operation, the mapping is:
----
flist(T1) x flist(T2)
x partAttr1 x partAttr2
x [string] x [text] x [(DLO):DLF] x [int]
x ( map (T1 T2) T3 )
\to flist(T3)
T1 and T2 are either rel(tuple) or stream(tuple)
----
Similar as the above unary hadoopReduce operator, this one expects two
input flists, also two partition attributes, each one corresponding to
one input flist, respectively.
The syntax is:
----
flist(T1) x flist(T2)
x partAttr1 x partAttr2
x [objectName: string] x [objectPath: text] x [(DLO):DLF]
x [reduceTaskNum: int]
x ( interFunc: map ( T1 x T2 \to T1) )
\to flist(T3)
T is rel(tuple) or stream(tuple)
----
Update at 11th Sept. 2012.
Allow hadoopReduce2 to describe Hadoop-based parallel queries,
intermediate results are shuffled with HDFS.
Here no DLO flist is allowed being used in the argument function.
It maps:
----
flist(T1) x flist(T2)
x partAttr1 x partAttr2
x [objectName: string] x [objectPath: text] x [(DLO):DLF]
x [reduceTaskNum: int] x [isHDJ: bool]
x ( interFunc: map ( T1 x T2 \to T3) )
\to flist(T3)
T is rel(tuple) or stream(tuple)
----
By default, the ~isHDJ~ parameter is false.
Update 8th Jan. 2014
Jiamin Lu
Add one more optional Mode parameter to this operator with three values:
M1, M2 and M3, as being described in the ~hadoopReduce~ operator.
The default value for this operator is M1.
Now this operator maps:
----
flist(T1) x flist(T2)
x partAttr1 x partAttr2
x [objectName: string] x [objectPath: text] x [DLO:(DLF)]
x [reduceTaskNum: int] x [isHDJ: bool]
x [M1:(M2:M3)]
x ( interFunc: map ( T1 x T2 \to T3) )
\to flist(T1)
T is rel(tuple) or stream(tuple)
----
*/
ListExpr hadoopReduce2TypeMap(ListExpr args){
try{
NList l(args);
string lenErr = "ERROR! Operator hadoopReduce expects 4 argument lists. ";
string typErr = "ERROR! Operator hadoopReduce expects "
"flist(T1) x flist(T2) "
"x partAttr1 x partAttr2 x [string] x [text] x [(DLO):DLF] "
"x [int] x [bool] x [M1:(M2:M3)] x ( map (T1 T2) T3 ) -> flist(T3)";
string ifsErr = "ERROR! Operator hadoopReduce expects the input flist "
"contains either a tuple stream or a tuple relation. ";
string upaErr = "ERROR! The partition attribute "
"is not found in the corresponding input flist";
string ifaErr = "ERROR! Infeasible evaluation in TM of attribute:";
string nprErr = "ERROR! Operator hadoopReduce expects "
"creating a new DLO or DLF kind flist.";
string ernErr = "ERROR! The reduceTaskNum cannot be larger than the"
" cluster scale while producing a DLO flist object.";
string onmErr = "ERROR! Operator hadoopMap expects the created "
"object name starts with upper case. ";
string uafErr = "ERROR!! The internal function is unavailable.";
string expErr = "ERROR! Improper output type for DLF flist";
string fwtErr = "ERROR! Failed writing type into file: ";
string hnmErr = "ERROR! Exists homonymous flist type file in: ";
string udnErr = "ERROR! Long database name is set.";
string objName, filePath, qStr;
fListKind kind = DLO;
int psfsMode = 1;
clusterInfo ci;
int reduceTaskNum = ci.getSlaveSize();
int argIndex = 1;
if (l.length() != 4)
return l.typeError(lenErr);
//The first two flist arguments
NList inputType[2];
for (; argIndex <= 2; argIndex++){
int lc = argIndex - 1;
inputType[lc] = l.elem(argIndex).first();
if (inputType[lc].isSymbol("typeerror") || inputType[lc].length() != 2 ){
return l.typeError(typErr);
}
if (!inputType[lc].first().isSymbol(fList::BasicType())){
return l.typeError(typErr);
}
if (!listutils::isRelDescription(inputType[lc].second().listExpr()) &&
!listutils::isTupleStream(inputType[lc].second().listExpr())){
return l.typeError(ifsErr);
}
}
//The third argument list
NList pType = l.elem(argIndex).first();
NList pValue = l.elem(argIndex).second();
int len = pType.length();
if (len < 2 || len > 6)
return l.typeError(typErr);
string PAName[2] = { pType.first().str(), pType.second().str()};
int PAIndex[2]= {0,0};
ListExpr PAType[2];
bool isHDJ = false;
for (int inc = 0; inc < 2; inc++){
ListExpr attrList = inputType[inc].second().second().second().listExpr();
PAIndex[inc] = listutils::findAttribute(attrList,
PAName[inc], PAType[inc]);
if (PAIndex[inc] <= 0)
return l.typeError(upaErr);
}
for (int oac = 3; oac <= len; oac++)
{
//Check other optional arguments in this list
NList pp = pType.elem(oac);
NList pv = pValue.elem(oac);
if (pp.isSymbol(CcString::BasicType()))
{
//ObjectName defined.
ListExpr fnList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), fnList)){
return l.typeError(ifaErr + "objName");
}
objName = NList(fnList).str();
}
else if (pp.isSymbol(FText::BasicType()))
{
//FilePath defined
ListExpr fpList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), fpList)){
return l.typeError(ifaErr + "filePath");
}
filePath = NList(fpList).str();
}
else if (pp.isSymbol(CcInt::BasicType())){
ListExpr rnList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), rnList)){
return l.typeError(ifaErr + "objName");
}
reduceTaskNum = NList(rnList).intval();
}
else if (pp.isSymbol(CcBool::BasicType())){
ListExpr ihList;
if (!QueryProcessor::GetNLArgValueInTM(pv.listExpr(), ihList)){
return l.typeError(ifaErr + "isHDJ");
}
isHDJ = NList(ihList).boolval();
}
else if (pp.isSymbol("DLF")){
kind = DLF;
}
else if (pp.isSymbol("DLO")){
kind = DLO;
}
else if (pp.isSymbol("M1")){
psfsMode = 1;
}
else if (pp.isSymbol("M2")){
psfsMode = 2;
}
else if (pp.isSymbol("M3")){
psfsMode = 3;
}
else{
return l.typeError(nprErr);
}
}
if (kind == DLO && reduceTaskNum > (int)ci.getSlaveSize()){
return l.typeError(ernErr);
}
argIndex++;
//The fourth inter-query function
NList mapType = l.elem(argIndex).first();
NList mapValue = l.elem(argIndex).second();
if (mapType.length() != 4)
return l.typeError(uafErr);
NList coType = mapType.fourth(); //Create Type
NList coQuery = mapValue.fourth(); //Create Query
NList coBType; //Create Base Type
if (coType.isAtom()){
coBType = coType;
}
else{
coBType = coType.first();
}
if (coBType.isEqual("typeerror"))
return l.typeError(uafErr);
qStr = coQuery.convertToString();
//Query Parameter
string coParaName[2] = {
mapValue.second().first().str(),
mapValue.third().first().str()};
//Check the type for output flist
//If it is a DLF, then the output type must be a tuple stream
if (kind == DLF && !listutils::isTupleStream(coType.listExpr())){
return l.typeError(expErr);
}
NList resultType = NList(NList(fList::BasicType()), NList(coType));
if (objName.length() == 0)
objName = fList::tempName(false);
else
{
char f = objName[0];
if (f<'A' || f>'Z'){
return l.typeError(onmErr);
}
//If the name of sub-objects or sub-files are denoted by users,
//Then it must be kept in a text type file,
//in case of the homonymous problem
//also this file must be kept in the default file path.
string filePath =
getLocalFilePath("", (objName + "_type"),"", true);
if (FileSystem::FileOrFolderExists(filePath)){
ListExpr exeType;
bool ok = false;
if (nl->ReadFromFile(filePath, exeType)){
//TODO Need to be more compatible with file-related operators
if (nl->Equal(nl->Second(exeType),
resultType.second().second().listExpr())){
ok = true;
}
}
if (!ok)
return l.typeError(hnmErr + filePath);
}
else{
ListExpr expList = nl->Second(resultType.listExpr());
if (nl->IsAtom(expList)){
expList = nl->OneElemList(expList);
}
if (!nl->WriteToFile(filePath, expList)){
return l.typeError(fwtErr + filePath);
}
}
}
//Check the length of the database name
string dbName = fList::tempName(true);
if (dbName.length() >= 16){
//16 is the maximum length that a database name can be
return l.typeError(udnErr);
}
NList appendList;
appendList.append(NList(qStr, true, true));
appendList.append(NList(objName, true, false));
appendList.append(NList((int)kind));
appendList.append(NList(coParaName[0], true, false));
appendList.append(NList(coParaName[1], true, false));
appendList.append(NList(dbName, true, false));
appendList.append(NList(isHDJ));
appendList.append(NList(psfsMode));
return NList(NList(Symbol::APPEND()),
appendList,
resultType).listExpr();
} catch(...){
return listutils::typeError("invalid input");
}
}
int hadoopReduce2ValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
if ( message <= CLOSE )
{
//Get Arguments
//Query string
string CreateQuery =
((FText*)args[4].addr)->GetValue();
//Get the object name.
string CreateObjectName =
((CcString*)args[5].addr)->GetValue();
//Get the export file type
fListKind kind =
(fListKind)((CcInt*)args[6].addr)->GetValue();
//Get the object name of the input flist
string inParaName[2] = {
((CcString*)args[7].addr)->GetValue(),
((CcString*)args[8].addr)->GetValue()};
//Get the database name
string dbName =
((CcString*)args[9].addr)->GetValue();
//Get isHDJ
bool isHDJ = ((CcBool*)args[10].addr)->GetValue();
//Get PSFS Mode
int psfsMode = ((CcInt*)args[11].addr)->GetValue();
clusterInfo *ci = new clusterInfo();
//Get the partition attribute name, since the partition is done by mappers
Supplier oppList = args[2].addr;
string PAName[2] = {
nl->SymbolValue(qp->GetType(qp->GetSupplierSon(oppList, 0))),
nl->SymbolValue(qp->GetType(qp->GetSupplierSon(oppList, 1)))};
//Optional parameters
string CreateFilePath = "";
int reduceTaskNum = ci->getSlaveSize();
int opLen = qp->GetNoSons(oppList);
for (int i = 2; i < opLen; i++)
{
//Only the file path need to be get
ListExpr pp = qp->GetType(qp->GetSupplierSon(oppList,i));
if (nl->IsEqual(pp, FText::BasicType())){
CreateFilePath = ((FText*)
qp->Request(qp->GetSupplier(oppList,i)).addr)->GetValue();
}
else if(nl->IsEqual(pp, CcInt::BasicType())){
reduceTaskNum = ((CcInt*)
qp->Request(qp->GetSupplier(oppList,i)).addr)->GetValue();
}
}
ListExpr resultType = qp->GetType(s);
fList* resultFList = new fList(CreateObjectName, NList(resultType),
ci, NList(), 1, false, kind);
fList* inputFList[2] = {(fList*)args[0].addr, (fList*)args[1].addr};
int dupTimes[2] = { inputFList[0]->getDupTimes(),
inputFList[1]->getDupTimes()};
//Prepare the reduce query
ListExpr CreateQueryList;
nl->ReadFromString(CreateQuery, CreateQueryList);
vector<string> flistParaList;
vector<fList*> flistObjList;
flistParaList.push_back(inParaName[0]); //The input parameter name
flistParaList.push_back(inParaName[1]);
flistObjList.push_back(inputFList[0]); //The input parameter value
flistObjList.push_back(inputFList[1]);
vector<string> DLF_NameList, DLF_fileLocList;
vector<string> DLO_NameList, DLO_locList;
size_t dloNumber = 0;
if (inputFList[0]->getKind() == DLO) dloNumber++;
if (inputFList[1]->getKind() == DLO) dloNumber++;
bool ok = true;
//Process all para operators inside the reduce query
CreateQueryList = replaceParaOp(
CreateQueryList, flistParaList, flistObjList, ok);
if (!ok){
cerr << "Replace para operation fails" << endl;
}
else{
CreateQueryList = replaceSecObj(CreateQueryList, ok);
if (ok)
{
//Replace parameter value according to their flist value
for (size_t i = 0; i < flistParaList.size(); i++)
{
//The top two flists are input arguments for the inter-query
//they are re-partitioned in the map step.
int argIndex = ((i == 0 || i == 1)? (i+1) : 0);
CreateQueryList =
replaceDLOF(CreateQueryList,
flistParaList[i], flistObjList[i],
DLF_NameList, DLF_fileLocList,
DLO_NameList, DLO_locList, true, ok, argIndex);
if (!ok)
break;
}
}
}
if (ok && isHDJ && (DLO_NameList.size() > dloNumber)){
cerr << "For Hadoop-based queries, it is not allowed "
"to use DLO flist in the argument function." << endl;
ok = false;
}
NList dlfNameList, dlfLocList;
NList dloNameList, dloLocList;
vector<pair<int, pair<int, int> > > hpResult;
if (!ok){
cerr << "Reading flist data fails." << endl;
}
else
{
for (size_t i = 0; i < DLF_NameList.size(); i++)
{
dlfNameList.append(NList(DLF_NameList[i], true, false));
ListExpr locList;
nl->ReadFromString(DLF_fileLocList[i], locList);
dlfLocList.append(NList(locList));
}
for (size_t i = 0; i < DLO_NameList.size(); i++)
{
dloNameList.append(NList(DLO_NameList[i], true, false));
ListExpr locList;
nl->ReadFromString(DLO_locList[i], locList);
dloLocList.append(NList(locList));
}
//The recognition of the function argument
string inputRcg[2] = {"", ""};
for (int i = 0; i < 2; i++)
{
if (inputFList[i]->getKind() == DLO)
inputRcg[i] = inputFList[i]->getSubName();
else
inputRcg[i] = "<DLFMark:Arg" + int2string(i+1) + ":"
+ inputFList[i]->getSubName() + "/>";
}
NList IUEMapQuery = NList(
inputFList[0]->getUEMapQuery(),
inputFList[1]->getUEMapQuery());
//Call the Hadoop job
stringstream queryStr;
queryStr
<< "hadoop jar ParallelSecondo.jar "
"ParallelSecondo.PS_HadoopReduce2 \\\n"
<< dbName << " " << CreateObjectName << " \\\n"
<< " \"" << tranStr(nl->ToString(CreateQueryList), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dlfNameList.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dlfLocList.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dloNameList.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << tranStr(dloLocList.convertToString(), "\"", "\\\"") << "\" \\\n"
<< " \"" << CreateFilePath << "\" \\\n"
<< " \"" << inputRcg[0] << "\" " << dupTimes[0] << " \"" << PAName[0]<< "\" "
<< " \"" << inputRcg[1] << "\" "<< dupTimes[1] << " \"" << PAName[1]<< "\" "
<< reduceTaskNum << " " << kind << " \\\n"
<< " \"" << tranStr(IUEMapQuery.convertToString(), "\"", "\\\"") << "\" "
<< boolalpha << isHDJ
<< " " << psfsMode
<< endl;
int rtn = -1;
// cout << queryStr.str() << endl;
rtn = system(queryStr.str().c_str());
ok = (rtn == 0);
if (ok)
{
FILE *fs;
char buf[MAX_STRINGSIZE];
fs = popen("hadoop dfs -cat OUTPUT/part*", "r");
if (NULL != fs)
{
while(fgets(buf, sizeof(buf), fs))
{
stringstream ss;
ss << buf;
stringstream iss(ss.str());
int rowIndex, columnIndex, slaveIndex;
iss >> rowIndex >> columnIndex >> slaveIndex;
hpResult.push_back(pair<int, pair<int, int> >
(rowIndex, pair<int, int>(columnIndex, slaveIndex)));
}
}
else
ok = false;
pclose(fs);
}
}
if (ok)
{
NList fileLocList;
if (kind == DLO)
CreateFilePath = dbLoc;
//Create file location list
vector<pair<int, pair<int, int> > >::iterator rit;
int exRowIndex = 1; //expected row number
for ( rit = hpResult.begin(); rit != hpResult.end(); rit++)
{
int rowIndex = rit->first;
for (; exRowIndex <= rowIndex; exRowIndex++)
{
NList newRow = NList();
if (rowIndex == exRowIndex){
int columnIndex = rit->second.first;
int slaveIndex = rit->second.second;
newRow = NList(NList(slaveIndex), NList(columnIndex).enclose(),
NList(CreateFilePath, true, true));
}
if (fileLocList.isEmpty()){
fileLocList.makeHead(newRow);
}
else{
fileLocList.append(newRow);
}
}
}
resultFList = new fList(CreateObjectName, NList(resultType),
ci, fileLocList,
1, //dup time
true, //is distributed
kind); //kind
}
result.setAddr(resultFList);
}
//TODO add the progress estimation in the future
// else if ( message == REQUESTPROGRESS )
// else if ( message == CLOSEPROGRESS )
return 0;
}
Operator hadoopReduce2OP(
hadoopReduce2Info(), hadoopReduce2ValueMap, hadoopReduce2TypeMap);
/*
4 Operator ~createFList~
Create an undistributed flist,
used to debug the implementation of the flist operator.
T -> flist
I disable the In and Out function of the flist,
like what BTree does, because we abandon the DGO kind object,
hence it is impossible to create a flist without running a hadoop job.
Update on 15th May 2013
In some cases, like in the parallel generation of BerlinMOD data,
data are created independently on all Data Servers.
In order to access all data on the cluster,
the ~createFList~ operator is extended.
It accepts a schema relation, to indicate the type of the distributed data.
Besides, it accepts several arguments to indicate the location of distributed data,
and returns the flist at last.
For DLF flist, this operator maps:
----
stream(tuple(T))
x ObjectName : string
x LocRel : rel(tuple((Row:int)(DS:int)(Column:int)(FilePath:text))))
x type : DLF
x Distributed : bool
\to flist(stream(tuple(T)))
----
For DLO flist, this operator maps:
----
T
x ObjectName : string
x LocRel : rel(tuple((Row:int)(DS:int)(Column:int)(FilePath:text))))
x type : DLO
x Distributed : bool
\to flist(T)
----
*/
struct createFListInfo : OperatorInfo
{
createFListInfo()
{
name = "createFList";
signature = "T x string x rel x DLO(DLF) x bool -> flist(T)";
meaning = "Create a flist object, "
"in case the data are already distributed on the cluster without "
"using the spread operator.";
}
};
/*
4.1 Type Mapping
*/
ListExpr createFListTypeMap(ListExpr args){
NList l(args);
string tpeErr = "ERROR! createFList expects "
"T x string x rel x DLO(DLF) x bool";
string lorErr = "ERROR! The location relation excepts "
"rel(tuple((Row:int) (DS:int) (Column:int) (FilePath:text)))";
string ifaErr = "ERROR! Infeasible evaluation in TM of attribute:";
string onmErr = "ERROR! Operator createFList expects the created "
"object name starts with upper case. ";
string hnmErr = "ERROR! Unmatched type "
"or Exists homonymous flist type file in: ";
string fwtErr = "ERROR! Failed writing type into file: ";
if (l.length() != 5){
return l.typeError(tpeErr);
}
try{
NList pType, pValue;
fListKind kind = DLO;
//First argument
NList inputType = l.first().first();
//Second argument : ObjectName
//If it is empty, then use the default temporal value
pType = l.second().first();
if (!pType.isSymbol(CcString::BasicType()))
return l.typeError(tpeErr);
//Third argument : location relation
pType = l.third().first();
if (!listutils::isRelDescription(pType.listExpr()))
return l.typeError(tpeErr);
NList attrList = pType.second().second();
if (attrList.length() != 4)
return l.typeError(lorErr);
for (int i = 1; i < 4; i++)
{
if (!attrList.elem(i).second().isEqual(CcInt::BasicType()))
return l.typeError(lorErr);
}
if (!attrList.fourth().second().isEqual(FText::BasicType()))
return l.typeError(lorErr);
//Fourth argument
pType = l.fourth().first();
if (pType.isSymbol("DLF")){
kind = DLF;
}
else if (pType.isSymbol("DLO")){
kind = DLO;
}
else
return l.typeError(tpeErr);
//Fifth argument
pType = l.fifth().first();
if (!pType.isSymbol(CcBool::BasicType()))
return l.typeError(tpeErr);
if ( kind == DLF )
{
if (!listutils::isTupleStream(inputType.listExpr()))
return l.typeError(tpeErr);
}
//Create the type file
ListExpr onList;
if (!QueryProcessor::GetNLArgValueInTM(l.second().second().listExpr(),
onList)){
return l.typeError(ifaErr + "ObjectName");
}
string objName = NList(onList).str();
if (objName.length() == 0)
objName = fList::tempName(false);
else
{
//Prepare the type file if the ObjectName is indicated by the user
char f = objName[0];
if ( f < 'A' || f > 'Z'){
return l.typeError(onmErr);
}
string filePath = getLocalFilePath("", (objName + "_type"), "", true);
if (FileSystem::FileOrFolderExists(filePath))
{
ListExpr exeType; //The exist type
bool ok = false;
if (nl->ReadFromFile(filePath, exeType))
{
if (nl->Equal(exeType, inputType.listExpr())){
ok = true;
}
else
{
if (kind == DLF
&& (listutils::isRelDescription(exeType)
|| listutils::isTupleStream(exeType))){
//Be more compatible with file-related operators,
//checks the stream tuple only
if (nl->Equal(nl->Second(inputType.listExpr()),
nl->Second(exeType)))
ok = true;
}
}
}
if (!ok)
return l.typeError(hnmErr + filePath);
}
else
{
ListExpr expList = inputType.listExpr();
if (nl->IsAtom(expList)){
expList = nl->OneElemList(expList);
}
if (!nl->WriteToFile(filePath, expList)){
return l.typeError(fwtErr + filePath);
}
}
}
NList resultType = NList(NList(fList::BasicType()), inputType);
return NList(
NList(Symbol::APPEND()),
NList(NList(objName, true, false),
NList((int)kind)),
resultType).listExpr();
} catch(...){
return l.typeError(tpeErr);
}
}
/*
4.2 Value Mapping
*/
int createFListValueMap(Word* args, Word& result,
int message, Word& local, Supplier s){
string dbName = fList::tempName(true);
string objName = ((CcString*)args[5].addr)->GetValue();
fListKind kind = (fListKind)((CcInt*)args[6].addr)->GetValue();
NList resultType = NList(qp->GetType(s));
clusterInfo* ci = new clusterInfo();
size_t dupTime = 1;
bool distributed = ((CcBool*)args[4].addr)->GetValue();
NList fileLocList;
fList* emptyFlist =
new fList(objName, resultType, ci, fileLocList, dupTime);
vector<pair<int, rowInLocRel> > locList;
GenericRelation* locRel = (GenericRelation*)(args[2].addr);
GenericRelationIterator* iter = locRel->MakeScan();
Tuple* nextTuple = iter->GetNextTuple();
while (!iter->EndOfScan()){
int row = ((CcInt*)nextTuple->GetAttribute(0))->GetValue();
rowInLocRel r;
r.dsIndex = ((CcInt*)nextTuple->GetAttribute(1))->GetValue();
r.column = ((CcInt*)nextTuple->GetAttribute(2))->GetValue();
r.filePath = ((FText*)nextTuple->GetAttribute(3))->GetValue();
locList.push_back(pair<int, rowInLocRel>(row, r));
nextTuple->DeleteIfAllowed();
nextTuple = iter->GetNextTuple();
}
sort(locList.begin(), locList.end(),rowRelInfo);
vector<pair<int, rowInLocRel> >::iterator llit = locList.begin();
int crow = 1;
while (llit != locList.end())
{
int row = llit->first;
if (crow < row){
for (; crow < row; crow++){
fileLocList.append(NList());
}
}
else if ( crow == row)
{
int dsIndex = llit->second.dsIndex;
string filePath = llit->second.filePath;
NList columnsList;
while (llit != locList.end())
{
if (llit->first != crow){
crow++;
break;
}
if (llit->second.dsIndex != dsIndex ||
llit->second.filePath.compare(filePath) != 0){
cerr <<
"ERROR! The format of the location relation is wrong." << endl;
result.setAddr(emptyFlist);
return 0;
}
columnsList.append(NList(llit->second.column));
llit++;
}
fileLocList.append(NList(NList(dsIndex), columnsList,
NList(filePath, true, true)));
}
else
{
// crow > crow
cerr << "ERROR! The format of the location relation is wrong." << endl;
result.setAddr(emptyFlist);
return 0;
}
}
fList* rl = new fList(objName, resultType, ci, fileLocList,
dupTime, distributed, kind);
result.setAddr(rl);
return 0;
}
Operator cflOp(
createFListInfo(), createFListValueMap, createFListTypeMap);
/*
6 Auxiliary functions
*/
/*
Checks for valid description of a flist
*/
bool isFListStreamDescription(const NList& typeInfo)
{
if (typeInfo.length() != 2){
return false;
}
if (!( typeInfo.first().isSymbol(fList::BasicType()) &&
listutils::isTupleStream(typeInfo.second().listExpr()))){
return false;
}
return true;
}
/*
Scans the query list, and replace parameter values by fList data.
DGO kind data replace the parameter with their value list,
while DLO kind data replace with object name.
If it is a DLF type list, then the file name is appended into the DLF\_NameList,
while the fileLocList is appended into the DLF\_fileLocList
Update at 13th Apr. Jiamin
Add a boolean parameter isArg with the default value of false.
This is used to denote whether the input parameter listName is
an argument of the query list.
If it is, then in the operator like ~hadoopReduce~,
this value should be re-distributed and be replaced in the map step.
Update at 30th Apr. Jiamin
Rename to replaceDLOF, mark on not only DLF, but also DLO flist,
and returns location information of both kinds flists.
Therefore, if a sub-object of a DLO flist doesn't exist on one slave data
server, no map task is deployed.
Update at 22th Aug. Jiamin
Add the boolean parameter ua (Unavailable Allowed).
If it is set as true, then for function argument (argIndex > 0),
they can be unavailable
*/
ListExpr replaceDLOF(ListExpr createQuery, string listName, fList* listObject,
vector<string>& DLF_NameList, vector<string>& DLF_fileLocList,
vector<string>& DLO_NameList, vector<string>& DLO_locList,
bool ua, bool& ok, int argIndex/* = 0*/)
{ //Replace DLO and DLF
if (!ok)
{
return nl->OneElemList(nl->SymbolAtom("error"));
}
if (nl->IsEmpty(createQuery))
return createQuery;
if (nl->IsAtom(createQuery))
{
if ((nl->AtomType(createQuery) == SymbolType) &&
(nl->SymbolValue(createQuery) == listName))
{
if (listObject->isAvailable() || (ua && argIndex > 0))
{
string objectName = listObject->getSubName();
switch (listObject->getKind())
{
case DLO:{
string paraName = objectName;
if (argIndex > 0){
stringstream ss;
ss << "<DLOMark:Arg" << argIndex << ":" << objectName << "/>";
paraName = ss.str();
}
DLO_NameList.push_back(paraName);
DLO_locList.push_back(listObject->getLocList().convertToString());
return nl->SymbolAtom(objectName);
}
case DLF:{
stringstream ss;
ss << "<DLFMark:";
if (argIndex > 0){
ss << "Arg" << argIndex << ":";
}
ss << objectName << "/>";
DLF_NameList.push_back(ss.str());
DLF_fileLocList.push_back(
listObject->getLocList().convertToString());
return nl->StringAtom(ss.str(),true);
}
default:{
ok = false;
return nl->OneElemList(nl->SymbolAtom("error"));
}
}
}
ok = false;
return nl->OneElemList(nl->SymbolAtom("error"));
}
else
return createQuery;
}
else
{
return (nl->Cons(replaceDLOF(nl->First(createQuery),
listName, listObject, DLF_NameList, DLF_fileLocList,
DLO_NameList, DLO_locList, ua, ok, argIndex),
replaceDLOF(nl->Rest(createQuery),
listName, listObject, DLF_NameList, DLF_fileLocList,
DLO_NameList, DLO_locList, ua, ok, argIndex)));
}
}
/*
Find and replace all parameters quoted by the ~para~ operator.
It should be either an flist object, or a DELIVERABLE data.
In the first case, the name and the value of the list are added to the two
vectors, respectively.
Besides, in the query list, the para(O) is replaced by O, i.e.,
the name of the flist object.
In the later case, it will be replaced by the object value in the query list.
*/
ListExpr replaceParaOp(
ListExpr queryList, vector<string>& flistNames,
vector<fList*>& flistObjects, bool& ok)
{
if (!ok){
return nl->OneElemList(nl->SymbolAtom("error"));
}
if (nl->IsEmpty(queryList))
return queryList;
if (nl->ListLength(queryList) == 2)
{
ListExpr first = nl->First(queryList);
if (nl->IsAtom(first))
{
if (nl->IsEqual(first, "para"))
{
ListExpr second = nl->Second(queryList);
string paraName = nl->ToString(second);
ListExpr paraType =
SecondoSystem::GetCatalog()->GetObjectTypeExpr(paraName);
if (nl->ListLength(paraType) > 1){
if(nl->IsEqual(nl->First(paraType), fList::BasicType())){
flistNames.push_back(paraName);
Word listValue;
bool defined;
ok = SecondoSystem::GetCatalog()->
GetObject(paraName, listValue, defined);
if (ok){
flistObjects.push_back((fList*)listValue.addr);
return nl->SymbolAtom(paraName);
}
else{
ok = false;
return nl->OneElemList(nl->SymbolAtom("error"));
}
}
}
else{
//Still allow the ~para~ operator to quote DELIVERABLE objects
ListExpr DGOValue =
SecondoSystem::GetCatalog()->GetObjectValue(paraName);
ListExpr DGOType =
SecondoSystem::GetCatalog()->GetObjectTypeExpr(paraName);
return nl->TwoElemList(DGOType, DGOValue);
}
}
}
}
if (nl->ListLength(queryList) > 0){
return (nl->Cons(
replaceParaOp(nl->First(queryList),
flistNames, flistObjects, ok),
replaceParaOp(nl->Rest(queryList),
flistNames, flistObjects, ok) ));
}
else{
return queryList;
}
}
ListExpr replaceParaOp(ListExpr queryList, Para_FList& para_flist, bool& ok) {
if (!ok) {
return nl->OneElemList(nl->SymbolAtom("error"));
}
if (nl->IsEmpty(queryList))
return queryList;
if (nl->ListLength(queryList) == 2) {
ListExpr first = nl->First(queryList);
if (nl->IsAtom(first)) {
if (nl->IsEqual(first, "para")) {
ListExpr second = nl->Second(queryList);
string paraName = nl->ToString(second);
ListExpr paraType =
SecondoSystem::GetCatalog()->GetObjectTypeExpr(
paraName);
if (nl->ListLength(paraType) > 1) {
if (nl->IsEqual(nl->First(paraType), fList::BasicType())) {
Word listValue;
bool defined;
ok = SecondoSystem::GetCatalog()->GetObject(paraName,
listValue, defined);
if (ok) {
para_flist.push_back(make_pair(
paraName, (fList*) listValue.addr));
return nl->SymbolAtom(paraName);
} else {
ok = false;
return nl->OneElemList(nl->SymbolAtom("error"));
}
}
} else {
//Still allow the ~para~ operator to quote DELIVERABLE objects
ListExpr DGOValue =
SecondoSystem::GetCatalog()->GetObjectValue(
paraName);
ListExpr DGOType =
SecondoSystem::GetCatalog()->GetObjectTypeExpr(
paraName);
return nl->TwoElemList(DGOType, DGOValue);
}
}
}
}
if (nl->ListLength(queryList) > 0) {
return (nl->Cons(
replaceParaOp(nl->First(queryList), para_flist,ok),
replaceParaOp(nl->Rest(queryList), para_flist,ok)));
} else {
return queryList;
}
}
/*
Find all DELIEVERABLE Secondo objects, and substitute it with its nested-list
expression instead of its name.
With this function, there is no need to add ~para~ operator for these symbol
objects, which was designed as DGO flist.
*/
ListExpr replaceSecObj(ListExpr queryList, bool& ok)
{
if (!ok){
return nl->OneElemList(nl->SymbolAtom("error"));
}
if (nl->IsEmpty(queryList))
return queryList;
if (nl->IsAtom(queryList))
{
string atomName = nl->ToString(queryList);
bool isObject = SecondoSystem::GetCatalog()->IsObjectName(atomName);
if (isObject)
{
ListExpr paraType =
SecondoSystem::GetCatalog()->GetObjectTypeExpr(atomName);
if (listutils::isKind(paraType, Kind::DELIVERABLE())){
ListExpr DGOValue =
SecondoSystem::GetCatalog()->GetObjectValue(atomName);
ListExpr DGOType =
SecondoSystem::GetCatalog()->GetObjectTypeExpr(atomName);
return nl->TwoElemList(DGOType, DGOValue);
}
else if (!nl->IsAtom(paraType))
{
if (!nl->IsEqual(nl->First(paraType), fList::BasicType()))
{
ok = false;
cerr << "All objects quoted in the argument function "
"should either be DELIVERABLE data, "
"or flist objects quoted by the para operator" << endl;
return nl->OneElemList(nl->SymbolAtom("error"));
}
}
}
}
if (nl->ListLength(queryList) > 0)
{
return nl->Cons(
replaceSecObj(nl->First(queryList), ok),
replaceSecObj(nl->Rest(queryList), ok));
}
else
return queryList;
}
/*
3 Class ~HadoopAlgebra~
A new subclass ~HadoopAlgebra~ of class ~Algebra~ is declared.
The only specialization with respect to class ~Algebra~ takes place within
the constructor: all type constructors and operators are registered at the
actual algebra.
After declaring the new class, its only instance ~extendedRelationAlgebra~
is defined.
*/
class HadoopAlgebra: public Algebra
{
public:
HadoopAlgebra() :
Algebra()
{
AddTypeConstructor(&flTC);
AddOperator(&spreadOp);
spreadOp.SetUsesArgsInTypeMapping();
AddOperator(&spreadFilesOp);
spreadFilesOp.SetUsesArgsInTypeMapping();
AddOperator(&collectOp);
collectOp.SetUsesArgsInTypeMapping();
AddOperator(&paraOp);
AddOperator(TParaInfo(), 0, TParaTypeMapping);
AddOperator(TPara2Info(), 0, TPara2TypeMapping);
AddOperator(&hadoopMapOP);
hadoopMapOP.SetUsesArgsInTypeMapping();
AddOperator(&hadoopMap2OP);
hadoopMap2OP.SetUsesArgsInTypeMapping();
AddOperator(&hadoopReduceOP);
hadoopReduceOP.SetUsesArgsInTypeMapping();
AddOperator(&hadoopReduce2OP);
hadoopReduce2OP.SetUsesArgsInTypeMapping();
AddOperator(&cflOp);
cflOp.SetUsesArgsInTypeMapping();
AddOperator(&pffeedOp);
pffeedOp.SetUsesArgsInTypeMapping();
AddOperator(&hadoopMapAllOP);
hadoopMapAllOP.SetUsesArgsInTypeMapping();
//Not evaluate the taskQuery on the master
hadoopMapAllOP.SetRequestsArguments();
AddOperator(&pffeed2Op);
pffeed2Op.SetUsesArgsInTypeMapping();
AddOperator(&pffeed3Op);
pffeed3Op.SetUsesArgsInTypeMapping();
}
~HadoopAlgebra()
{
}
;
};
/*
4 Initialization
Each algebra module needs an initialization function. The algebra manager
has a reference to this function if this algebra is included in the list
of required algebras, thus forcing the linker to include this module.
The algebra manager invokes this function to get a reference to the instance
of the algebra class and to provide references to the global nested list
container (used to store constructor, type, operator and object information)
and to the query processor.
The function has a C interface to make it possible to load the algebra
dynamically at runtime.
*/
extern "C" Algebra*
InitializeHadoopAlgebra(
NestedList* nlRef, QueryProcessor* qpRef)
{
nl = nlRef;
qp = qpRef;
return (new HadoopAlgebra());
}
/*
[newpage]
*/