delete database bm_05;

create database bm_05;

open database bm_05;

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare Parameters #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

let SCALEFACTOR = 0.5;
let CLUSTER_SCALE = 12;
let DATASCALE = 100;
let P_SAMPLESIZE = 100;
let REDUCE_SCALE = CLUSTER_SCALE * 3;

# let P_NUMCARS = real2int(round((2000 * SCALEFCARS),0));
let P_NUMCARS = DATASCALE;
let P_STARTDAY = 2700;
let SCALEFDAYS = sqrt(SCALEFACTOR);
let P_NUMDAYS = real2int(round((SCALEFDAYS*28),0));
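# With SCALEFACTOR = 0.5, SCALEFDAYS = sqrt(0.5) ~ 0.7071, so
# P_NUMDAYS = real2int(round(0.7071 * 28, 0)) = 20.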

let DATAPATH = '/mnt/diskb/psec2/backup_berlinMOD';
let DATANAME = "dscar";

# Only pick up the first 100 cars, to reduce the complexity of the queries
let dataScar = DATANAME ffeed[DATAPATH,1;;] head[DATASCALE] consume;

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare the database #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

let dataSCcar = dataScar feed projectextend[Licence, Type, Model; Journey: .Trip] consume;

let dataSCcar_List = dataScar feed
  projectextend[Licence, Type, Model; Journey: .Trip]
  extend[SlaveID: hashvalue(.Licence, CLUSTER_SCALE) + 1]
  spread[;Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap["SubDataSCcar"; . consume];
# Total runtime ... Times (elapsed / cpu): 48.4099sec / 2.99sec = 16.1906
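# spread partitions the tuples on Licence into CLUSTER_SCALE slices
# across the cluster; hadoopMap then materialises each slice as a local
# SubDataSCcar relation on its slave, so the later hadoopMap calls can
# process the partitions in parallel.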

#+++Verification+++ To verify that the data have been distributed correctly
query dataSCcar_List hadoopMap[DLF; . feed] collect[] count
# 100

# Create a B-Tree index on Licence
let dataSCcar_Licence_btree = dataSCcar createbtree[Licence];

let dataSCcar_Licence_btree_List = dataSCcar_List hadoopMap[ ; . createbtree[Licence] ];
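# A hedged sketch of an exact-match lookup through this index; the
# licence value "B-1000" is a made-up example, not taken from the data:
# query dataSCcar_Licence_btree dataSCcar exactmatch["B-1000"] count;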

# Create temporal R-Tree based on units' definition time
let dataSCcar_Journey_tmpuni =
  dataSCcar feed
  projectextend[Journey; TID: tupleid(.)]
  projectextendstream[TID; MBR: units(.Journey)
    use[fun(U: upoint) point2d(deftime(U)) ]]
  sortby[MBR asc]
  bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 38.8779sec / 35.24sec = 1.10323

let dataSCcar_Journey_tmpuni_List =
  dataSCcar_List hadoopMap[ ; . feed
    projectextend[Journey; TID: tupleid(.)]
    projectextendstream[TID; MBR: units(.Journey)
      use[fun(U: upoint) point2d(deftime(U)) ]]
    sortby[MBR asc] bulkloadrtree[MBR]
  ];
# Total runtime ... Times (elapsed / cpu): 34.7612sec / 0.04sec = 869.03
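# Note: point2d maps each unit's definition time [s, e] to the 2D point
# (s, e), so a query period [a, b] becomes a window query on this
# index: all points with s <= b and e >= a.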

# Create 2D spatial R-Tree based on units' bounding boxes
let dataSCcar_Journey_sptuni =
  dataSCcar feed
  projectextend[Journey; TID: tupleid(.)]
  projectextendstream[TID; MBR: units(.Journey)
    use[fun(U: upoint) bbox2d(U) ]]
  sortby[MBR asc] bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 45.2348sec / 42.89sec = 1.05467

let dataSCcar_Journey_sptuni_List =
  dataSCcar_List hadoopMap[ ; . feed
    projectextend[Journey; TID: tupleid(.)]
    projectextendstream[TID; MBR: units(.Journey)
      use[fun(U: upoint) bbox2d(U) ]]
    sortby[MBR asc] bulkloadrtree[MBR]
  ];
# Total runtime ... Times (elapsed / cpu): 34.7387sec / 0.06sec = 578.979

# Create 3D spatio-temporal R-Tree based on units' bounding boxes
let dataSCcar_Journey_sptmpuni =
  dataSCcar feed
  projectextend[Journey; TID: tupleid(.)]
  projectextendstream[TID; MBR: units(.Journey)
    use[fun(U: upoint) bbox(U) ]]
  sortby[MBR asc]
  bulkloadrtree[MBR];
# Total runtime ... Times (elapsed / cpu): 58.5181sec / 48.91sec = 1.19645

let dataSCcar_Journey_sptmpuni_List =
  dataSCcar_List hadoopMap[ ; . feed
    projectextend[Journey; TID: tupleid(.)]
    projectextendstream[TID; MBR: units(.Journey)
      use[fun(U: upoint) bbox(U) ]]
    sortby[MBR asc] bulkloadrtree[MBR]
  ];
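# A hedged sketch of probing these indexes: QUERY_BOX stands for a
# made-up rect3; windowintersectsS returns candidate tuple ids, which
# gettuples resolves against the base relation. The same pattern
# applies analogously to the 2D and temporal variants above:
# query dataSCcar_Journey_sptmpuni windowintersectsS[QUERY_BOX]
#   dataSCcar gettuples count;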

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Parameters for Global Cell-Grid #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

let Car_Grid_Box = dataSCcar_List hadoopMap[DLF; . feed
    projectextendstream[; Box: units(.Journey)
      use[fun(U: upoint) bbox(U)]]
    aggregateB[Box; fun(R1: rect3, R2: rect3)
      R1 union R2; [const rect3 value undef]]
    feed namedtransformstream[PartGrid]
  ] collect[]
  aggregateB[PartGrid; fun(GR1: rect3, GR2: rect3)
    GR1 union GR2; [const rect3 value undef]];
# rect: ( (-9606,-3563,2699) - (32842,27204,2721) )
# Total runtime ... Times (elapsed / cpu): 32.5842sec / 0.14sec = 232.744

#+++Verification+++ To verify the query result of the global grid
query dataScar feed
  projectextendstream[; UTrip: units(.Trip)]
  projectextend[; Box: bbox(.UTrip)]
  aggregateB[Box; fun(R1: rect3, R2: rect3)
    R1 union R2; [const rect3 value undef]];
# rect: ( (-9606,-3563,2699) - (32842,27204,2721) )
# Total runtime ... Times (elapsed / cpu): 26.6602sec / 22.88sec = 1.16522
## Here only the sequential cost of aggregating the 100 trajectories
## is compared to the parallel cost.

let CAR_WORLD_XSIZE = maxD(Car_Grid_Box, 1) - minD(Car_Grid_Box, 1);
let CAR_WORLD_YSIZE = maxD(Car_Grid_Box, 2) - minD(Car_Grid_Box, 2);
let CAR_WORLD_TSIZE = maxD(Car_Grid_Box, 3) - minD(Car_Grid_Box, 3);

let CAR_WORLD_MAXSIZE = getMaxVal(
  CAR_WORLD_XSIZE, CAR_WORLD_YSIZE, CAR_WORLD_TSIZE);

let CAR_WORLD_X_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_XSIZE;
let CAR_WORLD_Y_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_YSIZE;
let CAR_WORLD_T_SCALE = CAR_WORLD_MAXSIZE / CAR_WORLD_TSIZE;

let CAR_WORLD_SCALE_BOX = fun(R: rect3)
  scalerect(R, CAR_WORLD_X_SCALE, CAR_WORLD_Y_SCALE, CAR_WORLD_T_SCALE);

let CellNum = real2int(sqrt(int2real(dataScar count)));
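# With the head-100 sample, dataScar count = 100, so CellNum =
# sqrt(100) = 10 and the CellSize below is CAR_WORLD_MAXSIZE / 10.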
let CellSize = CAR_WORLD_MAXSIZE / CellNum;

let CAR_WORLD_GRID_LBP_X = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 1);
let CAR_WORLD_GRID_LBP_Y = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 2);
let CAR_WORLD_GRID_LBP_T = minD(CAR_WORLD_SCALE_BOX(Car_Grid_Box), 3);

let CAR_WORLD_GRID = createCellGrid3D(
  CAR_WORLD_GRID_LBP_X, CAR_WORLD_GRID_LBP_Y, CAR_WORLD_GRID_LBP_T,
  CellSize, CellSize, CellSize, CellNum, CellNum );
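# A hedged sketch of mapping a unit's box to grid cells: scale the box
# into the grid's world, then cellnumber enumerates the ids of all
# overlapping cells (the query form is assumed, not from the source):
# query dataScar feed head[1]
#   projectextendstream[; UTrip: units(.Trip)]
#   extendstream[Cell: cellnumber(
#     CAR_WORLD_SCALE_BOX(bbox(.UTrip)), CAR_WORLD_GRID)]
#   count;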

# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Prepare Query Samples #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

#********* QueryLicences *************************************#
#*************************************************************#
let LicenceList =
  dataScar feed
  project[Licence]
  addcounter[Id, 1]
  consume;

let LicenceList_Id = LicenceList createbtree[Id];

let QueryLicences =
  intstream(1, P_SAMPLESIZE) namedtransformstream[Id1]
  loopjoin[ LicenceList_Id LicenceList exactmatch[rng_intN(P_NUMCARS) + 1] ]
  projectextend[Licence; Id: .Id1]
  consume;
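# A hedged sketch of consuming a sample: look up each sampled licence
# through the B-Tree built above (the query form is assumed):
# query QueryLicences feed head[1]
#   loopsel[ dataSCcar_Licence_btree dataSCcar exactmatch[.Licence] ]
#   count;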

let QueryLicences_List = QueryLicences feed
  spread[;Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap["SQ_Licences"; . consume];

let QueryLicences_Top10_List = QueryLicences feed
  head[10]
  spread[;Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap["SQ_T10_Licences"; . consume];

let QueryLicences_STop10_Dup_List = QueryLicences feed
  head[20] filter[.Id > 10]
  spread[;Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap["SQ_ST10_Licences"; . consume];

#********* QueryPoints ***************************************#
#*************************************************************#
restore streets from 'streets.data';

let P_MINVELOCITY = 0.04166666666666666667;

let streets1 =
  streets feed
  filter[ .Vmax > P_MINVELOCITY ]
  addcounter[StreetId, 1]
  project[StreetId, Vmax, GeoData]
  consume;

let allstreets1 =
  streets1 feed
  projecttransformstream[GeoData] collect_line[TRUE];

let allstreets =
  components(allstreets1) transformstream
  extend[NoSeg: no_segments(.Elem)]
  sortby[NoSeg desc]
  extract[Elem];

let Crossings =
  ( streets1 feed {s1}
    streets1 feed {s2}
    spatialjoin[GeoData_s1, GeoData_s2]
    filter[.StreetId_s1 < .StreetId_s2]
    extend[Crossroads: crossings(.GeoData_s1, .GeoData_s2)]
    project[Crossroads]
    filter[not(isempty(.Crossroads))]
    aggregateB[Crossroads; fun(P1: points, P2: points)
      P1 union P2; [const points value ()]]
  )
  union
  ( streets1 feed
    projectextend[; B: boundary(.GeoData)]
    aggregateB[B; fun(P3: points, P4: points)
      P3 union P4; [const points value ()]]
  );

let sections2 =
  allstreets polylines[FALSE, Crossings]
  namedtransformstream[Part]
  consume;

let nodes2 =
  sections2 feed
  projectextend[; EndPoints: boundary(.Part)]
  aggregateB[EndPoints; fun(P1: points, P2: points)
    P1 union P2; [const points value ()]];

let QueryPoints =
  intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id]
  extend[Pos: get(nodes2, rng_intN(no_components(nodes2)) )]
  consume;

#********* QueryRegions **************************************#
#*************************************************************#

let QueryRegions =
  intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id]
  extend[Region: circle(
    get(nodes2, rng_intN(no_components(nodes2))),
    rng_intN(997) + 3.0,
    rng_intN(98) + 3)]
  consume;
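# circle(p, r, n) approximates a circle of radius r around p by an
# n-gon; here r falls in [3, 999] and n in [3, 100], so each sample
# region is a small random polygon around a random street node.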

#********* QueryPeriods **************************************#
#*************************************************************#

let QueryPeriods =
  intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id]
  extend[
    StartInstant: create_instant(P_STARTDAY, 0)
      + create_duration(rng_real() * P_NUMDAYS),
    Duration: create_duration(abs(rng_gaussian(1.0)))
  ]
  projectextend[Id; Period: theRange(
    .StartInstant, .StartInstant + .Duration,
    TRUE, TRUE)]
  consume;
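# Each sample period starts uniformly within the P_NUMDAYS simulation
# window and lasts abs(rng_gaussian(1.0)) days; theRange(..., TRUE,
# TRUE) includes both endpoints.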

#********* QueryInstants *************************************#
#*************************************************************#
let QueryInstants =
  intstream(1, P_SAMPLESIZE)
  namedtransformstream[Id]
  extend[Instant: create_instant(P_STARTDAY, 0)
    + create_duration(rng_real() * P_NUMDAYS)]
  consume;

close database;