# ======================================================================
# Set-up script for the benchmark database bm_05.
#
# Steps performed, in order:
#   1. (Re)create and open the database bm_05.
#   2. Define scale parameters and load the first DATASCALE tuples of
#      the "dscar" data file into the relation dataScar.
#   3. Build the relation dataSCcar and its indexes, once as local
#      objects and once as distributed "_List" objects via
#      spread / hadoopMap.
#   4. Compute the global bounding box of all units and derive the
#      parameters of a 3D cell grid from it.
#   5. Generate the query sample relations (licences, points, regions,
#      periods, instants).
#
# Layout rules used in this file: every comment sits on its own line
# starting with '#', every command ends with ';', and no blank or
# comment line is placed inside a multi-line command.
# ======================================================================

delete database bm_05;
create database bm_05;
open database bm_05;

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare Parameters
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

let SCALEFACTOR = 0.5;
let CLUSTER_SCALE = 12;
let DATASCALE = 100;
let P_SAMPLESIZE = 100;

# NOTE(review): REDUCE_SCALE is not referenced again in this script;
# presumably it is read by later query scripts - confirm before removing.
let REDUCE_SCALE = CLUSTER_SCALE * 3;

# let P_NUMCARS = real2int(round((2000 * SCALEFCARS),0));
let P_NUMCARS = DATASCALE;
let P_STARTDAY = 2700;
let SCALEFDAYS = sqrt(SCALEFACTOR);
let P_NUMDAYS = real2int(round((SCALEFDAYS*28),0));

let DATAPATH = '/mnt/diskb/psec2/backup_berlinMOD';
let DATANAME = "dscar";

# Only pick up the head 100, to reduce the complexity of queries
let dataScar = DATANAME ffeed[DATAPATH,1;;]
  head[DATASCALE]
  consume;

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare the database
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Local copy: attribute Trip is renamed to Journey.
let dataSCcar = dataScar feed
  projectextend[Licence, Type, Model; Journey: .Trip]
  consume;

