package ParallelSecondo; import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; import java.io.PrintWriter; import java.sql.Timestamp; import java.util.ArrayList; import java.util.List; import java.util.Scanner; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import sj.lang.ListExpr; public class PS_HadoopReduce implements Constant{ private final static String JOBID = "HASEC_JOB_" + new Timestamp(System.currentTimeMillis()).toString() .replace("-", "").replace(".", "") .replace(" ", "").replace(":", ""); /** * @param args * * DatabaseName * CreateObjectName * CreateQuery * DLF_Name_List * DLF_fileLoc_List * DLO_Name_List * DLO_Loc_List * duplicateTimes * fList_Kind * createFilePath * InputObjectName * [ MapQuery * Map_DLF_Name_List * Map_DLF_Loc_List * Map_DLO_Name_List * Map_DLO_Loc_List * MapTaskNum * ] * PartAttributeName * ReduceTaskNum * PSFSMode */ public static void main(String[] args) { final int paraLength = 14; final int paraLength2 = 20; String usage = "Usage HadoopReduce " + " " + " " + " " + " " + " [ " + " " + " " + " ] " + " "; boolean runMapper = false; if (args.length == paraLength2) { runMapper = true; } else if (args.length != paraLength) { System.err.println(usage); System.out.println("You input " + args.length + " arguments"); System.exit(-1); } //Get the master and slave nodes information from the files set by //PARALLEL_SECONDO_MASTER & PARALLEL_SECONDO_SLAVES String slFile = System.getenv().get("PARALLEL_SECONDO_SLAVES"); if (slFile.length() == 0) { System.err.println( "The Slave list PARALLEL_SECONDO_SLAVES " + "is not defined at current node."); System.exit(-1); } List slaves = new ArrayList(); try { Scanner scanner; scanner = new Scanner(new FileInputStream(slFile)); int lineNum = 0; while (scanner.hasNextLine()){ String[] line = scanner.nextLine().split(sysDim); slaves.add(new PSNode(lineNum++, line[0], line[1], Integer.parseInt(line[2]))); } } catch (FileNotFoundException e1) { // TODO Auto-generated catch block e1.printStackTrace(); } String databaseName = args[0]; String CreateObjectName = args[1]; String CreateQuery = args[2]; String DLF_Name_ListStr = args[3]; String DLF_Loc_ListStr = args[4]; String DLO_Name_ListStr = args[5]; String DLO_Loc_ListStr = args[6]; int duplicateTimes = Integer.parseInt(args[7]); FListKind outputKind = FListKind.values()[Integer.parseInt(args[8])]; String CreateFilePath = args[9]; String InputObjectName = "", PAName = ""; String MapQueryStr = "", Map_DLF_Name_ListStr = "", Map_DLF_Loc_ListStr = "", Map_DLO_Name_ListStr = "", Map_DLO_Loc_ListStr = ""; int mapTasksNum, reduceTasksNum, PSFSMode; if (!runMapper) { InputObjectName = args[10]; PAName = args[11]; reduceTasksNum = Integer.parseInt(args[12]); mapTasksNum = slaves.size(); PSFSMode = Integer.parseInt(args[13]); } else { InputObjectName = args[10]; MapQueryStr = args[11]; Map_DLF_Name_ListStr = args[12]; Map_DLF_Loc_ListStr = args[13]; Map_DLO_Name_ListStr = args[14]; Map_DLO_Loc_ListStr = args[15]; mapTasksNum = Integer.parseInt(args[16]); PAName = args[17]; reduceTasksNum = Integer.parseInt(args[18]); PSFSMode = Integer.parseInt(args[19]); } if (outputKind == FListKind.DLO && reduceTasksNum > slaves.size()){ System.err.println("Warning! It is not allowed to produce DLO flist " + "with columns more than slave data servers."); System.err.println("Change the reduce tasks number from " + reduceTasksNum + " to " + slaves.size() + " compulsively"); reduceTasksNum = slaves.size(); } ListExpr fpList = ListExpr.oneElemList(ListExpr.textAtom(CreateFilePath)); CreateFilePath = fpList.toString().replace('\n', ' '); //In case empty path ListExpr allMapDLFList = new ListExpr(); ListExpr allMapDLOList = new ListExpr(); if (runMapper) { //Prepare the DLO and DLF list for the mapper ListExpr allMap_DLFNameList = new ListExpr(); allMap_DLFNameList.readFromString(Map_DLF_Name_ListStr); ListExpr allMap_DLFLocList = new ListExpr(); allMap_DLFLocList.readFromString(Map_DLF_Loc_ListStr); ListExpr allMap_DLONameList = new ListExpr(); allMap_DLONameList.readFromString(Map_DLO_Name_ListStr); ListExpr allMap_DLOLocList = new ListExpr(); allMap_DLOLocList.readFromString(Map_DLO_Loc_ListStr); // allMapDLFList = HPA_AuxFunctions.flist2Mapper2(allMap_DLFNameList, allMap_DLFLocList, mapTasksNum, slaves.size()); allMapDLFList = HPA_AuxFunctions.flist2Mapper(allMap_DLFNameList, allMap_DLFLocList, mapTasksNum); // allMapDLOList = HPA_AuxFunctions.flist2Mapper2(allMap_DLONameList, allMap_DLOLocList, mapTasksNum, slaves.size()); allMapDLOList = HPA_AuxFunctions.flist2Mapper(allMap_DLONameList, allMap_DLOLocList, mapTasksNum); } /* Different from the HadoopMap operator, the function query here is processed inside the reduce step, therefore, only the function argument is redistributed in the map step, while all other DLF flists are decomposed in the reduce query only. */ //Apply each map task with one row of the fileLoc list. ListExpr allDLFLocList = new ListExpr(); allDLFLocList.readFromString(DLF_Loc_ListStr); ListExpr allDLFNameList = new ListExpr(); allDLFNameList.readFromString(DLF_Name_ListStr); ListExpr DLFByMappers = HPA_AuxFunctions.flist2Mapper(allDLFNameList, allDLFLocList, mapTasksNum); DLFByMappers = HPA_AuxFunctions.divMRDLO(allDLFNameList, DLFByMappers, mapTasksNum); ListExpr allDLONameList = new ListExpr(); allDLONameList.readFromString(DLO_Name_ListStr); ListExpr allDLOLoclList = new ListExpr(); allDLOLoclList.readFromString(DLO_Loc_ListStr); ListExpr DLOByMappers = HPA_AuxFunctions.flist2Mapper(allDLONameList, allDLOLoclList, mapTasksNum); DLOByMappers = HPA_AuxFunctions.divMRDLO(allDLONameList, DLOByMappers, mapTasksNum); //Prepare the input for mappers String inputPath = "INPUT"; String outputPath = "OUTPUT"; Configuration conf = new Configuration(); try { FileSystem.get(conf).delete(new Path(outputPath), true); FileSystem.get(conf).delete(new Path(inputPath), true); ListExpr allDLOMappers = DLOByMappers.first(); //All DLO list for mappers (include name and loc) ListExpr allDLOReducers = DLOByMappers.second(); //All DLO list for reducers (include name and loc) ListExpr allDLFMappers = DLFByMappers.first(); //All DLF list for mappers (include name and loc) ListExpr allDLFReducers = DLFByMappers.second(); //All DLF list for reducers (include name and loc) ListExpr aomn_rest = allDLOMappers.first(); ListExpr aoml_rest = allDLOMappers.second(); ListExpr aorn_rest = allDLOReducers.first(); ListExpr aorl_rest = allDLOReducers.second(); ListExpr afmn_rest = allDLFMappers.first(); ListExpr afml_rest = allDLFMappers.second(); ListExpr afrn_rest = allDLFReducers.first(); ListExpr afrl_rest = allDLFReducers.second(); ListExpr amapDLF_rest = new ListExpr(); ListExpr amapDLO_rest = new ListExpr(); if (runMapper) { amapDLF_rest = allMapDLFList; amapDLO_rest = allMapDLOList; } for (int mapperIdx = 0; mapperIdx < mapTasksNum; mapperIdx++) { boolean allDLOexist = true, allDLFexist = true; ListExpr aomLoc, afmLoc, aomName, afmName; aomLoc = ListExpr.theEmptyList(); afmLoc = ListExpr.theEmptyList(); aomName = ListExpr.theEmptyList(); afmName = ListExpr.theEmptyList(); ListExpr amap_DLF, amap_DLO; //Prepared for running Map stage amap_DLF = ListExpr.theEmptyList(); amap_DLO = ListExpr.theEmptyList(); if (runMapper) { if (!amapDLF_rest.isEmpty()) amap_DLF = amapDLF_rest.first(); if (!amapDLO_rest.isEmpty()) amap_DLO = amapDLO_rest.first(); allDLOexist = HPA_AuxFunctions.allMapperFOExist(amap_DLO); allDLFexist = HPA_AuxFunctions.allMapperFOExist(amap_DLF); } else { if (!aoml_rest.isEmpty()) aomLoc = aoml_rest.first(); if (!afml_rest.isEmpty()) afmLoc = afml_rest.first(); if (!aomn_rest.isEmpty()) aomName = aomn_rest.first(); if (!afmn_rest.isEmpty()) afmName = afmn_rest.first(); if (!aomName.isEmpty()) allDLOexist = HPA_AuxFunctions.allObjectExist(aomName, aomLoc); if (!afmName.isEmpty()) allDLFexist = HPA_AuxFunctions.allObjectExist(afmName, afmLoc); } if (allDLOexist && allDLFexist) { //For mappers String fmnstr = HPA_AuxFunctions.plainStr(afmName); String fmlstr = HPA_AuxFunctions.plainStr(afmLoc); //For reducers ListExpr aorLoc, afrLoc, aorName, afrName; aorLoc = afrLoc = aorName = afrName = ListExpr.theEmptyList(); if (!aorl_rest.isEmpty()) aorLoc = aorl_rest.first(); if (!afrl_rest.isEmpty()) afrLoc = afrl_rest.first(); if (!aorn_rest.isEmpty()) aorName = aorn_rest.first(); if (!afrn_rest.isEmpty()) afrName = afrn_rest.first(); String frlstr = HPA_AuxFunctions.plainStr(afrLoc); String frnstr = HPA_AuxFunctions.plainStr(afrName); String fileName = JOBID + "_INPUT_"+ mapperIdx + ".dat"; PrintWriter out = new PrintWriter( FileSystem.get(conf).create( new Path(inputPath + "/" + fileName))); if (runMapper) { String dlfLocStr = HPA_AuxFunctions.plainStr(amap_DLF); int slaveIdx = HPA_AuxFunctions.findFirstSlave(amap_DLF); if (slaveIdx == 0) { slaveIdx = HPA_AuxFunctions.findFirstSlave(amap_DLO); } out.print("" + slaveIdx + inDim +//0 mapperIdx + inDim +//1 databaseName + inDim +//2 MapQueryStr + inDim +//3 Map_DLF_Name_ListStr + inDim +//4 dlfLocStr + inDim +//5 CreateObjectName + inDim +//6 CreateQuery + inDim +//7 fmnstr + inDim +//8 fmlstr + inDim +//9 frnstr + inDim +//10 frlstr + inDim +//11 duplicateTimes + inDim +//12 outputKind.ordinal() + inDim +//13 CreateFilePath + inDim +//14 InputObjectName + inDim +//15 PAName + inDim +//16 PSFSMode //17 ); out.close(); } else { out.print( "" + mapperIdx + inDim + databaseName + inDim + CreateObjectName + inDim + CreateQuery + inDim + fmnstr + inDim + fmlstr + inDim + frnstr + inDim + frlstr + inDim + duplicateTimes + inDim + outputKind.ordinal() + inDim + CreateFilePath + inDim + InputObjectName + inDim + PAName + inDim + PSFSMode ); out.close(); } } if (!aorn_rest.isEmpty()) aorn_rest = aorn_rest.rest(); if (!aorl_rest.isEmpty()) aorl_rest = aorl_rest.rest(); if (!afrn_rest.isEmpty()) afrn_rest = afrn_rest.rest(); if (!afrl_rest.isEmpty()) afrl_rest = afrl_rest.rest(); if (runMapper) { if (!amapDLO_rest.isEmpty()) amapDLO_rest = amapDLO_rest.rest(); if (!amapDLF_rest.isEmpty()) amapDLF_rest = amapDLF_rest.rest(); } else { if (!aomn_rest.isEmpty()) aomn_rest = aomn_rest.rest(); if (!aoml_rest.isEmpty()) aoml_rest = aoml_rest.rest(); if (!afmn_rest.isEmpty()) afmn_rest = afmn_rest.rest(); if (!afml_rest.isEmpty()) afml_rest = afml_rest.rest(); } } } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); System.exit(-1); } //Create the job try { Job job = new Job(); job.setJarByClass(PS_HadoopMap.class); FileInputFormat.addInputPath(job, new Path(inputPath)); FileOutputFormat.setOutputPath(job, new Path(outputPath)); if (runMapper) job.setMapperClass(PS_HadoopReduce_QMap.class); else job.setMapperClass(PS_HadoopReduce_Map.class); job.setReducerClass(PS_HadoopReduce_Reduce.class); job.setMapOutputKeyClass(IntWritable.class); job.setMapOutputValueClass(Text.class); job.setOutputKeyClass(IntWritable.class); job.setOutputValueClass(Text.class); job.setJobName(JOBID); job.setNumReduceTasks(reduceTasksNum); System.exit(job.waitForCompletion(true) ? 0 : 1); } catch (IOException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (InterruptedException e) { // TODO Auto-generated catch block e.printStackTrace(); } catch (ClassNotFoundException e) { // TODO Auto-generated catch block e.printStackTrace(); } } }