# Listing metadata: 1048 lines, 39 KiB, plain text.
# Open database bm_05; every relation, index and flist used by the
# queries below is looked up in it.
open database bm_05;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 1 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Parallel Q1: each map task feeds its part of QueryLicences_List
# (presumably the query licences distributed over the slaves -- confirm),
# looks every licence up through the B-tree partition
# para(dataSCcar_Licence_btree_List) on para(dataSCcar_List), and keeps
# Licence and Model. collect[] gathers the partial results on the master.
let OBACRres001 =
  QueryLicences_List
  hadoopMap[ "Q1_Result", DLF; . feed {O} loopsel[
      para(dataSCcar_Licence_btree_List) para(dataSCcar_List)
      exactmatch[.Licence_O]] project[Licence,Model]
  ]
  collect[] consume;

# Total runtime ... Times (elapsed / cpu): 40.5199sec / 0.35sec = 115.771

#+++Verification+++
# Sequential reference for Q1 on the undistributed relation dataSCcar.
let SQ_OBACRres001 = QueryLicences feed {O}
  loopjoin[ dataSCcar_Licence_btree dataSCcar exactmatch[.Licence_O]]
  project[Licence, Model] consume;

# TRUE when no tuple of the sorted sequential result is missing from the
# sorted parallel result (mergediff = first stream minus second stream).
query SQ_OBACRres001 feed sort OBACRres001 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 2 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Parallel Q2: every map task selects the tuples of Type "passenger" from
# its partition of dataSCcar_List; collect[] gathers them on the master.
let OBACRres002 = dataSCcar_List
  hadoopMap[ "Q2_Result", DLF; . feed filter[.Type = "passenger"]
  ]
  collect[] consume;

#+++Verification+++
let SQ_OBACRres002 =
  dataSCcar feed filter [.Type = "passenger"] consume;

# NOTE(review): remove[SlaveID] assumes the partitions of dataSCcar_List
# carry an extra SlaveID attribute that dataSCcar lacks -- confirm.
query SQ_OBACRres002 feed sort OBACRres002 feed remove[SlaveID] sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 3 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Replicate the first 10 query instants to every slave: each instant is
# paired with every SID in 1..CLUSTER_SCALE, spread by SID, and
# materialized with consume in every partition (read later with feed).
let QueryInstants_Top10_Dup_List =
  QueryInstants feed {II} head[10]
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread[;SID, CLUSTER_SCALE, FALSE;]
  hadoopMap[; . consume];

# The created cell-grid object cannot be used here: a cellgrid3D value
# cannot be declared as a DELIVERABLE kind yet, and because the inner query
# only receives the value expression of the object, it cannot pass the
# type-mapping function. The grid parameters are therefore passed one by
# one through para(...).

