
/*
----
This file is part of SECONDO.

Copyright (C) 2004-2008, University in Hagen, Faculty of Mathematics and
Computer Science, Database Systems for New Applications.

SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.

SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
----

//paragraph [1] Title: [{\Large \bf] [}]
//paragraph [10] Footnote: [{\footnote{] [}}]
//[TOC] [\tableofcontents]
//[newpage] [\newpage]

[1] File-Related Operators in the HadoopParallelAlgebra 

June 2011 Jiamin Lu

[TOC]

[newpage]

1 Abstract

This document explains the file-related operators of 
the HadoopParallelAlgebra: ~fconsume~, ~ffeed~ and ~fdistribute~, 
and their fault-tolerance feature inside a computer cluster 
environment. 

The ~fconsume~ and ~fdistribute~ operators export a stream of tuples
into the file system. ~fconsume~ writes
the whole stream of tuples into one binary file, while ~fdistribute~
divides the tuples into different files based on the values of one
partition attribute. These binary files are called ~data files~.
At the same time, the type list of the input tuple stream is kept
in a text file, which is called the ~type file~. After the export,
these files can be imported into the Secondo database efficiently
by using the ~ffeed~ operator.

Inside a computer cluster, the above type file and data files can be
duplicated on several computers, in case some of them become corrupted
during the parallel processing. At present, we adopt a simple
chained-declustering mechanism [YYTM10] to manage these duplicated files.
All computers involved in keeping these duplicated files form
a simple distributed file system for Parallel Secondo.

2 Basic Functions

2.1 File Names

These file-related operators exchange data between the Secondo database
and the file system, and we use file names to identify different files.
Hence a name string must be given by the user, and it is
used as the prefix of the names of all involved files.

As explained in the last section, these operators process two kinds
of files: a text type file and binary data files.
The type file's name is formed by joining the given file name
and the string "type" with an underscore.
E.g., if we set the name as FILENAME, then the type file's name is
FILENAME\_type.

There are three scenarios for using these
file-related operators. First, a Secondo relation may be exported
completely into one data file. Second, a Secondo relation may be
exported into several data files for parallel processing.
Third, a Secondo relation may be divided among several computers
and partitioned into a file matrix.

In the first scenario, the binary data file's name is the same
as the given name. In the other two scenarios, the data files
share the given name as the prefix of their names,
to denote that they belong to the same Secondo relation,
and use integer numbers as the postfix to distinguish
them from each other. The prefix name and the postfix numbers are
also connected by underscores.
E.g., the file name FILENAME\_0\_1 means that
this is a data file with the given name FILENAME,
and that it is located in the 1st row, 2nd column of a file matrix.
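
The naming rule above can be sketched as follows. This is only a
minimal illustration: the helper names ~typeFileName~ and
~dataFileName~ are hypothetical and are not operators or functions
of the algebra, and a negative number is assumed to stand for an
omitted row or column number.

```cpp
#include <string>

// Hypothetical helpers, not part of the algebra's API.
// A negative number stands for "parameter not given" (an assumption).
std::string typeFileName(const std::string& fileName) {
    return fileName + "_type";
}

std::string dataFileName(const std::string& fileName,
                         int rowNum = -1, int colNum = -1) {
    std::string name = fileName;
    // Each given number is appended with an underscore, row first.
    if (rowNum >= 0) name += "_" + std::to_string(rowNum);
    if (colNum >= 0) name += "_" + std::to_string(colNum);
    return name;
}
```

With these helpers, a call with the prefix FILENAME, row 0 and column 1
yields the matrix file name used in the example above, and a call
without numbers yields the plain prefix.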


2.2 File Path

All three file-related operators take the file path as a mandatory
parameter, which determines a local directory that keeps all these
type and data files. If a file path is given, it must be
an absolute path. It can also be set to an empty text string,
in which case the default file path is used.

The default path is set inside the SecondoConfig.ini file,
which is normally kept in \$SECONDO\_BUILD\_DIR/bin.
Inside this file, a parameter called *SecondoFilePath*,
belonging to the *ParallelSecondo* environment, denotes the file path.
If this parameter is not defined,
or the directory it names does not exist and cannot be created,
then the directory \$SECONDO\_BUILD\_DIR/bin/parallel is used as
the default file path.
If this path is unavailable too, the operation fails.
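
The fallback order described above can be sketched as a small
function. This is a sketch under stated assumptions, not the code of
the algebra: ~defaultFilePath~ is a hypothetical name, and the check
whether a directory exists or can be created is passed in as a
caller-supplied predicate, so that the sketch stays independent of
any file system API.

```cpp
#include <functional>
#include <string>

// Hypothetical sketch of the default-path fallback.
// configured: value of SecondoFilePath, empty if it is not defined.
// buildDir:   value of $SECONDO_BUILD_DIR.
// usable:     caller-supplied check "exists or can be created".
// Returns an empty string if no default path is available.
std::string defaultFilePath(
    const std::string& configured,
    const std::string& buildDir,
    const std::function<bool(const std::string&)>& usable) {
    if (!configured.empty() && usable(configured)) return configured;
    const std::string fallback = buildDir + "/bin/parallel";
    if (usable(fallback)) return fallback;
    return "";
}
```

An empty result corresponds to the case in which the operation fails.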


2.3 fconsume and fdistribute Operators

~fconsume~ exports a stream of tuples into a data file;
its syntax is:

----
stream(tuple(...))
  x fileName x filePath x [rowNum] x [colNum]  ;
  x [typeLoc1] x [typeLoc2]                    ;
  x [targetIndex x dupTimes]
-> bool
----

~fdistribute~ distributes a stream of tuples into several data files,
and its syntax is:

----
stream(tuple(...))
x fileName x path x partAttr x [rowNum]  ;
x [nBuckets] x [KPA]                     ;
x [typeNodeIndex1] x [typeNodeIndex2]    ;
x [targetIndex x dupTimes ]              
-> stream(tuple(fileSuffix, value))
---- 

Both operators export a stream of tuples into the file system.
Therefore both take a stream of tuples, a fileName and
the file path as mandatory parameters.

Besides these parameters, ~fconsume~ also accepts
rowNum and colNum as optional parameters.
If neither is defined, the data file is named
with the given fileName directly.
rowNum and colNum do not depend on each other.
If only one number is given, it can be viewed as either
a row number or a column number; this interpretation does not
affect the exported file's name.

~fdistribute~ requires another mandatory parameter, partAttr,
which divides the tuples into different data files based on its value.
The type of partAttr must provide a HashValue function,
since we rely on this function to classify the tuples.
In some cases, dividing the tuples directly by the hash values of
partAttr is not a good idea, since it may produce an extremely uneven
partition. The user can therefore set the optional parameter nBuckets,
which is used to re-hash these tuples and obtain new hash values
that achieve a more even partition.
These hash values are used as the column numbers of the data files
during the matrix file distribution. The optional integer parameter
rowNum can also be set; it is placed in the data files' names
between the fileName and the column number.
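
How a hash value may turn into a column number can be sketched as
follows. The text above does not state how the re-hashing with
nBuckets is computed; reducing the hash value modulo nBuckets is an
assumption made for this sketch only, and ~columnNumber~ is a
hypothetical name.

```cpp
#include <cstddef>

// Hypothetical sketch: maps the hash value of a partition attribute to
// the column number of a data file. Reducing the hash modulo nBuckets
// is an assumption; nBuckets == 0 stands for "parameter not given".
std::size_t columnNumber(std::size_t hashValue, std::size_t nBuckets) {
    return nBuckets > 0 ? hashValue % nBuckets : hashValue;
}
```

Under this assumption, setting nBuckets bounds the number of data
files per row, whatever the range of the original hash values is.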

The ~fconsume~ operator exports the complete tuples into the data file,
whereas the ~fdistribute~ operator lets the user choose whether to keep
the partition attribute, by setting the
optional parameter KPA (Keep Partition Attribute).
This is useful because the partition attribute is sometimes only
a temporary attribute.
By default, KPA is set to false.

The functions above are called the ~local mode~ of these two operators;
users can also use both operators on a single computer.
Besides the local mode, there are two other groups of parameters
used in a cluster environment, to duplicate the produced type
and data files onto remote computers. Parameters of different
modes are separated into groups by semicolons,
and the parameters of the other two groups are all optional.
Hence, if you don't need to duplicate these files,
these parameters can simply be left empty. E.g., a query like:

----    
query plz feed fconsume["PLZFILE",'';;]
----    

will create two files. One is PLZFILE\_type, which keeps the nested list
describing the type of the relation plz. The other is PLZFILE, which
keeps all the binary data of the tuples inside the relation plz.
Both files are stored in the default path of the computer.
Since no row or column number is given, the data file doesn't have
any suffix number. At the same time, all the parameters for
file duplication are left empty, therefore these files won't be
duplicated to any other machines.


2.4 ffeed Operator

The ~ffeed~ operator imports the files produced by the
~fconsume~ and ~fdistribute~ operators back into the Secondo
database. The syntax of ~ffeed~ is:

----
fileName 
x filePath x [rowNum] x [colNum]                ;
x [typeNodeIndex]                               ;
x [producerIndex x targetIndex x attemptTimes]  
->stream(tuple(...))
---- 

The type of the output tuple stream is set by reading the nested list
in the type file, which is named *fileName\_type*.
The tuple data are read from the binary data file,
and the operator feeds in only one data file at a time.
If the optional parameters rowNum or colNum are defined,
the data file's name is composed from the fileName and these numbers.
If the filePath is empty, Secondo searches for both the type and
the data file in the default file path.

Like the above operators, ~ffeed~ also offers further
groups of optional parameters for feeding files from other machines.
Parameters belonging to different groups are separated by semicolons.
Users who don't need the remote-feed functions don't
have to set any other parameters, but still need to write
two semicolons to denote the empty parameter groups. E.g., a query like:

----
  query "PLZFILE" ffeed['';;] count
----

can feed in the files from the local computer that we created 
in the last example.



3 Fault-Tolerance in a Cluster

During the parallel processing, data are kept as files on the disks
of different computers.
Especially within a cluster of thousands of computers,
it is common that several computers crash during the processing,
and all the data kept on these computers become unavailable.
To guard against this kind of problem, data must be duplicated on
several computers in order to keep them reliably available.

In Parallel Secondo, we adopt the simple chained-declustering
mechanism to duplicate these files. I.e., nodes are listed one
by one in a slaveList, and each file can be duplicated onto ~n~
successive nodes within the slaveList, where ~n~ is specified by the user.

The slaveList follows the schema: 

----
IPAddress:FilePath:MonitorPort
----

The IPAddress denotes the files' location inside the computer cluster,
and the FilePath denotes their location on the node's disk.
The last part defines the port number on which a Secondo Monitor
listens for requests from other nodes.
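
Reading one slaveList entry of this schema can be sketched as below.
This is a minimal sketch, not the parser used by Parallel Secondo:
the names ~SlaveEntry~ and ~parseSlaveEntry~ are hypothetical, and the
address is assumed to contain no colon (an IPv4 address or a host
name), so that the entry can be split at its first and last colon.

```cpp
#include <string>

// Hypothetical record for one slaveList entry.
struct SlaveEntry {
    std::string ipAddress;
    std::string filePath;
    int monitorPort;
};

// Splits "IPAddress:FilePath:MonitorPort" at its first and last colon.
// Assumes an address without colons. Returns false if the line does not
// have three non-empty parts or the port is not a short decimal number.
bool parseSlaveEntry(const std::string& line, SlaveEntry& out) {
    const std::string::size_type first = line.find(':');
    const std::string::size_type last = line.rfind(':');
    if (first == std::string::npos || first == last) return false;
    const std::string ip = line.substr(0, first);
    const std::string path = line.substr(first + 1, last - first - 1);
    const std::string port = line.substr(last + 1);
    if (ip.empty() || path.empty() || port.empty()) return false;
    if (port.size() > 5) return false;
    if (port.find_first_not_of("0123456789") != std::string::npos)
        return false;
    out.ipAddress = ip;
    out.filePath = path;
    out.monitorPort = std::stoi(port);
    return true;
}
```

The position of an entry within the slaveList then serves as the node
index that the remote mode parameters refer to.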

Since the HadoopParallelAlgebra keeps data in two kinds of files,
the type file and the data files, we duplicate these files separately.
In all file-related operators, besides the above parameters that are
used for the local mode, there are two further groups of parameters
prepared for the remote modes, i.e. the fault-tolerance feature
of Parallel Secondo.

The first group is used to process the type file on remote machines,
hence we call it the ~type remote mode parameters~.
The parameters are integer numbers and indicate the target machines'
indices inside the slaveList. For the ~fconsume~ and ~fdistribute~
operators, at most two remote nodes' indices can be set as parameters,
i.e., the type file produced by these two operators can be duplicated
onto at most two other nodes. For the ~ffeed~ operator,
only one node index can be set as the type remote index.

The second group is used to process data files on remote machines,
hence we call it the ~data remote mode parameters~.
In the ~fconsume~ and ~fdistribute~ operators, the data remote
parameters are the same and consist of two integers: one defines the
first target node, and the other defines the number of duplicates.
If the local node is not included among the duplication
nodes, the created data file is removed after the duplication.
If one node is defined twice during the duplication, the copy
happens only once.
It is common that two nodes produce files with the same name,
and both attempt to duplicate the file onto another node's disk.
To distinguish the files that are created by the current node
from the files that are copied from other nodes,
all files created by other nodes must use the producer's IP address
as the postfix of the file name.
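
The selection of the duplication nodes can be sketched as follows.
This is a sketch under stated assumptions: the names ~replicaTargets~
and ~keepsLocalCopy~ are hypothetical, node indices are taken as
0-based positions in the slaveList, and the list is treated as a ring,
so that the successor of the last node is the first one.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical sketch of choosing duplication targets.
// Node indices are taken as 0-based positions in the slaveList and the
// list is treated as a ring; both are assumptions.
std::vector<int> replicaTargets(int targetIndex, int dupTimes,
                                int nodeCount) {
    std::vector<int> targets;
    if (nodeCount <= 0 || targetIndex < 0) return targets;
    for (int i = 0; i < dupTimes; ++i) {
        const int node = (targetIndex + i) % nodeCount;
        // A node that is named twice receives the file only once.
        if (std::find(targets.begin(), targets.end(), node)
                == targets.end())
            targets.push_back(node);
    }
    return targets;
}

// The local copy is kept only if the local node is among the targets.
bool keepsLocalCopy(int localIndex, const std::vector<int>& targets) {
    return std::find(targets.begin(), targets.end(), localIndex)
           != targets.end();
}
```

For a cluster of four nodes, a targetIndex of 3 and a dupTimes of 3
select the nodes 3, 0 and 1 under these assumptions; a producer on
node 2 would then remove its local data file after the duplication.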

In ~ffeed~, three parameters are required for
the data remote mode: producerIndex, targetIndex and attemptTimes.
The producerIndex identifies the node that created the data file.
The targetIndex locates the node from which the operator
starts to look for the data file. Finally, attemptTimes
denotes the maximum number of machines that are tried when searching
for the file. If the operator cannot find the data file after
attemptTimes attempts, nothing is returned. Otherwise it copies the
data file from the remote machine to the local disk, and then reads
the tuples from it.
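
The search performed by ~ffeed~ in the data remote mode can be
sketched as follows. This is only an illustration: ~findDataFile~ is
a hypothetical name, the probe ~hasFile~ is supplied by the caller
instead of contacting a remote node, and it is assumed that the
attempts visit successive nodes of the slaveList with 0-based indices
and wrap around at its end.

```cpp
#include <functional>

// Hypothetical sketch of the remote search in ffeed.
// hasFile(node) is a caller-supplied probe that tells whether the node
// holds the wanted data file. 0-based indices and wrap-around at the
// end of the slaveList are assumptions.
// Returns the index of the first node holding the file, or -1.
int findDataFile(int targetIndex, int attemptTimes, int nodeCount,
                 const std::function<bool(int)>& hasFile) {
    if (nodeCount <= 0 || targetIndex < 0) return -1;
    for (int i = 0; i < attemptTimes; ++i) {
        const int node = (targetIndex + i) % nodeCount;
        if (hasFile(node)) return node;
    }
    return -1;
}
```

A result of -1 corresponds to the case in which nothing is returned;
any other result names the node from which the data file would be
copied to the local disk.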


*/