Files
secondo/Algebras/HadoopParallel/BerlinMOD_Parallel_Generator.sec
2026-01-23 17:03:45 +08:00

286 lines
9.6 KiB
Plaintext

delete database bm_05;
create database bm_05;
open database bm_05;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare Parameters #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
let SCALEFACTOR = 0.5;
let CLUSTER_SCALE = 12;
let DATASCALE = 100;
let P_SAMPLESIZE = 100;
let REDUCE_SCALE = CLUSTER_SCALE * 3;
# let P_NUMCARS = real2int(round((2000 * SCALEFCARS),0));
let P_NUMCARS = DATASCALE;
let P_STARTDAY = 2700;
let SCALEFDAYS = sqrt(SCALEFACTOR);
let P_NUMDAYS = real2int(round((SCALEFDAYS*28),0));
let DATAPATH = '/mnt/diskb/psec2/backup_berlinMOD';
let DATANAME = "dscar";
# Only pick up the head 100, to reduce the complexity of queries
let dataScar = DATANAME ffeed[DATAPATH,1;;] head[DATASCALE] consume;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare the database
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
let dataSCcar = dataScar feed projectextend[Licence, Type, Model; Journey: .Trip] consume;
let dataSCcar_List = dataScar feed
projectextend[Licence, Type, Model; Journey: .Trip]
extend[SlaveID: hashvalue(.Licence, CLUSTER_SCALE) + 1]
spread[;Licence, CLUSTER_SCALE, TRUE;]
hadoopMap["SubDataSCcar"; . consume];
# Total runtime ... Times (elapsed / cpu): 48.4099sec / 2.99sec = 16.1906
#+++Verification+++ To verify the data have been distributed correctly
query dataSCcar_List hadoopMap[DLF; . feed ] collect[] count
# 100
# Create B-Tree based on licence
let dataSCcar_Licence_btree = dataSCcar createbtree[Licence];
let dataSCcar_Licence_btree_List = dataSCcar_List hadoopMap[ ; . createbtree[Licence] ];
# Create temporal R-Tree based on units' definition time
let dataSCcar_Journey_tmpuni =
dataSCcar feed
projectextend[Journey ; TID: tupleid(.)]
projectextendstream[TID; MBR: units(.Journey)
use[fun(U: upoint) point2d(deftime(U)) ]]
sortby[MBR asc]
bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 38.8779sec / 35.24sec = 1.10323
let dataSCcar_Journey_tmpuni_List =
dataSCcar_List hadoopMap[ ; .
feed projectextend[Journey ; TID: tupleid(.)]
projectextendstream[TID; MBR:
units(.Journey) use[fun(U: upoint) point2d(deftime(U)) ]]
sortby[MBR asc] bulkloadrtree[MBR]
];
# Total runtime ... Times (elapsed / cpu): 34.7612sec / 0.04sec = 869.03
# Create 2D Spatial R-Tree based units' bounding boxes
let dataSCcar_Journey_sptuni =
dataSCcar feed projectextend[Journey ; TID: tupleid(.)]
projectextendstream[TID
; MBR: units(.Journey) use[fun(U: upoint) bbox2d(U) ]]
sortby[MBR asc] bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 45.2348sec / 42.89sec = 1.05467
let dataSCcar_Journey_sptuni_List = dataSCcar_List hadoopMap[ ; .
feed projectextend[Journey ; TID: tupleid(.)]
projectextendstream[TID; MBR:
units(.Journey) use[fun(U: upoint) bbox2d(U) ]]
sortby[MBR asc] bulkloadrtree[MBR]
];
# Total runtime ... Times (elapsed / cpu): 34.7387sec / 0.06sec = 578.979
# Create 3D Spatio-temporal R-Tree based on units' bounding boxes
let dataSCcar_Journey_sptmpuni =
dataSCcar feed
projectextend[Journey ; TID: tupleid(.)]
projectextendstream[TID; MBR: units(.Journey)
use[fun(U: upoint) bbox(U) ]]
sortby[MBR asc]
bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 58.5181sec / 48.91sec = 1.19645
let dataSCcar_Journey_sptmpuni_List =
dataSCcar_List hadoopMap[ ; .
feed projectextend[Journey ; TID: tupleid(.)]
projectextendstream[TID; MBR:
units(.Journey) use[fun(U: upoint) bbox(U) ]]
sortby[MBR asc] bulkloadrtree[MBR]
];
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Parameters for Global Cell-Grid #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
let Car_Grid_Box = dataSCcar_List hadoopMap[DLF; .
feed projectextendstream[; Box: units(.Journey)
use[fun(U: upoint) bbox(U)]]
aggregateB[Box; fun(R1:rect3, R2:rect3)
R1 union R2;[const rect3 value undef]]
feed namedtransformstream[PartGrid]
] collect[]
aggregateB[PartGrid; fun(GR1:rect3, GR2:rect3)
GR1 union GR2; [const rect3 value undef]];
# rect: ( (-9606,-3563,2699) - (32842,27204,2721) )
# Total runtime ... Times (elapsed / cpu): 32.5842sec / 0.14sec = 232.744
#+++Verification+++ To verify the query result of the global grid
query dataScar feed
projectextendstream[; UTrip: units(.Trip)]
projectextend[;Box: bbox(.UTrip)]
aggregateB[Box; fun(R1:rect3, R2:rect3)
R1 union R2;[const rect3 value undef]];
# rect: ( (-9606,-3563,2699) - (32842,27204,2721) )
# Total runtime ... Times (elapsed / cpu): 26.6602sec / 22.88sec = 1.16522
## Here only the sequential cost of the aggregation on 100 trajectories
## is cost to the parallel cost.
let CAR_WORLD_XSIZE = maxD(Car_Grid_Box, 1) - minD(Car_Grid_Box, 1);
let CAR_WORLD_YSIZE = maxD(Car_Grid_Box, 2) - minD(Car_Grid_Box, 2);
let CAR_WORLD_TSIZE = maxD(Car_Grid_Box, 3) - minD(Car_Grid_Box, 3);
let CAR_WORLD_MAXSIZE = getMaxVal(
CAR_WORLD_XSIZE, CAR_WORLD_YSIZE, CAR_WORLD_TSIZE);
let CAR_WORLD_X_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_XSIZE;
let CAR_WORLD_Y_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_YSIZE;
let CAR_WORLD_T_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_TSIZE;
let CAR_WORLD_SCALE_BOX = fun(R: rect3)
scalerect(R, CAR_WORLD_X_SCALE, CAR_WORLD_Y_SCALE, CAR_WORLD_T_SCALE);
let CellNum = real2int(sqrt(int2real(dataScar count)));
let CellSize = CAR_WORLD_MAXSIZE / CellNum;
let CAR_WORLD_GRID_LBP_X = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 1);
let CAR_WORLD_GRID_LBP_Y = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 2);
let CAR_WORLD_GRID_LBP_T = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 3);
let CAR_WORLD_GRID = createCellGrid3D(
CAR_WORLD_GRID_LBP_X, CAR_WORLD_GRID_LBP_Y, CAR_WORLD_GRID_LBP_T,
CellSize, CellSize, CellSize, CellNum, CellNum );
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare Query Samples #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
#********* QueryLicences *************************************#
#*************************************************************#
let LicenceList =
dataScar feed
project[Licence]
addcounter[Id,1]
consume;
let LicenceList_Id = LicenceList createbtree[Id];
let QueryLicences =
intstream(1, P_SAMPLESIZE) namedtransformstream[Id1]
loopjoin[ LicenceList_Id LicenceList exactmatch[rng_intN(P_NUMCARS) + 1] ]
projectextend[Licence; Id: .Id1]
consume;
let QueryLicences_List = QueryLicences feed
spread[;Licence,CLUSTER_SCALE,TRUE;]
hadoopMap["SQ_Licences"; . consume];
let QueryLicences_Top10_List = QueryLicences feed
head[10]
spread[;Licence,CLUSTER_SCALE,TRUE;]
hadoopMap["SQ_T10_Licences"; . consume];
let QueryLicences_STop10_Dup_List = QueryLicences feed
head[20] filter[.Id>10]
spread[;Licence,CLUSTER_SCALE,TRUE;]
hadoopMap["SQ_ST10_Licences"; . consume];
#********* QueryPoints ***************************************#
#*************************************************************#
restore streets from 'streets.data';
let P_MINVELOCITY = 0.04166666666666666667;
let streets1 =
streets feed
filter[ .Vmax > P_MINVELOCITY ]
addcounter[StreetId, 1]
project[StreetId, Vmax, GeoData]
consume;
let allstreets1 =
streets1 feed
projecttransformstream[GeoData] collect_line[TRUE];
let allstreets =
components(allstreets1) transformstream
extend[NoSeg: no_segments(.Elem)]
sortby[NoSeg desc]
extract[Elem];
let Crossings =
( streets1 feed {s1}
streets1 feed {s2}
spatialjoin[GeoData_s1, GeoData_s2]
filter[.StreetId_s1 < .StreetId_s2]
extend[Crossroads: crossings(.GeoData_s1, .GeoData_s2)]
project[Crossroads]
filter[not(isempty(.Crossroads))]
aggregateB[Crossroads; fun(P1: points, P2: points)
P1 union P2; [const points value ()]]
)
union
(streets1 feed
projectextend[; B : boundary(.GeoData)]
aggregateB[B; fun(P3 : points, P4 : points)
P3 union P4; [const points value ()]]
);
let sections2 =
allstreets polylines[FALSE, Crossings]
namedtransformstream[Part]
consume;
let nodes2 =
sections2 feed
projectextend[; EndPoints: boundary(.Part)]
aggregateB[EndPoints; fun(P1: points, P2: points)
P1 union P2 ; [const points value ()]];
let QueryPoints =
intstream(1, P_SAMPLESIZE)
namedtransformstream[Id]
extend[Pos: get(nodes2, rng_intN(no_components(nodes2)) )]
consume;
#********* QueryRegions **************************************#
#*************************************************************#
let QueryRegions =
intstream(1, P_SAMPLESIZE)
namedtransformstream[Id]
extend[Region: circle(
get(nodes2, rng_intN(no_components(nodes2))),
rng_intN(997) + 3.0,
rng_intN(98) + 3)]
consume;
#********* QueryPeriods **************************************#
#*************************************************************#
let QueryPeriods =
intstream(1, P_SAMPLESIZE)
namedtransformstream[Id]
extend[
StartInstant: create_instant(P_STARTDAY, 0)
+ create_duration(rng_real() * P_NUMDAYS ),
Duration: create_duration(abs(rng_gaussian(1.0)))
]
projectextend[Id; Period: theRange(
.StartInstant, .StartInstant + .Duration,
TRUE, TRUE)]
consume;
#********* QueryInstants *************************************#
#*************************************************************#
let QueryInstants =
intstream(1, P_SAMPLESIZE)
namedtransformstream[Id]
extend[Instant: create_instant(P_STARTDAY, 0)
+ create_duration(rng_real() * P_NUMDAYS )]
consume;
close database;