# Parallel Q3 (unary reduce).
#  map:    look up the first 10 query licences, split each journey into
#          units, scale each unit's bounding box and emit one tuple per
#          grid cell returned by cellnumber;
#  reduce: per Cell, combine the units with the replicated query instants
#          and keep only the defined positions.
let OBACRres003 =
  QueryLicences feed head[10]
  spread["QueryLicences_TOP10",'';Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap[DLF; . loopjoin[ para(dataSCcar_Licence_btree_List)
      para(dataSCcar_List) exactmatch[.Licence] {LL}
      projectextendstream[Licence_LL; UTrip: units(.Journey_LL)]
      extend[Box: scalerect(bbox(.UTrip), para(CAR_WORLD_X_SCALE),
        para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
      extendstream[Cell: cellnumber(.Box,
        para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
        para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
        para(CellSize), para(CellNum), para(CellNum) )]]]
  hadoopReduce[Cell, "Q3_Result", DLF, REDUCE_SCALE
    ; . para(QueryInstants_Top10_Dup_List) feed product
    projectextend[; Licence: .Licence_LL, Instant: .Instant_II,
      Pos: val(.UTrip atinstant .Instant_II)]
    filter[isdefined(.Pos)] ]
  collect[]
  sort rdup consume;

# TODO: Cannot use the integrated CellGrid3D data type.
# TODO: Cannot use function.

# Comment: As a unit may be duplicated into several cells,
# the rdup operation is required at the end.

#+++Binary Query+++
# Same map phase as OBACRres003. The first 10 query instants are instead
# paired with SIDs 1..REDUCE_SCALE, spread by SID, and combined with the
# units in a binary reduce (hadoopReduce2) partitioned on Cell and SID.
let OBACRres003_2 =
  QueryLicences feed head[10]
  spread["QueryLicences_TOP10",'';Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap[DLF; . loopjoin[ para(dataSCcar_Licence_btree_List)
      para(dataSCcar_List) exactmatch[.Licence] {LL}
      projectextendstream[Licence_LL; UTrip: units(.Journey_LL)]
      extend[Box: scalerect(bbox(.UTrip), para(CAR_WORLD_X_SCALE),
        para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
      extendstream[Cell: cellnumber(.Box,
        para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
        para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
        para(CellSize), para(CellNum), para(CellNum) )]]]
  QueryInstants feed {II} head[10]
  intstream(1, REDUCE_SCALE) namedtransformstream[SID] product
  spread["QueryInstants_TOP10_Dup2",'';SID, REDUCE_SCALE, TRUE;]
  hadoopReduce2[Cell, SID, "Q3_2Result", DLF, REDUCE_SCALE
    ; . .. product
    projectextend[; Licence: .Licence_LL, Instant: .Instant_II,
      Pos: val(.UTrip atinstant .Instant_II)]
    filter[isdefined(.Pos)] ]
  collect[] sort rdup consume;

# Total runtime ... Times (elapsed / cpu): 1:55min (115.372sec) /0.68sec = 169.664
#+++Verification+++
# Sequential reference for Q3. Both parallel variants (OBACRres003 and
# OBACRres003_2) keep only defined positions (filter[isdefined(.Pos)]).
# The same filter is applied here. Without it, licence/instant pairs at
# which the journey is undefined would appear only in this result. They
# would then remain in the mergediff below, and count = 0 could not hold.
let SQ_OBACRres003 =
  QueryLicences feed {LL} head[10]
  loopjoin[dataSCcar_Licence_btree dataSCcar exactmatch[.Licence_LL]]
  QueryInstants feed {II} head[10]
  product
  projectextend[; Licence: .Licence_LL, Instant: .Instant_II,
    Pos: val(.Journey atinstant .Instant_II)]
  filter[isdefined(.Pos)]
  consume;

query SQ_OBACRres003 feed sort OBACRres003 feed sort mergediff count = 0;

query SQ_OBACRres003 feed sort OBACRres003_2 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 4 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Parallel Q4: replicate QueryPoints to every slave (product with SID,
# spread by SID). Each map task probes para(dataSCcar_Journey_sptuni_List)
# with the point's bounding box (windowintersectsS), fetches the car tuples
# (gettuples), and keeps the cars whose journey passes the point. Duplicate
# (Pos, Licence) pairs are removed per task with krdup.
let OBACRres004 =
  QueryPoints feed
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID] product
  spread["QueryPoints_Dup",'';SID, CLUSTER_SCALE, FALSE;]
  hadoopMap["Q4_Result", DLF; .
    loopjoin[ para(dataSCcar_Journey_sptuni_List)
      windowintersectsS[bbox(.Pos)] sort rdup
      para(dataSCcar_List) gettuples]
    filter[.Journey passes .Pos] project[Pos, Licence]
    sortby[Pos, Licence] krdup[Pos, Licence]
  ] collect[] consume;
# Total runtime ... Times (elapsed / cpu): 43.8071sec / 0.97sec = 45.162

#+++Verification+++
let SQ_OBACRres004 =
  QueryPoints feed
  loopjoin[ dataSCcar_Journey_sptuni
    windowintersectsS[bbox(.Pos)] sort rdup
    dataSCcar gettuples]
  filter[.Journey passes .Pos] project[Pos, Licence]
  sortby[Pos, Licence] krdup[Pos, Licence] consume;
# Total runtime ... Times (elapsed / cpu): 2.50229sec / 2.5sec = 1.00092

query SQ_OBACRres004 feed sort OBACRres004 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 5 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Parallel Q5 (binary reduce). Two map jobs each build one simplified
# trajectory per car and assign its scaled 2D bounding box to grid cells:
#  - the first job handles the first 10 tuples of QueryLicences;
#  - the second handles the tuples among the first 20 with Id > 10.
# The reduce forms every pair of trajectories from the two sides that share
# a Cell and computes their distance. Pairs sharing several cells are
# deduplicated by sort rdup.
let OBACRres005 =
  QueryLicences feed head[10]
  spread["Q5QL_1TO10",'';Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap[DLF; . loopsel[
      para(dataSCcar_Licence_btree_List) para(dataSCcar_List)
      exactmatch[.Licence] ]
    projectextend[Licence; Traj: simplify(trajectory(.Journey),0.000001)]
    extend[Box: scalerect(bbox(.Traj),
      para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE))]
    projectextendstream[Licence, Box, Traj
      ;Cell: cellnumber(.Box,
      para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
      para(CellSize), para(CellSize), para(CellNum)) ] ]
  QueryLicences feed head[20] filter[.Id>10]
  spread["Q5QL_11TO20",'';Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap[DLF; . loopsel[
      para(dataSCcar_Licence_btree_List) para(dataSCcar_List)
      exactmatch[.Licence] ]
    projectextend[Licence; Traj: simplify(trajectory(.Journey),0.000001)]
    extend[Box: scalerect(bbox(.Traj),
      para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE))]
    projectextendstream[Licence, Box, Traj
      ;Cell: cellnumber(.Box,
      para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
      para(CellSize), para(CellSize), para(CellNum)) ] ]
  hadoopReduce2[Cell, Cell, DLF, REDUCE_SCALE
    ; . {V1} .. {V2} product
    projectextend[; Licence1: .Licence_V1, Licence2: .Licence_V2,
      Dist: distance(.Traj_V1, .Traj_V2) ] sort rdup]
  collect[] sort rdup consume;

# Total runtime ... Times (elapsed / cpu): 2:26min (146.035sec) /0.59sec = 247.517

# Comment: Sort by Cell and join inside cells, or join over the whole task.
# Comment: The product operator above pairs trajectories only within a cell.
#   Should they be partitioned by cells at all? At least the product should
#   not be restricted to cells, because some trajectory pairs may never share
#   a cell; such pairs are missing from OBACRres005, so the mergediff check
#   below fails whenever they exist. Otherwise the join pairs themselves
#   would have to be distributed.
# Comment: It is also impossible to compute the product first and then
#   distribute the pairs to the slaves, since the moving objects are already
#   distributed across the cluster.

#+++Verification+++
# Sequential references: the same two trajectory sets, built from dataSCcar.
let SQ_OBACRres005tmp1 =
  QueryLicences feed head[10]
  loopsel[ dataSCcar_Licence_btree dataSCcar
    exactmatch[.Licence] ]
  projectextend[Licence; Traj:
    simplify(trajectory(.Journey),0.000001)] consume;

let SQ_OBACRres005tmp2 =
  QueryLicences feed head[20] filter[.Id>10]
  loopsel[ dataSCcar_Licence_btree dataSCcar
    exactmatch[.Licence] ]
  projectextend[Licence; Traj:
    simplify(trajectory(.Journey),0.000001)] consume;

# Full product of both sets, i.e. every pair, regardless of grid cells.
let SQ_OBACRres005 =
  SQ_OBACRres005tmp1 feed {V1} SQ_OBACRres005tmp2 feed {V2}
  product
  projectextend[ ; Licence1: .Licence_V1,
    Licence2: .Licence_V2, Dist: distance(.Traj_V1, .Traj_V2)]
  sort rdup consume;

# Total runtime ... Times (elapsed / cpu): 9.40725sec / 9.4sec = 1.00077

query SQ_OBACRres005 feed sort OBACRres005 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 6 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Map phase of Q6: split the journey of every car of Type "truck" in
# dataSCcar_List into units, scale each unit's bounding box, and emit one
# tuple per grid cell returned by cellnumber.
let OBACRres006tmp1 = dataSCcar_List
  hadoopMap[DLF; . feed filter[.Type = "truck"]
    extendstream[UTrip: units(.Journey)]
    extend[Box: scalerect(bbox(.UTrip),
      para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
    projectextendstream[Licence, Box, UTrip
      ;Cell: cellnumber(.Box,
      para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
      para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
      para(CellSize), para(CellNum), para(CellNum)) ] ];

# Reduce phase of Q6: self-join the truck units cell by cell (parajoin2 on
# Cell, then realJoinMMRTreeVec on the scaled boxes). The filter keeps:
#  - pairs with Licence_V1 < Licence_V2;
#  - pairs accepted by gridintersects for .Cell_V1 (presumably reporting a
#    pair only in one common cell -- TODO confirm);
#  - pairs that are sometimes within distance 10.0.
let OBACRres006 = OBACRres006tmp1 OBACRres006tmp1
  hadoopReduce2[Cell, Cell, DLF, REDUCE_SCALE
    ; . sortby[Cell] {V1} .. sortby[Cell] {V2}
    parajoin2[ Cell_V1, Cell_V2; . ..
      realJoinMMRTreeVec[Box_V1, Box_V2, 10, 20]
      filter[(.Licence_V1 < .Licence_V2)
        and gridintersects(
          para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
          para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
          para(CellSize), para(CellNum), para(CellNum),
          .Box_V1, .Box_V2, .Cell_V1)
        and sometimes(distance(.UTrip_V1,.UTrip_V2) <= 10.0) ]
      projectextend[; Licence1: .Licence_V1, Licence2: .Licence_V2] ]
    sort rdup]
  collect[] sort rdup consume;

# Comment: The rdup operator is required in both the partial (reduce) and
# the central query: duplicates can be removed for units, but not for whole
# moving objects.

#+++Verification+++
let SQ_OBACRres006 =
  dataSCcar feed {V1} filter[.Type_V1 = "truck"]
  dataSCcar feed {V2} filter[.Type_V2 = "truck"]
  symmjoin[.Licence_V1 < ..Licence_V2]
  filter[ minimum(distance(.Journey_V1, .Journey_V2)) <= 10.0 ]
  projectextend[ ; Licence1: .Licence_V1,
    Licence2: .Licence_V2 ]
  consume;

# Total runtime ... Times (elapsed / cpu): 11.4168sec / 11.4sec = 1.00147

# Open question: why are many duplicate results still produced?

query SQ_OBACRres006 feed sort OBACRres006 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 7 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Step 1 of parallel Q7: for every query point, find the earliest instant at
# which a passenger car reaches it.
#  - Each map task computes a per-slave minimum (SlaveFirstTime).
#  - The master merges these into FirstTime.
# NOTE(review): reuses the spread name "QueryPoints_Dup" from Q4 on the same
# QueryPoints x SID product, here without the '' path argument.
let OBACRres007PointMinInst =
  QueryPoints feed intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread["QueryPoints_Dup";SID, CLUSTER_SCALE, FALSE;]
  hadoopMap[ DLF; .
    loopjoin[ para(dataSCcar_Journey_sptuni_List)
      windowintersectsS[bbox(.Pos)]
      sort rdup para(dataSCcar_List) gettuples ]
    filter[.Type = "passenger"]
    projectextend[Pos ; Instant: inst(initial(.Journey at .Pos)) ]
    filter[not(isempty(.Instant))]
    sortby[Pos asc, Instant asc]
    groupby[Pos; SlaveFirstTime: group feed min[Instant] ]
  ] collect[]
  sortby[Pos] groupby[Pos; FirstTime: group feed min[SlaveFirstTime]]
  consume;

# Step 2: replicate (Pos, FirstTime) to every slave and report the
# passenger cars that reach each point no later than FirstTime. Candidates
# come from the index para(dataSCcar_Journey_sptmpuni_List), probed with the
# box built from the point and its FirstTime.
let OBACRres007 =
  OBACRres007PointMinInst feed
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread["OBACRres007PointMinInst_Dup",'';SID, CLUSTER_SCALE, FALSE;]
  hadoopMap[ "Q7_Result", DLF; . extend[MBR: box3d(bbox(.Pos),.FirstTime) ]
    loopjoin[ para(dataSCcar_Journey_sptmpuni_List)
      windowintersectsS[.MBR]
      sort rdup para(dataSCcar_List) gettuples ]
    filter[.Type = "passenger"] filter[.Journey passes .Pos]
    projectextend[Licence, FirstTime,
      Pos ; Instant: inst(initial(.Journey at .Pos))]
    filter[.Instant <= .FirstTime]
    project[ Pos, Licence ]
  ]
  collect[]
  consume;
# Total runtime ... Times (elapsed / cpu): 46.9647sec / 0.38sec = 123.591

#+++Verification+++
let SQ_OBACRres007PointMinInst = QueryPoints feed project[Pos]
  loopjoin[ dataSCcar_Journey_sptuni
    windowintersectsS[bbox(.Pos)]
    sort rdup dataSCcar gettuples ]
  filter[.Type = "passenger"]
  projectextend[Pos ; Instant: inst(initial(.Journey at .Pos)) ]
  filter[not(isempty(.Instant))]
  sortby[Pos asc, Instant asc]
  groupby[Pos; FirstTime: group feed min[Instant] ]
  consume;

let SQ_OBACRres007 =
  SQ_OBACRres007PointMinInst feed
  extend[MBR: box3d(bbox(.Pos),.FirstTime) ]
  loopjoin[ dataSCcar_Journey_sptmpuni
    windowintersectsS[.MBR]
    sort rdup dataSCcar gettuples ]
  filter[.Type = "passenger"] filter[.Journey passes .Pos]
  projectextend[Licence, FirstTime,
    Pos ; Instant: inst(initial(.Journey at .Pos))]
  filter[.Instant <= .FirstTime]
  project[ Pos, Licence ]
  consume;
# Total runtime ... Times (elapsed / cpu): 1.59655sec / 1.54sec = 1.03672

query SQ_OBACRres007 feed sort OBACRres007 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 8 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Replicate the first 10 query periods to every slave (product with SID,
# spread by SID). The result is not consumed into relations, so the reduce
# below reads it with para(...) directly, without feed.
let QueryPeriods_Top10_Dup_List =
  QueryPeriods feed head[10]
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID] product
  spread["QueryPeriods_TOP10_Dup",'';SID, CLUSTER_SCALE, FALSE;];

# Parallel Q8 (unary reduce).
#  map:    split the journeys of the first 10 query licences into units and
#          assign them to grid cells;
#  reduce: restrict every unit to every query period and compute its length
#          rounded to 3 digits.
# UTrip is kept in the reduce output so that sort rdup can drop copies of
# the same unit produced by different cells before groupby2 sums UDist per
# (Licence, Period).
# NOTE(review): the argument order here is [Cell, DLF, "Q8_Result", ...],
# while Q3 uses [Cell, "Q3_Result", DLF, ...] -- confirm which order the
# hadoopReduce type mapping expects.
let OBACRres008 =
  QueryLicences feed {LL} head[10]
  spread[;Licence_LL, CLUSTER_SCALE, TRUE;]
  hadoopMap[DLF; . loopsel[
      para(dataSCcar_Licence_btree_List) para(dataSCcar_List)
      exactmatch[.Licence_LL] ]
    projectextendstream[Licence; UTrip: units(.Journey)]
    extend[Box: scalerect(bbox(.UTrip), para(CAR_WORLD_X_SCALE),
      para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
    extendstream[Cell: cellnumber(.Box,
      para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
      para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
      para(CellSize), para(CellNum), para(CellNum) )] ]
  hadoopReduce[Cell, DLF, "Q8_Result", REDUCE_SCALE
    ; . para(QueryPeriods_Top10_Dup_List) {PP} product
    projectextendstream[Licence, Period_PP, UTrip
      ; UPTrip: .UTrip atperiods .Period_PP]
    extend[UDist: round(length(.UPTrip), 3)]
    projectextend[Licence, UTrip, UDist;Period:.Period_PP]]
  collect[] sort rdup
  groupby2[Licence, Period; Dist: fun(t2:TUPLE, d2:real) d2 + attr(t2, UDist)::0.0]
  consume;

# Total runtime ... Times (elapsed / cpu): 2:01min (120.76sec) /14.67sec = 8.23179

#+++Binary Query+++
# Same map phase as OBACRres008. The query periods are instead paired with
# SIDs 1..REDUCE_SCALE, spread by SID, and combined through hadoopReduce2
# partitioned on Cell and SID.
let OBACRres008_2 =
  QueryLicences feed {LL} head[10]
  spread[;Licence_LL, CLUSTER_SCALE, TRUE;]
  hadoopMap[DLF; . loopsel[
      para(dataSCcar_Licence_btree_List) para(dataSCcar_List)
      exactmatch[.Licence_LL] ]
    projectextendstream[Licence; UTrip: units(.Journey)]
    extend[Box: scalerect(bbox(.UTrip), para(CAR_WORLD_X_SCALE),
      para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
    extendstream[Cell: cellnumber(.Box,
      para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
      para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
      para(CellSize), para(CellNum), para(CellNum) )] ]
  QueryPeriods feed head[10]
  intstream(1, REDUCE_SCALE) namedtransformstream[SID] product
  spread["QueryPeriods_TOP10_Dup2",'';SID, REDUCE_SCALE, TRUE;]
  hadoopReduce2[ Cell, SID, DLF, REDUCE_SCALE
    ; . .. {PP} product
    projectextendstream[Licence, Period_PP, UTrip
      ; UPTrip: .UTrip atperiods .Period_PP]
    extend[UDist: round(length(.UPTrip), 3)]
    projectextend[Licence, UTrip, UDist;Period:.Period_PP] ]
  collect[] sort rdup
  groupby2[Licence, Period; Dist: fun(t2:TUPLE, d2:real) d2 + attr(t2, UDist)::0.0]
  consume;

# Total runtime ... Times (elapsed / cpu): 1:55min (115.348sec) /1.83sec = 63.0316

#+++Verification+++
let SQ_OBACRres008 =
  QueryLicences feed {LL} head[10]
  loopsel[ dataSCcar_Licence_btree dataSCcar exactmatch[.Licence_LL] ]
  QueryPeriods feed project[Period] head[10] {PP}
  product
  projectextend[Licence; Period: .Period_PP,
    Dist: round(length(.Journey atperiods .Period_PP),3)]
  project[Licence, Period, Dist] consume;

query SQ_OBACRres008 feed sort OBACRres008 feed sort mergediff count = 0;

query SQ_OBACRres008 feed sort OBACRres008_2 feed sort mergediff count = 0;

# Comment: Slight differences in the distance occur because of rounding
# precision: the parallel queries sum per-unit rounded lengths, while the
# sequential query rounds the whole length once. The two queries below
# therefore only count pairs whose distances differ by more than 1.

query
  SQ_OBACRres008 feed {r} OBACRres008 feed {s}
  symmjoin[ (.Licence_r = ..Licence_s) and (.Period_r = ..Period_s)
    and (.Dist_r # ..Dist_s) ]
  filter[ abs(.Dist_r - .Dist_s) > 1]
  count = 0;

query
  SQ_OBACRres008 feed {r} OBACRres008_2 feed {s}
  symmjoin[ (.Licence_r = ..Licence_s) and (.Period_r = ..Period_s)
    and (.Dist_r # ..Dist_s) ]
  filter[ abs(.Dist_r - .Dist_s) > 1]
  count = 0;

query OBACRres008 feed sort OBACRres008_2 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 9 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Parallel Q9: replicate QueryPeriods to every slave.
#  - Each map task computes, per period, the maximum journey length within
#    that period over its partition of dataSCcar_List (SubDist).
#  - The master takes the maximum of the per-slave maxima.
let OBACRres009 =
  QueryPeriods feed
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID] product
  spread["QueryPeriods_Dup",'';SID, CLUSTER_SCALE, FALSE;]
  hadoopMap["Q9_Result", DLF; . {PP}
    para(dataSCcar_List) feed project[Journey] {V1} product
    projectextend[Id_PP ; Period: .Period_PP,
      D: length(.Journey_V1 atperiods .Period_PP)]
    sortby[Id_PP, Period, D desc]
    groupby[Id_PP, Period; SubDist: round(group feed max[D],3) ]
    project[Id_PP, Period, SubDist]
  ] collect[]
  sortby [Id_PP, Period, SubDist desc]
  groupby[Id_PP, Period; Dist: round(group feed max[SubDist],3) ]
  project[Period, Dist]
  consume;

# Total runtime ... Times (elapsed / cpu): 51.497sec / 0.38sec = 135.518

#+++Verification+++
let SQ_OBACRres009 =
  dataSCcar feed project[Journey] {V1}
  QueryPeriods feed {PP}
  product
  projectextend[Id_PP ; Period: .Period_PP, D:
    length(.Journey_V1 atperiods .Period_PP)]
  sortby[Id_PP, Period, D desc]
  groupby[ Id_PP, Period; Dist: round(group feed max[D],3) ]
  project[Period, Dist]
  consume;

# Total runtime ... Times (elapsed / cpu): 1:46min (106.337sec) /106.3sec = 1.00035

query SQ_OBACRres009 feed sort OBACRres009 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 10 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Map side 1 of Q10: units of the cars of the first 10 query licences,
# each assigned to the grid cells returned by cellnumber.
let OBACRres010tmp1 =
  QueryLicences feed head[10]
  spread["Q10QL_1TO10",'';Licence, CLUSTER_SCALE, TRUE;]
  hadoopMap[DLF; . loopsel[
      para(dataSCcar_Licence_btree_List) para(dataSCcar_List)
      exactmatch[.Licence] ]
    extendstream[UTrip: units(.Journey)]
    extend[Box: scalerect(bbox(.UTrip),
      para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
    projectextendstream[Licence, Box, UTrip
      ;Cell: cellnumber(.Box,
      para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
      para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
      para(CellSize), para(CellNum), para(CellNum)) ] ];

# Map side 2 of Q10: units of all cars in dataSCcar_List, assigned to
# grid cells in the same way.
let OBACRres010tmp2 =
  dataSCcar_List
  hadoopMap[DLF; . feed
    extendstream[UTrip: units(.Journey)]
    extend[Box: scalerect(bbox(.UTrip),
      para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
    projectextendstream[Licence, Box, UTrip
      ;Cell: cellnumber(.Box,
      para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
      para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
      para(CellSize), para(CellNum), para(CellNum)) ] ];

# Reduce of Q10: join both unit sets cell by cell and keep the pairs of
# different cars that are sometimes closer than 3.0.
#  - DPos is the query car's unit restricted to the times when that holds.
#  - The master concatenates the DPos pieces per (QueryLicence,
#    OtherLicence) with concatS.
# NOTE(review): the argument order here is [Cell, Cell, REDUCE_SCALE,
# "Q10_Result", DLF], while Q5/Q6 use [Cell, Cell, DLF, REDUCE_SCALE] --
# confirm which order the hadoopReduce2 type mapping expects.
let OBACRres010 =
  OBACRres010tmp1 OBACRres010tmp2
  hadoopReduce2[Cell, Cell, REDUCE_SCALE, "Q10_Result", DLF
    ; . sortby[Cell] {V1} .. sortby[Cell] {V2}
    parajoin2[ Cell_V1, Cell_V2; . ..
      realJoinMMRTreeVec[Box_V1, Box_V2, 10, 20]
      filter[(.Licence_V1 # .Licence_V2)
        and gridintersects(
          para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
          para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
          para(CellSize), para(CellNum), para(CellNum),
          .Box_V1, .Box_V2, .Cell_V1)
        and sometimes(distance(.UTrip_V1,.UTrip_V2) < 3.0) ]
      projectextend[; QueryLicence: .Licence_V1, OtherLicence: .Licence_V2,
        DPos: (.UTrip_V1 atperiods
          deftime((distance(.UTrip_V1,.UTrip_V2) < 3.0) the_mvalue at TRUE )) the_mvalue]
      filter[not(isempty(deftime(.DPos)))]
      project[QueryLicence, OtherLicence, DPos] ]]
  collect[] sortby[QueryLicence, OtherLicence]
  groupby[QueryLicence, OtherLicence
    ; Pos: group feed project[DPos] sort transformstream concatS]
  consume;

# Comment: The parallel results differ slightly from the sequential results,
# because the join is evaluated on upoints (units) instead of whole mpoints.
# Comment: Merging the three queries above into one query gives a wrong
# result; the reason has not been found yet.
# Comment: This query is worth parallelizing, as the sequential query is
# CPU-intensive.

#+++Verification+++
let SQ_OBACRres010 =
  QueryLicences feed head[10]
  loopsel[ dataSCcar_Licence_btree dataSCcar exactmatch[.Licence]
    project[Licence, Journey] ] {V1}
  dataSCcar feed project[Licence, Journey] {V2}
  symmjoin[(.Licence_V1 # ..Licence_V2) ]
  filter[ (everNearerThan(.Journey_V1, .Journey_V2, 3.0)) ]
  projectextend[; QueryLicence: .Licence_V1,
    OtherLicence: .Licence_V2,
    Pos: .Journey_V1 atperiods deftime((distance(.Journey_V1,
      .Journey_V2) < 3.0) at TRUE) ]
  filter[not(isempty(deftime(.Pos)))]
  project[QueryLicence, OtherLicence, Pos]
  sort rdup
  consume;

# Total runtime ... Times (elapsed / cpu): 5:43min (342.972sec) /342.77sec = 1.00059

query SQ_OBACRres010 feed sort OBACRres010 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 11 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Parallel Q11: replicate all (point, instant) pairs of the first 10 query
# points and instants to every slave. Each map task:
#  - probes para(dataSCcar_Journey_sptmpuni_List) with the box built from
#    the point and the instant;
#  - fetches the car tuples;
#  - keeps the cars whose position at that instant is closer than 0.5 to the
#    point.
let OBACRres011 =
  QueryPoints feed head[10] project[Pos] {PP}
  QueryInstants feed head[10] project[Instant] {II}
  product
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread["Query_Points_Instants_top10_dup",''; SID, CLUSTER_SCALE, FALSE;]
  hadoopMap[ "Q11_Result", DLF; .
    loopjoin[ para(dataSCcar_Journey_sptmpuni_List)
      windowintersectsS[box3d(bbox(.Pos_PP), .Instant_II)]
      sort rdup ]
    para(dataSCcar_List) gettuples
    projectextend[Licence, Pos_PP, Instant_II; XPos:
      val(.Journey atinstant .Instant_II) ]
    filter[not(isempty(.XPos))]
    filter[distance(.XPos,.Pos_PP) < 0.5]
    projectextend[Licence; Pos: .Pos_PP, Instant: .Instant_II]
    sort rdup
  ] collect[]
  consume;

# Total runtime ... Times (elapsed / cpu): 45.7292sec / 0.56sec = 81.6593

#+++Verification+++
let SQ_OBACRres011 =
  QueryPoints feed head[10] project[Pos] {PP}
  QueryInstants feed head[10] project[Instant] {II}
  product
  loopjoin[ dataSCcar_Journey_sptmpuni
    windowintersectsS[box3d(bbox(.Pos_PP), .Instant_II)]
    sort rdup ]
  dataSCcar gettuples
  projectextend[Licence, Pos_PP, Instant_II; XPos:
    val(.Journey atinstant .Instant_II) ]
  filter[not(isempty(.XPos))]
  filter[distance(.XPos,.Pos_PP) < 0.5]
  projectextend[Licence; Pos: .Pos_PP, Instant: .Instant_II]
  sort rdup
  consume;

# Total runtime ... Times (elapsed / cpu): 0.122817sec / 0.05sec = 2.45634

query SQ_OBACRres011 feed sort OBACRres011 feed sort mergediff count = 0;

######## Comments
# Both results contain no tuples.
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 12 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Merge the first 10 query instants into one periods value (aggregateB with
# union) and replicate it to every slave. Each partition keeps the periods
# value extracted from its tuple (DLO).
let OBACRres012allInstants_List =
  QueryInstants feed head[10]
  extend[Period: theRange(.Instant, .Instant, TRUE, TRUE)]
  aggregateB[Period; fun(I1: periods, I2:periods)
    I1 union I2; [const periods value ()]
  ] feed namedtransformstream[Period]
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread["OBACRres012allInstants_dup"; SID, CLUSTER_SCALE, FALSE;]
  hadoopMap[DLO; . extract[Period]];

# Replicate the first 10 query points (Pos only) to every slave as relations.
let QueryPoints_Top10_List =
  QueryPoints feed head[10] project[Pos]
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread["QueryPoints_Top10_dup"; SID, CLUSTER_SCALE, FALSE;]
  hadoopMap[DLO; . consume];

# Replicate the first 10 query instants (Instant only) to every slave as
# relations.
let QueryInstants_Top10_List =
  QueryInstants feed head[10] project[Instant]
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread["QueryInstants_Top10_dup"; SID, CLUSTER_SCALE, FALSE;]
  hadoopMap[DLO; . consume];

# Map phase of Q12. For every replicated query point:
#  - find the cars whose journey, restricted to the query instants, passes
#    the point;
#  - restrict that journey to the point (at .Pos), as the sequential
#    reference SQ_OBACRres012 does;
#  - split it into units and assign each unit to the 3D grid cells returned
#    by cellnumber.
# Restricting to the point ensures that a position match in the reduce step
# is a meeting at the query point (reported as Pos_V2), not anywhere else.
let OBACRres012tmp1 =
  QueryPoints_Top10_List
  hadoopMap[DLF
    ; . feed loopjoin[ para(dataSCcar_Journey_sptuni_List)
      windowintersectsS[bbox(.Pos)] sort rdup
      para(dataSCcar_List) gettuples
      projectextend[Licence; Journey: .Journey atperiods para(OBACRres012allInstants_List)] ]
    filter[.Journey passes .Pos]
    projectextendstream[Licence, Pos; UTrip: units(.Journey at .Pos)]
    extend[Box: scalerect(bbox(.UTrip),
      para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
    extendstream[ Cell: cellnumber( .Box,
      para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y), para(CAR_WORLD_GRID_LBP_T),
      para(CellSize), para(CellSize), para(CellSize), para(CellNum), para(CellNum) )] ];

# Reduce phase of Q12: join unit pairs of different cars
# (Licence_V1 < Licence_V2) cell by cell, then keep the pairs whose
# positions coincide at one of the replicated query instants.
# NOTE(review): the argument order here is [Cell, Cell, REDUCE_SCALE, DLF],
# while Q5/Q6 use [Cell, Cell, DLF, REDUCE_SCALE] -- confirm which order the
# hadoopReduce2 type mapping expects.
let OBACRres012 =
  OBACRres012tmp1 OBACRres012tmp1
  hadoopReduce2[ Cell, Cell, REDUCE_SCALE, DLF
    ; . sortby[Cell] {V1} .. sortby[Cell] {V2}
    parajoin2[Cell_V1, Cell_V2; . ..
      realJoinMMRTreeVec[Box_V1, Box_V2, 10, 20]
      filter[(.Licence_V1 < .Licence_V2)
        and gridintersects(
          para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
          para(CAR_WORLD_GRID_LBP_T), para(CellSize), para(CellSize),
          para(CellSize), para(CellNum), para(CellNum),
          .Box_V1, .Box_V2, .Cell_V1) ]]
    para(QueryInstants_Top10_List) feed
    symmjoin[val(.UTrip_V1 atinstant ..Instant)
      = val(.UTrip_V2 atinstant ..Instant)]
    projectextend[ Pos_V2, Instant; Licence1: .Licence_V1,
      Licence2: .Licence_V2]
    sort rdup ]
  collect[] consume;

# Comment: Correctness cannot be confirmed yet, as both the parallel and the
# sequential query return 0 tuples.
#+++Verification+++
# Sequential reference for Q12. First merge the first 10 query instants
# into one periods value (aggregateB with union).
let SQ_OBACRres012allInstants =
  QueryInstants feed head[10]
  extend[Period: theRange(.Instant, .Instant, TRUE, TRUE)]
  aggregateB[Period; fun(I1: periods, I2:periods)
    I1 union I2; [const periods value ()]
  ];

# Restrict every candidate journey to those instants and to the query point,
# self-join on Licence_V1 < Licence_V2, and keep the pairs that have equal
# positions at one of the first 10 query instants.
let SQ_OBACRres012 =
  QueryPoints feed head[10] project[Pos]
  loopjoin[ dataSCcar_Journey_sptuni
    windowintersectsS[bbox(.Pos)]
    sort rdup dataSCcar gettuples
    projectextend[Licence
      ; Journey: .Journey atperiods SQ_OBACRres012allInstants]
  ]
  filter[.Journey passes .Pos]
  projectextend[Licence, Pos; Journey: .Journey at .Pos] {V1}
  QueryPoints feed head[10] project[Pos]
  loopjoin[ dataSCcar_Journey_sptuni
    windowintersectsS[bbox(.Pos)]
    sort rdup dataSCcar gettuples
    projectextend[Licence
      ; Journey: .Journey atperiods SQ_OBACRres012allInstants]
  ]
  filter[.Journey passes .Pos]
  projectextend[Licence, Pos; Journey: .Journey at .Pos] {V2}
  symmjoin[.Licence_V1 < ..Licence_V2]
  QueryInstants feed head[10]
  symmjoin[val(.Journey_V1 atinstant ..Instant)
    = val(.Journey_V2 atinstant ..Instant)]
  projectextend[ Pos_V2, Instant; Licence1: .Licence_V1,
    Licence2: .Licence_V2]
  sort rdup
  consume;

# The helper periods object is only needed to build SQ_OBACRres012.
delete SQ_OBACRres012allInstants;

query SQ_OBACRres012 feed sort OBACRres012 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 13 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Parallel Q13: replicate all (region, period) pairs of the first 10
# non-empty query regions and periods to every slave. For each pair, every
# map task:
#  - probes para(dataSCcar_Journey_sptmpuni_List) with the region/period box;
#  - keeps the cars whose journey, restricted to the period, passes the
#    region;
#  - removes duplicate (Id_RR, Id_PP, Licence) triples with krdup.
let OBACRres013 =
  QueryRegions feed head[10] filter[not(isempty(.Region))] {RR}
  QueryPeriods feed head[10] filter[not(isempty(.Period))] {PP}
  product
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread["Query_Region_Period_top10_dup",''; SID, CLUSTER_SCALE, FALSE;]
  hadoopMap["Q13_Result", DLF; .
    loopsel [ fun(t:TUPLE) para(dataSCcar_Journey_sptmpuni_List)
      windowintersectsS[box3d(bbox(attr(t,Region_RR)), attr(t,Period_PP))]
      sort rdup para(dataSCcar_List) gettuples
      filter[(.Journey atperiods attr(t,Period_PP)) passes attr(t,Region_RR) ]
      projectextend[Licence; Region: attr(t,Region_RR),
        Period: attr(t,Period_PP), Id_RR: attr(t,Id_RR), Id_PP: attr(t,Id_PP)] ]
    sortby[Id_RR, Id_PP, Licence] krdup[Id_RR, Id_PP, Licence]
    project[Region, Period, Licence] ]
  collect[]
  consume;

# Total runtime ... Times (elapsed / cpu): 49.246sec / 0.6sec = 82.0767

#+++Verification+++
let SQ_OBACRres013 =
  QueryRegions feed head[10] filter[not(isempty(.Region))] {RR}
  QueryPeriods feed head[10] filter[not(isempty(.Period))] {PP}
  product
  loopsel [ fun(t:TUPLE)
    dataSCcar_Journey_sptmpuni
    windowintersectsS[box3d(bbox(attr(t,Region_RR)), attr(t,Period_PP))]
    sort rdup dataSCcar gettuples
    filter[(.Journey atperiods attr(t,Period_PP)) passes attr(t,Region_RR) ]
    projectextend[Licence; Region: attr(t,Region_RR),
      Period: attr(t,Period_PP), Id_RR: attr(t,Id_RR),
      Id_PP: attr(t,Id_PP)]
  ]
  sortby[Id_RR, Id_PP, Licence] krdup[Id_RR, Id_PP, Licence]
  project[Region, Period, Licence]
  consume;

# Total runtime ... Times (elapsed / cpu): 9.34449sec / 9.34sec = 1.00048

query SQ_OBACRres013 feed sort OBACRres013 feed sort mergediff count = 0;
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
# Q # 14 #
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #

# Parallel Q14: replicate all (region, instant) pairs of the first 10
# non-empty query regions and instants to every slave. For each pair, every
# map task:
#  - probes para(dataSCcar_Journey_sptmpuni_List) with the region/instant
#    box;
#  - keeps the cars whose position at the instant lies inside the region;
#  - removes duplicate (Id_RR, Id_II, Licence) triples with krdup.
let OBACRres014 =
  QueryRegions feed head[10] filter[not(isempty(.Region))] {RR}
  QueryInstants feed head[10] filter[not(isempty(.Instant))] {II}
  product
  intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
  product
  spread["Query_Region_Instant_top10_dup", ''; SID, CLUSTER_SCALE, FALSE;]
  hadoopMap["Q14_Result", DLF; .
    loopsel [ fun(t:TUPLE) para(dataSCcar_Journey_sptmpuni_List)
      windowintersectsS[box3d( bbox(attr(t,Region_RR)), attr(t,Instant_II))]
      sort rdup para(dataSCcar_List) gettuples
      filter[val(.Journey atinstant attr(t,Instant_II)) inside attr(t,Region_RR) ]
      projectextend[Licence; Region: attr(t,Region_RR),
        Instant: attr(t,Instant_II), Id_RR: attr(t,Id_RR),
        Id_II: attr(t,Id_II)] ]
    sortby[Id_RR, Id_II, Licence] krdup[Id_RR, Id_II, Licence]
    project[Region, Instant, Licence] ]
  collect[]
  consume;

# Total runtime ... Times (elapsed / cpu): 40.9629sec / 0.37sec = 110.711
#+++Verification+++
|
|
# Sequential reference result for Q14, used to verify OBACRres014.
# The reference must run on the same inputs as the query it checks.
# OBACRres014 (and the Q13 verification) only consider non-empty regions and
# instants from the first 10 of each, so the same filters are applied here.
# This also avoids feeding the undefined bbox of an empty region into the
# window query.
# For each (region, instant) pair, the pipeline does the following:
#   - Probe the unit index with the box (region bbox x instant).
#   - Fetch the candidate cars.
#   - Keep the cars whose position at the instant lies inside the region.
#   - Deduplicate per (Id_RR, Id_II, Licence).
let SQ_OBACRres014 =
|
|
QueryRegions feed head[10] filter[not(isempty(.Region))] {RR}
|
|
QueryInstants feed head[10] filter[not(isempty(.Instant))] {II} product
|
|
loopsel [ fun(t:TUPLE) dataSCcar_Journey_sptmpuni
|
|
windowintersectsS[box3d(bbox(attr(t,Region_RR)),attr(t,Instant_II))]
|
|
sort rdup dataSCcar gettuples
|
|
filter[val(.Journey atinstant attr(t,Instant_II)) inside attr(t,Region_RR) ]
|
|
projectextend[Licence; Region: attr(t,Region_RR),
|
|
Instant: attr(t,Instant_II), Id_RR: attr(t,Id_RR),
|
|
Id_II: attr(t,Id_II)] ]
|
|
sortby[Id_RR, Id_II, Licence] krdup[Id_RR, Id_II, Licence]
|
|
project[Region, Instant, Licence] consume;
|
|
|
|
# Total runtime ... Times (elapsed / cpu): 0.369704sec / 0.21sec = 1.7605
|
|
|
|
# Verification of Q14: the sequential and parallel results must be equal.
# mergediff only yields the tuples of its first (sorted) input that are
# missing from the second, so the difference is checked in both directions.
query (SQ_OBACRres014 feed sort OBACRres014 feed sort mergediff count = 0)
  and (OBACRres014 feed sort SQ_OBACRres014 feed sort mergediff count = 0);
|
|
|
|
|
|
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
# Q # 15 #
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
|
|
# Q15 (parallel): for each pair of the first 10 non-empty query points and
# the first 10 non-empty query periods, list the licences of the cars whose
# journey, restricted to the period, passes the point.
# The 100 (point, period) pairs are replicated to every slave: they are
# tagged with every SID in 1..CLUSTER_SCALE and spread by SID.
# hadoopMap then runs the per-slave query on the local index and car
# partitions (para(...)):
#   - window query with the box (point bbox x period),
#   - fetch the candidate cars,
#   - filter with atperiods + passes,
#   - deduplicate per (Id_PO, Id_PR, Licence).
# collect[] gathers the per-slave results.
let OBACRres015 =
|
|
QueryPoints feed head[10] filter[not(isempty(.Pos))] {PO}
|
|
QueryPeriods feed head[10] filter[not(isempty(.Period))] {PR}
|
|
product
|
|
intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
|
|
product
|
|
spread["Query_Point_Period_top10_dup",''; SID, CLUSTER_SCALE, FALSE;]
|
|
hadoopMap["Q15_Result", DLF; .
|
|
loopsel [ fun(t:TUPLE) para(dataSCcar_Journey_sptmpuni_List)
|
|
windowintersectsS[box3d(bbox(attr(t,Pos_PO)),attr(t,Period_PR))]
|
|
sort rdup para(dataSCcar_List) gettuples
|
|
filter[(.Journey atperiods attr(t,Period_PR)) passes attr(t,Pos_PO) ]
|
|
projectextend[Licence; Point: attr(t,Pos_PO),
|
|
Period: attr(t,Period_PR), Id_PO: attr(t,Id_PO), Id_PR: attr(t,Id_PR)] ]
|
|
sortby[Id_PO, Id_PR, Licence] krdup[Id_PO, Id_PR, Licence]
|
|
project[Point, Period, Licence]
|
|
]
|
|
collect[]
|
|
consume;
|
|
|
|
# Total runtime ... Times (elapsed / cpu): 45.4479sec / 0.37sec = 122.832
|
|
|
|
|
|
|
|
#+++Verification+++
|
|
# Sequential reference result for Q15, used to verify OBACRres015.
# The reference applies the same non-empty filters as OBACRres015 to the
# first 10 query points and periods, so both sides are computed from
# identical inputs. This also keeps an undefined bbox (empty point) out of
# the window query.
# For each (point, period) pair, the pipeline does the following:
#   - Probe the unit index with the box (point bbox x period).
#   - Fetch the candidate cars.
#   - Keep the cars whose journey restricted to the period passes the point.
#   - Deduplicate per (Id_PO, Id_PR, Licence).
let SQ_OBACRres015 =
|
|
QueryPoints feed head[10] filter[not(isempty(.Pos))] {PO}
|
|
QueryPeriods feed head[10] filter[not(isempty(.Period))] {PR} product
|
|
loopsel [ fun(t:TUPLE) dataSCcar_Journey_sptmpuni
|
|
windowintersectsS[ box3d(bbox(attr(t,Pos_PO)),attr(t,Period_PR)) ]
|
|
sort rdup dataSCcar gettuples
|
|
filter[(.Journey atperiods attr(t,Period_PR)) passes attr(t,Pos_PO) ]
|
|
projectextend[Licence; Point: attr(t,Pos_PO),
|
|
Period: attr(t,Period_PR), Id_PO: attr(t,Id_PO), Id_PR: attr(t,Id_PR)] ]
|
|
sortby[Id_PO, Id_PR, Licence] krdup[Id_PO, Id_PR, Licence]
|
|
project[Point, Period, Licence]
|
|
consume;
|
|
|
|
# Total runtime ... Times (elapsed / cpu): 3.58079sec / 3.44sec = 1.04093
|
|
|
|
|
|
# Verification of Q15: the sequential and parallel results must be equal.
# mergediff only yields the tuples of its first (sorted) input that are
# missing from the second, so the difference is checked in both directions.
query (SQ_OBACRres015 feed sort OBACRres015 feed sort mergediff count = 0)
  and (OBACRres015 feed sort SQ_OBACRres015 feed sort mergediff count = 0);
|
|
|
|
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
# Q # 16 #
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
|
|
# Replicates the 100 pairs of the first 10 query periods and the first 10
# query regions to every slave, for use by the Q16 candidate queries.
# Each pair is tagged with every SID in 1..CLUSTER_SCALE and spread by SID
# into "Query_Period_Region_top10_dup". hadoopMap[; . consume] then stores
# the local copy as a relation on each slave.
# Unlike Q13-Q15, empty periods/regions are not filtered out here.
let Query_Period_Region_DupList =
|
|
QueryPeriods feed head[10] {PP} QueryRegions feed head[10] {RR}
|
|
product
|
|
intstream(1, CLUSTER_SCALE) namedtransformstream[SID]
|
|
product
|
|
spread["Query_Period_Region_top10_dup"; SID, CLUSTER_SCALE, FALSE;]
|
|
hadoopMap[; . consume];
|
|
|
|
# Partition the journey units (upoints) by grid layer (level) instead of by individual cell
|
|
# Q16 (parallel), stage 1a: journey units of the cars with the first 10
# licences, restricted to each (period, region) pair and tagged with a grid
# Layer number.
# The licences are spread by Licence. TRUE keeps the attribute, which the
# exactmatch below reads. On each slave, the pipeline does the following:
#   - Look up the car tuples in the local licence index
#     (para(dataSCcar_Licence_btree_List)) with exactmatch.
#   - Combine them with every (period, region) pair of the local copy of
#     Query_Period_Region_DupList (product).
#   - Restrict the journey to the period (atperiods), then to the region
#     (at), and drop empty results (no_components > 0).
#   - Split the remaining journey into its units (one UTrip per tuple).
#   - Scale each unit's bbox with the CAR_WORLD_*_SCALE factors and assign
#     Layer via cellnumber.
# extendstream emits one tuple per cell number that cellnumber returns for
# the box.
# NOTE(review): cellnumber's last two arguments are 1, 1. This presumably
# makes the grid a single cell in x and y, so that Layer only separates time
# slices. Confirm against the cellnumber documentation.
# The result is a distributed (DLF) list consumed by OBACRres016.
let OBACRres016Candidates1_List =
|
|
QueryLicences feed head[10]
|
|
spread["Q16QL_1TO10",'';Licence, CLUSTER_SCALE, TRUE;]
|
|
hadoopMap[DLF; . loopsel[
|
|
para(dataSCcar_Licence_btree_List) para(dataSCcar_List)
|
|
exactmatch[.Licence] ]
|
|
para(Query_Period_Region_DupList) feed
|
|
product
|
|
projectextend[Licence, Region_RR, Period_PP, Id_RR, Id_PP
|
|
; Journey: (.Journey atperiods .Period_PP) at .Region_RR]
|
|
filter[no_components(.Journey) > 0]
|
|
projectextendstream[Licence, Region_RR, Period_PP, Id_RR, Id_PP
|
|
; UTrip: units(.Journey)]
|
|
extend[Box: scalerect(bbox(.UTrip),
|
|
para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
|
|
extendstream[ Layer: cellnumber(.Box,
|
|
para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
|
|
para(CAR_WORLD_GRID_LBP_T),
|
|
para(CAR_WORLD_MAXSIZE), para(CAR_WORLD_MAXSIZE), para(CellSize),
|
|
1, 1) ] ];
|
|
|
|
|
|
# Q16 (parallel), stage 1b: same pipeline as OBACRres016Candidates1_List,
# but for the licences among the first 20 whose Id is greater than 10
# (presumably Ids 11..20).
# On each slave, the pipeline does the following:
#   - Look up the cars via the local licence index.
#   - Combine them with every replicated (period, region) pair.
#   - Restrict the journey to the period and then to the region, and drop
#     empty results.
#   - Split the journey into units (UTrip).
#   - Tag each unit with a Layer computed by cellnumber on its scaled bbox.
# The result is a distributed (DLF) list consumed by OBACRres016.
let OBACRres016Candidates2_List =
|
|
QueryLicences feed head[20] filter[.Id > 10]
|
|
spread["Q16QL_11TO20",'';Licence, CLUSTER_SCALE, TRUE;]
|
|
hadoopMap[DLF; . loopsel[
|
|
para(dataSCcar_Licence_btree_List) para(dataSCcar_List)
|
|
exactmatch[.Licence] ]
|
|
para(Query_Period_Region_DupList) feed
|
|
product
|
|
projectextend[Licence, Region_RR, Period_PP, Id_RR, Id_PP
|
|
; Journey: (.Journey atperiods .Period_PP) at .Region_RR]
|
|
filter[no_components(.Journey) > 0]
|
|
projectextendstream[Licence, Region_RR, Period_PP, Id_RR, Id_PP
|
|
; UTrip: units(.Journey)]
|
|
extend[Box: scalerect(bbox(.UTrip),
|
|
para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE), para(CAR_WORLD_T_SCALE))]
|
|
extendstream[ Layer: cellnumber(.Box,
|
|
para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
|
|
para(CAR_WORLD_GRID_LBP_T),
|
|
para(CAR_WORLD_MAXSIZE), para(CAR_WORLD_MAXSIZE), para(CellSize),
|
|
1, 1) ] ];
|
|
|
|
|
|
# Q16 (parallel), stage 2: pairs of cars (one from each candidate list)
# whose restricted journeys share a region id and a period id, filtered on a
# unit-level distance test with tolerance 0.1.
# hadoopReduce2 repartitions both candidate lists by Layer over REDUCE_SCALE
# reduce tasks. Each task sorts both inputs by Layer, and parajoin2 joins
# the tuples that share a Layer. Within a Layer group:
#   - symmjoin pairs units with Licence_C1 < Licence_C2 and equal Id_RR and
#     Id_PP,
#   - the filter keeps unit pairs that are not sometimes closer than 0.1.
# Rows are renamed, deduplicated per (Id_RR, Id_PP, Licence1, Licence2) and
# collected into one relation.
# NOTE(review): the distance test is evaluated per unit pair, not per
# journey pair. A licence pair is therefore reported as soon as ANY of its
# unit pairs in a layer passes the filter, even if other units of the same
# two cars do come closer than 0.1. This presumably includes unit pairs
# whose time intervals do not overlap.
# The sequential verification SQ_OBACRres016 applies everNearerThan to the
# whole restricted journeys instead, so the two results can differ.
# A faithful parallel plan would subtract the pairs found to be near from
# the set of all candidate pairs. Confirm before relying on this result.
let OBACRres016 = OBACRres016Candidates1_List OBACRres016Candidates2_List
|
|
hadoopReduce2[ Layer, Layer, REDUCE_SCALE, DLF
|
|
; . sortby[Layer] {C1} .. sortby[Layer] {C2}
|
|
parajoin2[Layer_C1, Layer_C2
|
|
; . .. symmjoin[ (.Licence_C1 < ..Licence_C2)
|
|
and (.Id_RR_C1 = ..Id_RR_C2) and (.Id_PP_C1 = ..Id_PP_C2) ]
|
|
filter[ not(sometimes(distance(.UTrip_C1,.UTrip_C2) < 0.1))] ]
|
|
projectextend[; Licence1: .Licence_C1, Licence2: .Licence_C2,
|
|
Region: .Region_RR_C1, Period: .Period_PP_C1,
|
|
Id_RR: .Id_RR_C1, Id_PP: .Id_PP_C1 ]
|
|
sortby[Id_RR, Id_PP, Licence1, Licence2]
|
|
krdup[Id_RR, Id_PP, Licence1, Licence2]
|
|
project[Region, Period, Licence1, Licence2] ]
|
|
collect[] consume;
|
|
|
|
# Total runtime ... Times (elapsed / cpu): 45.8293sec / 0.12sec = 381.911
|
|
|
|
# Comment: The gridintersects operator cannot be used here,
|
|
# because the units' bounding boxes do not intersect each other.
|
|
|
|
#+++Verification+++
|
|
# Sequential counterpart of OBACRres016Candidates1_List: the cars of the
# first 10 licences, looked up via dataSCcar_Licence_btree. Each car is
# combined with every pair of the first 10 query periods and the first 10
# query regions.
# The journey is restricted to the period and then the region; rows with an
# empty restricted journey are dropped.
# Unlike the parallel version, the journey is kept whole (not split into
# units).
let SQ_OBACRres016Candidates1 =
|
|
QueryLicences feed head[10]
|
|
loopsel[ fun(t:TUPLE)
|
|
dataSCcar_Licence_btree dataSCcar exactmatch[attr(t,Licence)]]
|
|
QueryPeriods feed head[10] {PP} QueryRegions feed head[10] {RR} product
|
|
product
|
|
projectextend[Licence, Region_RR, Period_PP, Id_RR, Id_PP
|
|
; Journey: (.Journey atperiods .Period_PP) at .Region_RR]
|
|
filter[no_components(.Journey) > 0] consume;
|
|
|
|
# Sequential counterpart of OBACRres016Candidates2_List: the same as
# SQ_OBACRres016Candidates1, but for the licences among the first 20 whose
# Id is greater than 10.
# The journey is restricted to each (period, region) pair, and rows with an
# empty restricted journey are dropped. The journey is kept whole.
let SQ_OBACRres016Candidates2 =
|
|
QueryLicences feed head[20] filter[.Id > 10]
|
|
loopsel[ fun(t:TUPLE)
|
|
dataSCcar_Licence_btree dataSCcar exactmatch[attr(t,Licence)]]
|
|
QueryPeriods feed head[10] {PP} QueryRegions feed head[10] {RR} product
|
|
product
|
|
projectextend[Licence, Region_RR, Period_PP, Id_RR, Id_PP
|
|
; Journey: (.Journey atperiods .Period_PP) at .Region_RR]
|
|
filter[no_components(.Journey) > 0] consume;
|
|
|
|
### Increased Tolerance to 0.1m
|
|
# Sequential reference result for Q16, used to verify OBACRres016.
# symmjoin pairs the two candidate sets on Licence_C1 < Licence_C2 and on
# equal region and period ids.
# The filter keeps the pairs whose restricted journeys are never nearer than
# 0.1 to each other (everNearerThan over the whole journeys).
# Rows are then renamed, deduplicated per (Id_RR, Id_PP, Licence1,
# Licence2), and projected to [Region, Period, Licence1, Licence2].
let SQ_OBACRres016 =
|
|
SQ_OBACRres016Candidates1 feed {C1}
|
|
SQ_OBACRres016Candidates2 feed {C2}
|
|
symmjoin[ (.Licence_C1 < ..Licence_C2)
|
|
and (.Id_RR_C1 = ..Id_RR_C2)
|
|
and (.Id_PP_C1 = ..Id_PP_C2)
|
|
]
|
|
filter[ not(everNearerThan(.Journey_C1, .Journey_C2, 0.1)) ]
|
|
projectextend[; Licence1: .Licence_C1,
|
|
Licence2: .Licence_C2, Region: .Region_RR_C1,
|
|
Period: .Period_PP_C1, Id_RR: .Id_RR_C1,
|
|
Id_PP: .Id_PP_C1 ]
|
|
sortby[Id_RR, Id_PP, Licence1, Licence2]
|
|
krdup[Id_RR, Id_PP, Licence1, Licence2]
|
|
project[Region, Period, Licence1, Licence2] consume;
|
|
|
|
# Verification of Q16: the sequential and parallel results must be equal.
# mergediff only yields the tuples of its first (sorted) input that are
# missing from the second, so the difference is checked in both directions.
# A one-directional check would accept a parallel result that contains
# extra pairs.
query (SQ_OBACRres016 feed sort OBACRres016 feed sort mergediff count = 0)
  and (OBACRres016 feed sort SQ_OBACRres016 feed sort mergediff count = 0);
|
|
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
# Q # 17 #
|
|
# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # #
|
|
|
|
# Q17 (parallel), stage 1: for every query point, the number of distinct
# (Licence, Pos) hits by cars whose journey passes the point.
# All query points are replicated to every slave (product with the SIDs,
# spread by SID).
# Map phase, on each slave:
#   - loopjoin probes the local spatial unit index
#     (para(dataSCcar_Journey_sptuni_List)) with the point's bbox.
#   - Candidate cars are fetched, and those whose journey passes the point
#     are kept.
#   - Each (Licence, Pos) row gets the scaled point box and a grid Cell
#     from cellnumber.
# Reduce phase: hadoopReduce repartitions by Cell over REDUCE_SCALE tasks.
# Each task groups its rows by Pos and counts the distinct rows of each
# group (rdup count) as Hits.
# The result is stored under the name "OBACRres017PosCount".
# NOTE(review): if a point's box maps to more than one cell, its rows may be
# split across reduce tasks and counted separately. Confirm that this
# cannot happen for point boxes.
let OBACRres017PosCount_List =
|
|
QueryPoints feed project[Pos] {PP}
|
|
intstream(1, CLUSTER_SCALE) namedtransformstream[SID] product
|
|
spread["Query_Point_dup",''; SID, CLUSTER_SCALE, FALSE;]
|
|
hadoopMap[DLF
|
|
; . loopjoin[ fun(t:TUPLE) para(dataSCcar_Journey_sptuni_List)
|
|
windowintersectsS[bbox(attr(t,Pos_PP))] sort rdup
|
|
para(dataSCcar_List) gettuples filter[.Journey passes attr(t,Pos_PP)]
|
|
project[Licence] ]
|
|
projectextend[Licence; Pos: .Pos_PP]
|
|
extend[Box: scalerect(bbox(.Pos),
|
|
para(CAR_WORLD_X_SCALE), para(CAR_WORLD_Y_SCALE))]
|
|
extendstream[ Cell: cellnumber( .Box,
|
|
para(CAR_WORLD_GRID_LBP_X), para(CAR_WORLD_GRID_LBP_Y),
|
|
para(CellSize), para(CellSize), para(CellNum) )] ]
|
|
hadoopReduce[Cell, DLF, "OBACRres017PosCount", REDUCE_SCALE
|
|
; . sortby[Pos asc, Licence asc]
|
|
groupby[Pos; Hits: group feed rdup count]
|
|
];
|
|
|
|
# Q17 (parallel), stage 2: the overall maximum Hits value.
# Each map task computes the maximum Hits of its partition and wraps it in a
# one-attribute stream (DisMaxHits). collect[] gathers these per-partition
# maxima, and max[DisMaxHits] reduces them to a single value.
let OBACRres017PosMaxCount =
|
|
OBACRres017PosCount_List
|
|
hadoopMap["OBACRres017PosMaxCount", DLF
|
|
; . max[Hits] feed namedtransformstream[DisMaxHits]
|
|
]
|
|
collect[] max[DisMaxHits];
|
|
|
|
|
|
# Q17 (parallel), stage 3: the query points with the maximum number of hits.
# Each map task keeps the rows whose Hits equals OBACRres017PosMaxCount
# (referenced on the slaves via para(...)) and projects them to
# [Pos, Hits]. collect[] gathers the rows into one relation.
let OBACRres017 =
|
|
OBACRres017PosCount_List
|
|
hadoopMap["OBACRres017", DLF
|
|
; . filter[.Hits = para(OBACRres017PosMaxCount)]
|
|
project[Pos, Hits]
|
|
]
|
|
collect[] consume;
|
|
|
|
#+++Verification+++
|
|
# Sequential counterpart of OBACRres017PosCount_List. For each query point,
# the pipeline does the following:
#   - Probe the spatial unit index dataSCcar_Journey_sptuni with the point's
#     bbox.
#   - Fetch the candidate cars and keep those whose journey passes the point.
#   - Group by Pos and count the distinct (Licence, Pos) rows as Hits.
let SQ_OBACRres017PosCount = QueryPoints feed project[Pos] {PP}
|
|
loopjoin[ fun(t:TUPLE)
|
|
dataSCcar_Journey_sptuni
|
|
windowintersectsS[bbox(attr(t,Pos_PP))]
|
|
sort rdup dataSCcar gettuples
|
|
filter[.Journey passes attr(t,Pos_PP)]
|
|
project[Licence]
|
|
]
|
|
projectextend[Licence; Pos: .Pos_PP]
|
|
sortby[Pos asc, Licence asc]
|
|
groupby[Pos; Hits: group feed rdup count] consume;
|
|
# Total runtime ... Times (elapsed / cpu): 2.68406sec / 2.6sec = 1.03233
|
|
|
|
# Sequential reference result for Q17: the points whose Hits equals the
# maximum Hits in SQ_OBACRres017PosCount, projected to [Pos, Hits].
# NOTE(review): the max subquery is written inside the filter predicate and
# may be re-evaluated for every tuple. This is cheap only while
# SQ_OBACRres017PosCount stays small.
let SQ_OBACRres017 =
|
|
SQ_OBACRres017PosCount feed
|
|
filter[.Hits = (SQ_OBACRres017PosCount feed max[Hits])]
|
|
project[Pos, Hits] consume;
|
|
# Total runtime ... Times (elapsed / cpu): 0.117674sec / 0.02sec = 5.8837
|
|
|
|
# Verification of Q17: the sequential and parallel results must be equal.
# mergediff only yields the tuples of its first (sorted) input that are
# missing from the second, so the difference is checked in both directions.
query (SQ_OBACRres017 feed sort OBACRres017 feed sort mergediff count = 0)
  and (OBACRres017 feed sort SQ_OBACRres017 feed sort mergediff count = 0);
|
|
|
|
|
|
# Close the database bm_05 opened at the start of this script.
close database; |