Files
secondo/Algebras/Hadoop/Java/ParallelSecondo/HPA_AuxFunctions.java

860 lines
23 KiB
Java
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
package ParallelSecondo;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.StringTokenizer;
import sj.lang.ListExpr;
/**
*
* @author Jiamin
*
* This class contains auxiliary functions that are used by the
* parallel operators of the HadoopParallelAlgebra.
*
*
*/
public class HPA_AuxFunctions extends ListExpr implements Constant{
/**
 * Builds the per-mapper location lists for all flist parameters.
 *
 * For a parameter whose name matches INDLOFPattern, every row of its
 * location matrix is handed to the mapper whose index equals the row's
 * slave index minus one. All other mappers get an empty list at the same
 * position, because the row number is only given by the position in the list.
 * For every other parameter the complete location matrix is copied to
 * each mapper.
 *
 * NOTE(review): a row whose slave index exceeds mapTasksNum matches no
 * mapper and is replaced by an empty row everywhere - confirm that callers
 * never use fewer map tasks than slaves.
 *
 * @param nameList    names of the flist parameters, parallel to locList
 * @param locList     one location matrix per parameter
 * @param mapTasksNum number of map tasks
 * @return a list with one entry per mapper; each entry holds one location
 *         list per parameter (or stays an empty ListExpr if locList is empty)
 */
public static ListExpr flist2Mapper(ListExpr nameList, ListExpr locList, int mapTasksNum)
{
    // Head and tail of the growing parameter list of every mapper
    ListExpr[] perMapper = new ListExpr[mapTasksNum];
    ListExpr[] perMapperTail = new ListExpr[mapTasksNum];

    ListExpr pendingNames = nameList;
    for (ListExpr pendingLocs = locList; !pendingLocs.isEmpty();
         pendingLocs = pendingLocs.rest(), pendingNames = pendingNames.rest())
    {
        // One of the several flist parameters prepared for the map step
        ListExpr matrix = pendingLocs.first();
        String paramName = pendingNames.first().stringValue();
        ListExpr[] dealt = new ListExpr[mapTasksNum];

        if (paramName.matches(INDLOFPattern))
        {
            // Deal every row to its owner, leave an empty row for the others
            for (; !matrix.isEmpty(); matrix = matrix.rest())
            {
                ListExpr row = matrix.first();
                int owner = row.isEmpty() ? -1 : row.first().intValue() - 1;
                for (int m = 0; m < mapTasksNum; m++)
                {
                    ListExpr cell = (m == owner) ? row : ListExpr.theEmptyList();
                    dealt[m] = ListExpr.concat(dealt[m], cell);
                }
            }
        }

        // Input arguments contribute their dealt rows; everything else
        // contributes its (unconsumed) matrix to every mapper.
        for (int m = 0; m < mapTasksNum; m++)
        {
            ListExpr locs = (dealt[m] != null) ? dealt[m] : matrix;
            if (perMapper[m] != null)
            {
                perMapperTail[m] = ListExpr.append(perMapperTail[m], locs);
            }
            else
            {
                perMapper[m] = ListExpr.oneElemList(locs);
                perMapperTail[m] = perMapper[m];
            }
        }
    }

    // Collect the mappers into one list, in task order
    ListExpr collected = new ListExpr();
    ListExpr collectedTail = null;
    for (int m = 0; m < mapTasksNum; m++)
    {
        ListExpr entry = (perMapper[m] != null) ? perMapper[m] : new ListExpr();
        if (!collected.isEmpty())
        {
            collectedTail = ListExpr.append(collectedTail, entry);
        }
        else
        {
            collected = ListExpr.oneElemList(entry);
            collectedTail = collected;
        }
    }
    return collected;
}
/**
* Prepare the file list for each mapper.
*
* This function tries to spread the rows of every parameter whose name matches
* INDLOFPattern over the mappers, starting with the mapper that has the same
* index as the slave keeping the row and stepping forward by slavesNum while
* the candidate mapper already holds more than the average number of rows.
* Parameters with other names are copied completely to every mapper.
*
* TODO : The problem is that how can I find a way that distributes rows
* evenly on mappers, and guarantees each mapper reads the data from its
* local disk as much as possible ????
*
* At present, first distribute files to more tasks.
*
* @param namesList names of the flist parameters, parallel to locsList
* @param locsList one location matrix per parameter
* @param mapTasksNum number of map tasks
* @param slavesNum step width used when looking for another candidate mapper
* @return a list built by concatenating one entry per mapper
* (an empty list for a mapper that received nothing);
* null if mapTasksNum is 0
*/
public static ListExpr flist2Mapper2(
ListExpr namesList, ListExpr locsList, int mapTasksNum, int slavesNum){
//mapLoc[i] collects, over all parameters, the locations given to mapper i
ListExpr[] mapLoc = new ListExpr[mapTasksNum];
ListExpr nameRest = namesList;
ListExpr locRest = locsList;
while (!locRest.isEmpty())
{
ListExpr nameList = nameRest.first();
String name = nameList.stringValue();
ListExpr matrixList = locRest.first();
//oneLoc[i] collects the rows of the current parameter for mapper i
ListExpr[] oneLoc = new ListExpr[mapTasksNum];
if (name.matches(INDLOFPattern))
{
int rowNum = matrixList.listLength();
//The lower limit of files in each mapper
int avgMRNum = rowNum / mapTasksNum;
while (!matrixList.isEmpty())
{
ListExpr row = matrixList.first();
int mapperIdx = -1;
//NOTE(review): an empty row adds nothing to any oneLoc entry, i.e. no
//placeholder is left for it, unlike in flist2Mapper - confirm this is
//intended, since the position in the list seems to encode the row number.
if (!row.isEmpty()){
int slaveIdx = row.first().intValue() - 1;
mapperIdx = slaveIdx;
//NOTE(review): a slave index of 0 yields mapperIdx -1, which would index
//oneLoc out of bounds below; a slave index above mapTasksNum skips the
//loop and leaves the row without any mapper - verify against callers.
//Decide insert this file to which mapper
while (mapperIdx < mapTasksNum)
{
if (oneLoc[mapperIdx] == null){
//Nothing assigned yet, take this mapper
break;
}
else if (oneLoc[mapperIdx].listLength() <= avgMRNum){
//NOTE(review): the length presumably also counts the empty placeholder
//rows appended below - confirm that this is the intended load measure.
break;
}
else{
//Try the mapper slavesNum positions further; stay if there is none
int cand = mapperIdx + slavesNum;
if (cand < mapTasksNum){
mapperIdx = cand;
continue;
}
else
break;
}
/* Earlier variant of the selection, kept for reference:
if (oneLoc[mapperIdx] == null)
break;
else if (oneLoc[mapperIdx].listLength() <= avgMRNum){
if (mapperIdx < slavesNum)
break;
else if (mapperIdx < mapTasksNum){
mapperIdx++;
break;
}
}
else if (mapperIdx < slavesNum)
mapperIdx = slavesNum;
else
mapperIdx++;
*/
}
//Set this file to the indicated mapper,
//and left an empty row for other mappers
for (int mcnt = 0; mcnt < mapTasksNum; mcnt++)
{
if (mcnt != mapperIdx){
oneLoc[mcnt] = ListExpr.concat(oneLoc[mcnt], ListExpr.theEmptyList());
}
else{
oneLoc[mcnt] = ListExpr.concat(oneLoc[mcnt], row);
}
}
}
matrixList = matrixList.rest();
}
}
//Merge locations for each mapper
for (int mcnt = 0; mcnt < mapTasksNum; mcnt++)
{
//For a matching name matrixList has been consumed to its empty rest here,
//for any other name it is still the complete matrix.
ListExpr mapperLocList = matrixList;
if (oneLoc[mcnt] != null)
mapperLocList = oneLoc[mcnt];
mapLoc[mcnt] = ListExpr.concat(mapLoc[mcnt], mapperLocList);
}
nameRest = nameRest.rest();
locRest = locRest.rest();
}
//Put the mappers together, in task order
ListExpr allMappers = null;
for (ListExpr mapper : mapLoc )
{
if (mapper == null)
allMappers = ListExpr.concat(allMappers, ListExpr.theEmptyList());
else
allMappers = ListExpr.concat(allMappers, mapper);
}
return allMappers;
}
/**
 * Looks up the slave index that executes a mapper.
 *
 * The objects of the location list are scanned in order. Within an object
 * the first non-empty row is inspected and its first element is read as
 * the slave index. The scan stops at the first object that delivers a
 * positive index.
 *
 * @param locList location list of one mapper: a list of objects, each
 *                being a list of rows
 * @return the first positive slave index found; otherwise the value read
 *         last, or 0 if no non-empty row exists at all
 */
public static int findFirstSlave(ListExpr locList)
{
    int found = 0;
    for (ListExpr obj = locList; !obj.isEmpty(); obj = obj.rest())
    {
        // Only the first non-empty row of an object is of interest
        for (ListExpr rows = obj.first(); !rows.isEmpty(); rows = rows.rest())
        {
            ListExpr row = rows.first();
            if (!row.isEmpty())
            {
                found = row.first().intValue();
                break;
            }
        }
        if (found > 0)
        {
            return found;
        }
    }
    return found;
}
/**
 * Splits the per-mapper location list into a part for the map tasks and
 * a part for the reduce tasks.
 *
 * For each of the first mapTasksNum entries of locList the names in
 * nameList are walked in parallel with the entry's locations. A name
 * matching INDLOFPattern is collected, together with its location, for
 * the mapper; every other name is collected for the reducer.
 *
 * This function can only be used after flist2Mapper is done.
 *
 * @param nameList    names of all flist parameters
 * @param locList     one entry per mapper, each with one location per name
 * @param mapTasksNum number of entries of locList to process
 * @return ((mapperNames mapperLocations) (reducerNames reducerLocations))
 */
public static ListExpr divMRDLO(ListExpr nameList, ListExpr locList, int mapTasksNum)
{
    // All four accumulators start from one shared empty-list instance
    ListExpr seed = ListExpr.theEmptyList();
    ListExpr mapNames = seed;
    ListExpr mapLocs = seed;
    ListExpr redNames = seed;
    ListExpr redLocs = seed;

    ListExpr pendingTasks = locList;
    for (int task = 0; task < mapTasksNum; task++, pendingTasks = pendingTasks.rest())
    {
        ListExpr taskLocs = pendingTasks.first();

        // Same sharing for the accumulators of a single task
        ListExpr taskSeed = ListExpr.theEmptyList();
        ListExpr taskMapNames = taskSeed;
        ListExpr taskMapLocs = taskSeed;
        ListExpr taskRedNames = taskSeed;
        ListExpr taskRedLocs = taskSeed;

        for (ListExpr names = nameList; !names.isEmpty();
             names = names.rest(), taskLocs = taskLocs.rest())
        {
            String fileName = names.first().stringValue();
            ListExpr fileLocation = taskLocs.first();
            boolean forMapper = fileName.matches(INDLOFPattern);
            ListExpr nameEntry = ListExpr.oneElemList(ListExpr.stringAtom(fileName));
            if (!forMapper)
            {
                // Prepare this file for reducers
                taskRedNames = ListExpr.concat(taskRedNames, nameEntry);
                taskRedLocs = ListExpr.concat(taskRedLocs, fileLocation);
            }
            else
            {
                // Prepare this file for mappers
                taskMapNames = ListExpr.concat(taskMapNames, nameEntry);
                taskMapLocs = ListExpr.concat(taskMapLocs, fileLocation);
            }
        }

        mapNames = ListExpr.concat(mapNames, taskMapNames);
        mapLocs = ListExpr.concat(mapLocs, taskMapLocs);
        redNames = ListExpr.concat(redNames, taskRedNames);
        redLocs = ListExpr.concat(redLocs, taskRedLocs);
    }

    ListExpr mapperHalf = ListExpr.twoElemList(mapNames, mapLocs);
    ListExpr reducerHalf = ListExpr.twoElemList(redNames, redLocs);
    return ListExpr.twoElemList(mapperHalf, reducerHalf);
}
/**
 * Renders a list as a single line of text.
 *
 * Line breaks and tabs of the list's string form are turned into blanks,
 * afterwards every run of blanks is collapsed into one blank.
 *
 * @param list the list to print
 * @return the one-line text of the list
 */
public static String plainStr(ListExpr list)
{
    String text = list.toString();
    String noBreaks = text.replaceAll("\n", " ");
    String noTabs = noBreaks.replaceAll("\t", " ");
    return noTabs.replaceAll(" +", " ");
}
/**
 * Check whether all objects are prepared for mappers in hadoopMap operator.
 *
 * An object counts as prepared if at least one of its rows is not empty.
 * Every object of the list is examined; the check fails as soon as one
 * object turns out to be empty on all rows.
 *
 * @param locations list of objects, each being a list of rows
 * @return true if locations is empty or every object has a non-empty row,
 *         false otherwise
 */
public static boolean allMapperFOExist(ListExpr locations)
{
    for (ListExpr obj = locations; !obj.isEmpty(); obj = obj.rest())
    {
        boolean hasRow = false;
        for (ListExpr rows = obj.first(); !rows.isEmpty(); rows = rows.rest())
        {
            if (!rows.first().isEmpty())
            {
                hasRow = true;
                break;
            }
        }
        if (!hasRow)
        {
            return false; //One file is empty on all rows
        }
    }
    return true;
}
/**
 * Check whether all objects are prepared for hadoopReduce and hadoopReduce2.
 *
 * The names are walked in parallel with their location lists. An object
 * counts as prepared if at least one of its rows is not empty. The check
 * fails as soon as one object has no location list at all or is empty on
 * all rows.
 *
 * @param names names of the objects; only used to drive the iteration
 * @param locs  one list of rows per name
 * @return true if names is empty or every object has a non-empty row,
 *         false otherwise
 */
public static boolean allObjectExist(ListExpr names, ListExpr locs)
{
    ListExpr lcRest = locs;
    for (ListExpr nmRest = names; !nmRest.isEmpty(); nmRest = nmRest.rest())
    {
        if (lcRest.isEmpty())
        {
            return false; //A name without any location list
        }
        boolean hasRow = false;
        for (ListExpr rows = lcRest.first(); !rows.isEmpty(); rows = rows.rest())
        {
            if (!rows.first().isEmpty())
            {
                hasRow = true;
                break;
            }
        }
        if (!hasRow)
        {
            return false; //This object is empty on all rows
        }
        lcRest = lcRest.rest();
    }
    return true;
}
/**
 * Tells whether the object called objName exists.
 *
 * The names are walked in parallel with their location lists. A name is
 * either an atom or a list whose first element carries the name. Only
 * names matching DLOFPattern are considered; from such a name the part
 * between the last ':' and the last "/>" is compared with objName.
 * The object exists if a matching name has a location list with at least
 * one non-empty row.
 *
 * @param objName  plain name of the object that is looked for
 * @param nameList names of the available objects
 * @param locsList one list of rows per name
 * @return true if the object is found with a non-empty row, else false
 */
public static boolean objectExist(String objName, ListExpr nameList, ListExpr locsList)
{
    ListExpr locs = locsList;
    for (ListExpr names = nameList; !names.isEmpty();
         names = names.rest(), locs = locs.rest())
    {
        ListExpr nameEntry = names.first();
        ListExpr rows = locs.first();

        String candidate = nameEntry.isAtom()
            ? nameEntry.stringValue()
            : nameEntry.first().stringValue();
        if (!candidate.matches(DLOFPattern))
        {
            continue;
        }

        int from = candidate.lastIndexOf(':') + 1;
        int to = candidate.lastIndexOf("/>");
        String plainName = candidate.substring(from, to);
        if (!objName.equals(plainName))
        {
            continue;
        }

        // Same name: it exists as soon as one row is filled
        for (; !rows.isEmpty(); rows = rows.rest())
        {
            if (!rows.first().isEmpty())
            {
                return true;
            }
        }
    }
    return false;
}
/**
 * Use the flist location to replace the flist substitution mark.
 *
 * For every name of nameList the non-empty rows of its location list are
 * turned into one "loopsel ... ffeed" expression each, the expressions of
 * one flist are chained with "concat", and the result replaces the name
 * inside the query via ExtListExpr.replace.
 *
 * The substitution fails as a whole, and the empty list is returned, if an
 * flist has an empty location list or only empty rows. Both failures are
 * reported the same way, so that a caller never gets back a query that
 * still contains unreplaced flist marks.
 *
 * @param queryList the query containing the flist marks
 * @param nameList  the flist marks, parallel to loclList
 * @param loclList  one location list per flist; a non-empty row is read as
 *                  (slave columns filePath)
 * @param dupTimes  value passed on as third element of the location
 *                  triple of every ffeed
 * @return the query with all marks replaced, or the empty list on failure
 */
public static ListExpr loc2Ffeed (ListExpr queryList, ListExpr nameList,
    ListExpr loclList, int dupTimes/*, String tuplePrefix*/)
{
    ListExpr nameRest = nameList;
    ListExpr loclRest = loclList;
    // Counts the rows of all flists; makes the tuple parameter names unique
    int allRowNum = 0;

    while (!nameRest.isEmpty())
    {
        ListExpr fileNameList = nameRest.first();
        String fileName = fileNameList.stringValue();
        if (fileName.matches(DLOFPattern))
        {
            fileName = fileName.substring(fileName.lastIndexOf(':') + 1,
                fileName.lastIndexOf("/>"));
        }

        ListExpr fileLoc = loclRest.first();
        if (fileLoc.isEmpty())
        {
            System.out.println("Mission failed, as fileLoc is: " + fileLoc.toString());
            return ListExpr.theEmptyList();
        }

        //For one flist mark
        ListExpr inputStreamList = null;
        int rowNum = 0;
        // Empty rows are skipped but still counted, the row number is positional
        for (ListExpr rows = fileLoc; !rows.isEmpty();
             rows = rows.rest(), rowNum++, allRowNum++)
        {
            ListExpr rowInfo = rows.first();
            if (rowInfo.isEmpty())
            {
                continue;
            }
            ListExpr ffeedOneRow = loc2FfeedRow(
                rowInfo, fileName, rowNum + 1, "XxxTP" + allRowNum, dupTimes);
            if (inputStreamList == null)
            {
                inputStreamList = ffeedOneRow;
            }
            else
            {
                //Concat the former one with the current one.
                inputStreamList = ListExpr.threeElemList(
                    ListExpr.symbolAtom("concat"),
                    inputStreamList, ffeedOneRow);
            }
        }

        if (inputStreamList == null)
        {
            return ListExpr.theEmptyList(); //all rows are empty
        }
        queryList = ExtListExpr.replace(queryList, fileNameList, inputStreamList);

        nameRest = nameRest.rest();
        loclRest = loclRest.rest();
    }
    return queryList;
}

/** Builds the "loopsel (feed columns) (fun (tuple TUPLE) (ffeed ...))" expression of one row. */
private static ListExpr loc2FfeedRow(ListExpr rowInfo, String fileName,
    int rowNo, String tupleParaName, int dupTimes)
{
    int locSlave = rowInfo.first().intValue();
    ListExpr columns = rowInfo.second();
    String filePath = rowInfo.third().stringValue();

    ListExpr columnFeed = loc2FfeedColumns(columns);

    ListExpr ffeed = ListExpr.oneElemList(ListExpr.symbolAtom("ffeed"));
    ListExpr ffeedLast = ffeed;
    ffeedLast = ListExpr.append(ffeedLast, ListExpr.stringAtom(fileName));
    ffeedLast = ListExpr.append(ffeedLast,
        ListExpr.threeElemList(
            ListExpr.textAtom(filePath),
            ListExpr.intAtom(rowNo),
            ListExpr.threeElemList(
                ListExpr.symbolAtom("attr"),
                ListExpr.symbolAtom(tupleParaName),
                ListExpr.symbolAtom(fsName))));
    ffeedLast = ListExpr.append(ffeedLast, ListExpr.theEmptyList());
    //Search type file at local disk
    ffeedLast = ListExpr.append(ffeedLast,
        ListExpr.threeElemList(
            ListExpr.intAtom(locSlave),
            ListExpr.intAtom(locSlave),
            ListExpr.intAtom(dupTimes)));

    return ListExpr.threeElemList(
        ListExpr.symbolAtom("loopsel"),
        columnFeed,
        ListExpr.threeElemList(
            ListExpr.symbolAtom("fun"),
            ListExpr.twoElemList(
                ListExpr.symbolAtom(tupleParaName),
                ListExpr.symbolAtom("TUPLE")),
            ffeed));
}

/** Wraps the column numbers of one row into a fed temporary relation with one int attribute. */
private static ListExpr loc2FfeedColumns(ListExpr columns)
{
    ListExpr values = null;
    ListExpr valuesLast = null;
    for (ListExpr rest = columns; !rest.isEmpty(); rest = rest.rest())
    {
        ListExpr tuple = ListExpr.oneElemList(
            ListExpr.intAtom(rest.first().intValue()));
        if (values == null)
        {
            values = ListExpr.oneElemList(tuple);
            valuesLast = values;
        }
        else
        {
            valuesLast = ListExpr.append(valuesLast, tuple);
        }
    }
    if (values == null)
    {
        values = ListExpr.theEmptyList();
    }

    // Avoid generating normal relation files in database
    ListExpr relation = ListExpr.twoElemList(
        ListExpr.twoElemList(
            ListExpr.symbolAtom("trel"),
            ListExpr.twoElemList(
                ListExpr.symbolAtom("tuple"),
                ListExpr.oneElemList(
                    ListExpr.twoElemList(
                        ListExpr.symbolAtom(fsName),
                        ListExpr.symbolAtom("int"))))),
        values);
    return ListExpr.twoElemList(ListExpr.symbolAtom("feed"), relation);
}
/**
 * Creates a fed temporary relation from a sequence of row numbers.
 *
 * Every number delivered by the iterator becomes one tuple holding the
 * number twice. The relation has two int attributes, named by the
 * constants frsName and fssName.
 *
 * @param it delivers the row numbers
 * @return the expression (feed ((trel (tuple (...))) (tuples)))
 */
public static ListExpr feedRows(Iterator<Integer> it)
{
    // TRel value: one (row row) tuple per delivered number
    ListExpr tuples = ListExpr.theEmptyList();
    while (it.hasNext())
    {
        int row = it.next();
        ListExpr tuple = ListExpr.twoElemList(
            ListExpr.intAtom(row), ListExpr.intAtom(row));
        tuples = ListExpr.concat(tuples, tuple);
    }

    // TRel definition
    ListExpr firstAttr = ListExpr.twoElemList(
        ListExpr.symbolAtom(frsName), ListExpr.symbolAtom("int"));
    ListExpr secondAttr = ListExpr.twoElemList(
        ListExpr.symbolAtom(fssName), ListExpr.symbolAtom("int"));
    ListExpr tupleType = ListExpr.twoElemList(
        ListExpr.symbolAtom("tuple"),
        ListExpr.twoElemList(firstAttr, secondAttr));
    ListExpr relType = ListExpr.twoElemList(
        ListExpr.symbolAtom("trel"), tupleType);

    ListExpr relation = ListExpr.twoElemList(relType, tuples);
    return ListExpr.twoElemList(ListExpr.symbolAtom("feed"), relation);
}
/**
 * Creates the expression that fetches an intermediate result with ffeed.
 *
 * The ffeed is put into a function over a tuple named interFileTupleName
 * and this function is applied by loopsel to the stream given as feedList.
 * The file name handed to ffeed is the part of interResultName between
 * its last ":" and its last "/>".
 *
 * @param columnNo           column number, third element of the file
 *                           description of the ffeed
 * @param interResultName    mark of the intermediate result
 * @param interFileTupleName name of the tuple parameter of the function
 * @param typeNodeIdx        index of the node keeping the type file
 * @param feedList           expression producing the tuples to loop over
 * @return the complete loopsel expression
 */
public static ListExpr feedInterResult(int columnNo, String interResultName,
    String interFileTupleName, int typeNodeIdx, ListExpr feedList)
{
    int nameFrom = interResultName.lastIndexOf(":") + 1;
    int nameTo = interResultName.lastIndexOf("/>");
    ListExpr fileName = ListExpr.stringAtom(
        interResultName.substring(nameFrom, nameTo));

    // (path (attr tuple frs) columnNo): get file row_columnNo
    ListExpr rowAttr = ListExpr.threeElemList(
        ListExpr.symbolAtom("attr"),
        ListExpr.symbolAtom(interFileTupleName),
        ListExpr.symbolAtom(frsName));
    ListExpr fileDescr = ListExpr.threeElemList(
        ListExpr.textAtom(""), rowAttr, ListExpr.intAtom(columnNo));

    // Type remote node
    ListExpr typeNode = ListExpr.oneElemList(ListExpr.intAtom(typeNodeIdx));

    // Created by a slave and also located at it, without duplication
    ListExpr producer = ListExpr.threeElemList(
        ListExpr.symbolAtom("attr"),
        ListExpr.symbolAtom(interFileTupleName),
        ListExpr.symbolAtom(fssName));
    ListExpr target = ListExpr.threeElemList(
        ListExpr.symbolAtom("attr"),
        ListExpr.symbolAtom(interFileTupleName),
        ListExpr.symbolAtom(fssName));
    ListExpr location = ListExpr.threeElemList(
        producer, target, ListExpr.intAtom(1));

    ListExpr ffeed = ListExpr.oneElemList(ListExpr.symbolAtom("ffeed"));
    ListExpr tail = ffeed;
    tail = ListExpr.append(tail, fileName);
    tail = ListExpr.append(tail, fileDescr);
    tail = ListExpr.append(tail, typeNode);
    tail = ListExpr.append(tail, location);

    ListExpr tupleParam = ListExpr.twoElemList(
        ListExpr.symbolAtom(interFileTupleName),
        ListExpr.symbolAtom("TUPLE"));
    ListExpr function = ListExpr.threeElemList(
        ListExpr.symbolAtom("fun"), tupleParam, ffeed);
    return ListExpr.threeElemList(
        ListExpr.symbolAtom("loopsel"), feedList, function);
}
/**
 * Prepared for the pffeed operator: creates a fed temporary relation
 * describing the distribution of one column in the intermediate flist.
 *
 * Every number delivered by the iterator becomes one tuple
 * (row column row). The relation has three int attributes, named by the
 * constants frsName, fcsName and fssName.
 *
 * @param it     delivers the row numbers
 * @param column the column number put into every tuple
 * @return the expression (feed ((trel (tuple (...))) (tuples)))
 */
public static ListExpr feedColumn(Iterator<Integer> it, int column)
{
    // TRel value: one (row column row) tuple per delivered number
    ListExpr tuples = ListExpr.theEmptyList();
    while (it.hasNext())
    {
        int row = it.next();
        ListExpr tuple = ListExpr.threeElemList(
            ListExpr.intAtom(row),
            ListExpr.intAtom(column),
            ListExpr.intAtom(row));
        tuples = ListExpr.concat(tuples, tuple);
    }

    // TRel definition
    ListExpr firstAttr = ListExpr.twoElemList(
        ListExpr.symbolAtom(frsName), ListExpr.symbolAtom("int"));
    ListExpr secondAttr = ListExpr.twoElemList(
        ListExpr.symbolAtom(fcsName), ListExpr.symbolAtom("int"));
    ListExpr thirdAttr = ListExpr.twoElemList(
        ListExpr.symbolAtom(fssName), ListExpr.symbolAtom("int"));
    ListExpr tupleType = ListExpr.twoElemList(
        ListExpr.symbolAtom("tuple"),
        ListExpr.threeElemList(firstAttr, secondAttr, thirdAttr));
    ListExpr relType = ListExpr.twoElemList(
        ListExpr.symbolAtom("trel"), tupleType);

    ListExpr relation = ListExpr.twoElemList(relType, tuples);
    return ListExpr.twoElemList(ListExpr.symbolAtom("feed"), relation);
}
/**
 * Creates the expression that fetches an intermediate result with one of
 * the pffeed operators.
 *
 * PSFSMode 1 selects "pffeed", mode 2 selects "pffeed2" and every other
 * value selects "pffeed3". The file name handed to the operator is the
 * part of interResultName between its last ":" and its last "/>".
 *
 * @param interResultName mark of the intermediate result
 * @param typeNodeIdx     index of the node keeping the type file
 * @param feedList        expression producing the input tuples
 * @param PSFSMode        selects the operator, see above
 * @return (operator feedList (frs fcs fss fileName) (typeNodeIdx))
 */
public static ListExpr feedInterResult2( String interResultName,
    int typeNodeIdx, ListExpr feedList, int PSFSMode)
{
    String operator;
    if (PSFSMode == 1)
    {
        operator = "pffeed";
    }
    else if (PSFSMode == 2)
    {
        operator = "pffeed2";
    }
    else
    {
        operator = "pffeed3";
    }

    int nameFrom = interResultName.lastIndexOf(":") + 1;
    int nameTo = interResultName.lastIndexOf("/>");
    ListExpr fileDescr = ListExpr.fourElemList(
        ListExpr.symbolAtom(frsName),
        ListExpr.symbolAtom(fcsName),
        ListExpr.symbolAtom(fssName),
        ListExpr.stringAtom(interResultName.substring(nameFrom, nameTo)));
    ListExpr typeNode = ListExpr.oneElemList(ListExpr.intAtom(typeNodeIdx));

    ListExpr expression = ListExpr.oneElemList(ListExpr.symbolAtom(operator));
    ListExpr tail = expression;
    tail = ListExpr.append(tail, feedList);
    tail = ListExpr.append(tail, fileDescr);
    tail = ListExpr.append(tail, typeNode);
    return expression;
}
// ------------------------------------------------------------------------
// Functions for HDJ
// ------------------------------------------------------------------------
/**
 * Finds a TCP port that is free at the moment of the call.
 *
 * The search starts at 40000 + (process id % 7000) and probes the ports
 * upwards by binding a server socket, which is closed again right away.
 * Note that another process may grab the port between this probe and the
 * caller's own bind.
 *
 * @return the number of a port that could be bound
 * @throws IOException if the process id cannot be determined, or if no
 *         port up to 65535 could be bound
 */
public static int getPort() throws IOException
{
    final int maxPort = 65535; // largest valid TCP port
    int firstPort = 40000 + Integer.parseInt(getPID()) % 7000;

    for (int port = firstPort; port <= maxPort; port++)
    {
        ServerSocket probe;
        try
        {
            probe = new ServerSocket(port);
        }
        catch (IOException e)
        {
            continue; // port is occupied, try the next one
        }

        // The port was bound, so it is usable even if closing reports an error
        try
        {
            probe.close();
        }
        catch (IOException e)
        {
            e.printStackTrace();
        }
        return port;
    }
    throw new IOException(
        "No free port found between " + firstPort + " and " + maxPort);
}
/**
 * Returns the process id of this JVM.
 *
 * The id is cached in the system property "pid". If the property is not
 * set, a helper process is started: on systems whose os.name does not
 * contain "windows" a shell printing "$$ $PPID", otherwise the embedded
 * getpids.exe, which is extracted into a temporary file first. The second
 * token of the helper's output is taken as the process id.
 *
 * @return the process id as text
 * @throws IOException if the helper cannot be extracted or started, or if
 *         its output does not contain two tokens
 */
public static String getPID() throws IOException{
    String pid = System.getProperty("pid"); //NOI18N
    if (pid != null)
    {
        return pid;
    }

    File tempFile = null;
    try
    {
        String[] cmd;
        if (System.getProperty("os.name").toLowerCase().indexOf("windows") == -1)
        {
            cmd = new String[]{ "/bin/sh", "-c", "echo $$ $PPID" }; //NOI18N
        }
        else
        {
            // getpids.exe is taken from http://www.scheibli.com/projects/getpids/index.html (GPL)
            // The suffix needs its dot, createTempFile appends it verbatim
            tempFile = File.createTempFile("getpids", ".exe"); //NOI18N
            // extract the embedded getpids.exe file from the jar and save it to above file
            InputStream helper =
                HPA_AuxFunctions.class.getResourceAsStream("getpids.exe"); //NOI18N
            if (helper == null)
            {
                throw new IOException("Embedded resource getpids.exe not found");
            }
            OutputStream sink;
            try
            {
                sink = new FileOutputStream(tempFile);
            }
            catch (IOException e)
            {
                helper.close();
                throw e;
            }
            pump(helper, sink, true, true);
            cmd = new String[]{ tempFile.getAbsolutePath() };
        }

        Process p = Runtime.getRuntime().exec(cmd);
        ByteArrayOutputStream bout = new ByteArrayOutputStream();
        pump(p.getInputStream(), bout, true, true);

        StringTokenizer stok = new StringTokenizer(bout.toString());
        if (stok.countTokens() < 2)
        {
            throw new IOException(
                "Unexpected output of the pid helper: '" + bout.toString() + "'");
        }
        stok.nextToken(); // this is pid of the process we spanned
        pid = stok.nextToken();
        System.setProperty("pid", pid); //NOI18N
        return pid;
    }
    finally
    {
        // Remove the extracted helper on success and on failure
        if (tempFile != null)
        {
            tempFile.delete();
        }
    }
}
/**
 * Copies all bytes of a stream into another stream.
 *
 * The requested streams are closed whether or not the copy succeeded.
 * The output stream is closed even if closing the input stream fails.
 *
 * @param in       source of the bytes
 * @param out      destination of the bytes
 * @param closeIn  whether in is closed at the end
 * @param closeOut whether out is closed at the end
 * @throws IOException if reading, writing or closing fails
 */
public static void pump(InputStream in, OutputStream out, boolean closeIn, boolean closeOut) throws IOException{
    byte[] buffer = new byte[1024];
    try
    {
        int read;
        while ((read = in.read(buffer)) != -1)
        {
            out.write(buffer, 0, read);
        }
    }
    finally
    {
        try
        {
            if (closeIn)
            {
                in.close();
            }
        }
        finally
        {
            // Must not be skipped by a failing in.close()
            if (closeOut)
            {
                out.close();
            }
        }
    }
}
}