# Distributed copy, partitioned on Licence into CLUSTER_SCALE parts.
# NOTE(review): SlaveID is computed here but spread partitions on
# Licence; confirm whether anything downstream reads SlaveID.
let dataSCcar_List = dataScar feed
  projectextend[Licence, Type, Model; Journey: .Trip]
  extend[SlaveID: hashvalue(.Licence, CLUSTER_SCALE) + 1]
  spread[;Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap["SubDataSCcar"; . consume];
# Total runtime ... Times (elapsed / cpu): 48.4099sec / 2.99sec = 16.1906

#+++Verification+++ To verify the data have been distributed correctly
# (terminated explicitly with ';' so the command does not depend on a
# following blank line to end it)
query dataSCcar_List hadoopMap[DLF; . feed ] collect[] count;
# 100

# Create B-Tree based on licence
let dataSCcar_Licence_btree = dataSCcar createbtree[Licence];

let dataSCcar_Licence_btree_List = dataSCcar_List
  hadoopMap[ ; . createbtree[Licence] ];

# Create temporal R-Tree based on units' definition time
let dataSCcar_Journey_tmpuni = dataSCcar feed
  projectextend[Journey ; TID: tupleid(.)]
  projectextendstream[TID; MBR: units(.Journey) use[fun(U: upoint) point2d(deftime(U)) ]]
  sortby[MBR asc]
  bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 38.8779sec / 35.24sec = 1.10323

let dataSCcar_Journey_tmpuni_List = dataSCcar_List
  hadoopMap[ ; . feed
    projectextend[Journey ; TID: tupleid(.)]
    projectextendstream[TID; MBR: units(.Journey) use[fun(U: upoint) point2d(deftime(U)) ]]
    sortby[MBR asc]
    bulkloadrtree[MBR] ];
# Total runtime ... Times (elapsed / cpu): 34.7612sec / 0.04sec = 869.03

# Create 2D Spatial R-Tree based units' bounding boxes
let dataSCcar_Journey_sptuni = dataSCcar feed
  projectextend[Journey ; TID: tupleid(.)]
  projectextendstream[TID ; MBR: units(.Journey) use[fun(U: upoint) bbox2d(U) ]]
  sortby[MBR asc]
  bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 45.2348sec / 42.89sec = 1.05467

let dataSCcar_Journey_sptuni_List = dataSCcar_List
  hadoopMap[ ; . feed
    projectextend[Journey ; TID: tupleid(.)]
    projectextendstream[TID; MBR: units(.Journey) use[fun(U: upoint) bbox2d(U) ]]
    sortby[MBR asc]
    bulkloadrtree[MBR] ];
# Total runtime ... Times (elapsed / cpu): 34.7387sec / 0.06sec = 578.979

# Create 3D Spatio-temporal R-Tree based on units' bounding boxes
let dataSCcar_Journey_sptmpuni = dataSCcar feed
  projectextend[Journey ; TID: tupleid(.)]
  projectextendstream[TID; MBR: units(.Journey) use[fun(U: upoint) bbox(U) ]]
  sortby[MBR asc]
  bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 58.5181sec / 48.91sec = 1.19645

let dataSCcar_Journey_sptmpuni_List = dataSCcar_List
  hadoopMap[ ; . feed
    projectextend[Journey ; TID: tupleid(.)]
    projectextendstream[TID; MBR: units(.Journey) use[fun(U: upoint) bbox(U) ]]
    sortby[MBR asc]
    bulkloadrtree[MBR] ];

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Parameters for Global Cell-Grid
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Union of all unit bounding boxes: first one partial box per
# hadoopMap task (PartGrid), then the union of the collected partial boxes.
let Car_Grid_Box = dataSCcar_List
  hadoopMap[DLF; . feed
    projectextendstream[; Box: units(.Journey) use[fun(U: upoint) bbox(U)]]
    aggregateB[Box; fun(R1:rect3, R2:rect3) R1 union R2;[const rect3 value undef]]
    feed namedtransformstream[PartGrid] ]
  collect[]
  aggregateB[PartGrid; fun(GR1:rect3, GR2:rect3) GR1 union GR2; [const rect3 value undef]];
# rect: ( (-9606,-3563,2699) - (32842,27204,2721) )
# Total runtime ... Times (elapsed / cpu): 32.5842sec / 0.14sec = 232.744

#+++Verification+++ To verify the query result of the global grid
query dataScar feed
  projectextendstream[; UTrip: units(.Trip)]
  projectextend[;Box: bbox(.UTrip)]
  aggregateB[Box; fun(R1:rect3, R2:rect3) R1 union R2;[const rect3 value undef]];
# rect: ( (-9606,-3563,2699) - (32842,27204,2721) )
# Total runtime ... Times (elapsed / cpu): 26.6602sec / 22.88sec = 1.16522

## Note: the two timings above only compare the sequential cost of the
## aggregation on 100 trajectories with the cost of the parallel query.

# Extent of the global box in each of its three dimensions.
let CAR_WORLD_XSIZE = maxD(Car_Grid_Box, 1) - minD(Car_Grid_Box, 1);
let CAR_WORLD_YSIZE = maxD(Car_Grid_Box, 2) - minD(Car_Grid_Box, 2);
let CAR_WORLD_TSIZE = maxD(Car_Grid_Box, 3) - minD(Car_Grid_Box, 3);

let CAR_WORLD_MAXSIZE = getMaxVal(
  CAR_WORLD_XSIZE, CAR_WORLD_YSIZE, CAR_WORLD_TSIZE);

# Per-dimension factors: largest extent divided by that dimension's extent.
# NOTE(review): a zero extent in any dimension would make these divisions
# invalid; not expected for the data above, but confirm for other inputs.
let CAR_WORLD_X_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_XSIZE;
let CAR_WORLD_Y_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_YSIZE;
let CAR_WORLD_T_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_TSIZE;

let CAR_WORLD_SCALE_BOX = fun(R: rect3)
  scalerect(R, CAR_WORLD_X_SCALE, CAR_WORLD_Y_SCALE, CAR_WORLD_T_SCALE);

# Cells per dimension: square root of the number of loaded tuples.
let CellNum = real2int(sqrt(int2real(dataScar count)));
let CellSize = CAR_WORLD_MAXSIZE / CellNum;

# Minimum coordinates of the scaled global box, used as grid origin.
let CAR_WORLD_GRID_LBP_X = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 1);
let CAR_WORLD_GRID_LBP_Y = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 2);
let CAR_WORLD_GRID_LBP_T = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 3);

let CAR_WORLD_GRID = createCellGrid3D(
  CAR_WORLD_GRID_LBP_X, CAR_WORLD_GRID_LBP_Y, CAR_WORLD_GRID_LBP_T,
  CellSize, CellSize, CellSize,
  CellNum, CellNum );

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare Query Samples
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

#********* QueryLicences *************************************#
#*************************************************************#

let LicenceList = dataScar feed
  project[Licence]
  addcounter[Id,1]
  consume;

let LicenceList_Id = LicenceList createbtree[Id];

# P_SAMPLESIZE licences, each looked up by a random Id via the B-tree.
let QueryLicences = intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id1]
  loopjoin[ LicenceList_Id LicenceList exactmatch[rng_intN(P_NUMCARS) + 1] ]
  projectextend[Licence; Id: .Id1]
  consume;

let QueryLicences_List = QueryLicences feed
  spread[;Licence,CLUSTER_SCALE,TRUE;]
  hadoopMap["SQ_Licences"; . consume];

# Sample tuples with Id 1..10.
let QueryLicences_Top10_List = QueryLicences feed
  head[10]
  spread[;Licence,CLUSTER_SCALE,TRUE;]
  hadoopMap["SQ_T10_Licences"; . consume];

# Sample tuples with Id 11..20.
let QueryLicences_STop10_Dup_List = QueryLicences feed
  head[20]
  filter[.Id>10]
  spread[;Licence,CLUSTER_SCALE,TRUE;]
  hadoopMap["SQ_ST10_Licences"; . consume];

#********* QueryPoints ***************************************#
#*************************************************************#

restore streets from 'streets.data';

let P_MINVELOCITY = 0.04166666666666666667;

let streets1 = streets feed
  filter[ .Vmax > P_MINVELOCITY ]
  addcounter[StreetId, 1]
  project[StreetId, Vmax, GeoData]
  consume;

let allstreets1 = streets1 feed
  projecttransformstream[GeoData]
  collect_line[TRUE];

# Component of allstreets1 with the largest number of segments.
let allstreets = components(allstreets1)
  transformstream
  extend[NoSeg: no_segments(.Elem)]
  sortby[NoSeg desc]
  extract[Elem];

# Union of (a) the crossings of each pair of distinct streets and
# (b) the boundary points of every street.
let Crossings =
  ( streets1 feed {s1}
    streets1 feed {s2}
    spatialjoin[GeoData_s1, GeoData_s2]
    filter[.StreetId_s1 < .StreetId_s2]
    extend[Crossroads: crossings(.GeoData_s1, .GeoData_s2)]
    project[Crossroads]
    filter[not(isempty(.Crossroads))]
    aggregateB[Crossroads; fun(P1: points, P2: points) P1 union P2; [const points value ()]] )
  union
  ( streets1 feed
    projectextend[; B : boundary(.GeoData)]
    aggregateB[B; fun(P3 : points, P4 : points) P3 union P4; [const points value ()]] );

let sections2 = allstreets polylines[FALSE, Crossings]
  namedtransformstream[Part]
  consume;

let nodes2 = sections2 feed
  projectextend[; EndPoints: boundary(.Part)]
  aggregateB[EndPoints; fun(P1: points, P2: points) P1 union P2 ; [const points value ()]];

let QueryPoints = intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id]
  extend[Pos: get(nodes2, rng_intN(no_components(nodes2)) )]
  consume;

#********* QueryRegions **************************************#
#*************************************************************#

let QueryRegions = intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id]
  extend[Region: circle(
    get(nodes2, rng_intN(no_components(nodes2))),
    rng_intN(997) + 3.0,
    rng_intN(98) + 3)]
  consume;

#********* QueryPeriods **************************************#
#*************************************************************#

let QueryPeriods = intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id]
  extend[
    StartInstant: create_instant(P_STARTDAY, 0) + create_duration(rng_real() * P_NUMDAYS ),
    Duration: create_duration(abs(rng_gaussian(1.0))) ]
  projectextend[Id; Period: theRange( .StartInstant, .StartInstant + .Duration, TRUE, TRUE)]
  consume;

#********* QueryInstants *************************************#
#*************************************************************#

let QueryInstants = intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id]
  extend[Instant: create_instant(P_STARTDAY, 0) + create_duration(rng_real() * P_NUMDAYS )]
  consume;

close database;