Files
secondo/bin/Scripts/OrderedRelationGraphFromFullOSMImportParallelOnly.SEC

452 lines
13 KiB
Plaintext
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
###########################################################################
#
# Construction of a Road Network from OpenStreetMap Data in OrderedRelation
# Graph Representation
#
# Uses NestedRelationAlgebra
#
# -> adapt the filename of the osm-file to be imported.
#
# running times and result sizes refer to arnsberg-regbez.osm, using
# GlobalMemory = 2 GB
#
###########################################################################
# Database hombruch
# we have 12 data servers and 36 cores
let CLUSTER_SIZE = 12
let PS_SCALE = 36
let OSM_FilePath = 'arnsberg-regbez.osm';
# Q1
# query fullosmimport(OSM_FilePath, "City")
# 155 secs, 194
# relation CityNodes with 5008465 tuples stored
# relation CityNodeTags with 389898 tuples stored
# relation CityWays with 6419349 tuples stored
# relation CityWayTags with 2193352 tuples stored
# relation CityRelations with 355347 tuples stored
# relation CityRelationTags with 52448 tuples stored
# Import the data in parallel
let SubFileName = "PartCity";
let RelPrefix = "City";
query divide_osm(OSM_FilePath, SubFileName, CLUSTER_SIZE, RelPrefix);
# 47 secs
query spreadFiles(SubFileName, '', CLUSTER_SIZE);
# 10 secs
let MapAllRel = hadoopMapAll(fullosmimport(totext(SubFileName), RelPrefix)) consume;
# 82 secs
# The distribution of above map job result
let CityDistribution = MapAllRel feed remove[Success, Result] consume;
let CityNodes_Ini_dlo = CityNodes_type createFList["CityNodes", CityDistribution, DLO, TRUE];
let CityNodeTags_Ini_dlo = CityNodeTags_type createFList["CityNodeTags", CityDistribution, DLO, TRUE];
let CityWays_Ini_dlo = CityWays_type createFList["CityWays", CityDistribution, DLO, TRUE];
let CityWayTags_Ini_dlo = CityWayTags_type createFList["CityWayTags", CityDistribution, DLO, TRUE];
let CityRelations_Ini_dlo = CityRelations_type createFList["CityRelations", CityDistribution, DLO, TRUE];
let CityRelationTags_Ini_dlo = CityRelationTags_type createFList["CityRelationTags", CityDistribution, DLO, TRUE];
# Q2 parallel
# Use the approach with a regular spatial grid
let CityNodes_NodeId_flist = CityNodes_Ini_dlo hadoopReduce[NodeId, DLF, CLUSTER_SIZE; . feed ]
# 63s
# let CityNodes = CityNodes_NodeId_flist collect[] consume;
# let CityNodesSample = CityNodes sample[1000; 0.00000001] consume
let CityNodesSample = CityNodes_Ini_dlo hadoopMap[DLF, TRUE; . sample[100; 0.00000001]] collect[] consume;
let CityAreaScaled = CityNodesSample feed
extend[Easting: .Lon * 1000000, Northing: .Lat * 1000000]
extend[Box: rectangle2(.Easting, .Easting, .Northing, .Northing)]
aggregateB[Box; fun(r1: rect, r2:rect) r1 union r2; [const rect value undef] ]
# we will define a 30 by 30 grid over this area. The resulting 900 cells will
# be mapped to 36 tasks, so each task gets 25 cells, to obtain an even distribution.
let NO_COLUMNS = 30
let CELL_SIZE_X = (maxD(CityAreaScaled, 1) - minD(CityAreaScaled, 1)) / NO_COLUMNS
let CELL_SIZE_Y = (maxD(CityAreaScaled, 2) - minD(CityAreaScaled, 2)) / NO_COLUMNS
let GRID_CityArea = createCellGrid2D(minD(CityAreaScaled, 1), minD(CityAreaScaled, 2),
CELL_SIZE_X, CELL_SIZE_Y, NO_COLUMNS)
# let CityNodes_NodeId_flist = CityNodes feed spread[; NodeId, CLUSTER_SIZE, TRUE;]
let CityNodesNew_NodeId_dlo = CityNodes_NodeId_flist
hadoopMap[DLF, FALSE; . extend[Easting: .Lon * 1000000, Northing: .Lat * 1000000]
extend[Box: rectangle2(.Easting, .Easting, .Northing, .Northing)]
extendstream[Cell: cellnumber(.Box, GRID_CityArea)]
extend[Task: .Cell mod PS_SCALE] ]
hadoopReduce[Task, DLF, PS_SCALE; . sortby[Box] addcounter[No, 1]
projectextend[NodeId; Pos: makepoint(.Lon, .Lat),
NodeIdNew: (.Task * 10000000) + .No] ]
hadoopReduce[NodeId, DLO, CLUSTER_SIZE; . consume]
# Q3 parallel
# let CityWays_NodeRef_flist = CityWays feed spread[; NodeRef, CLUSTER_SIZE, TRUE;]
# 32 secs
let CityWays_NodeRef_flist = CityWays_Ini_dlo hadoopReduce[NodeRef, DLF, CLUSTER_SIZE; . feed ]
# 60s
# let CityWayTags_WayIdInTag_flist = CityWayTags feed spread[; WayIdInTag, CLUSTER_SIZE, TRUE;]
# 28 secs
let CityWayTags_WayIdInTag_flist = CityWayTags_Ini_dlo hadoopReduce[WayIdInTag, DLF, CLUSTER_SIZE; . feed ]
# 62s
let Ways_WayId_dlo = CityNodesNew_NodeId_dlo
CityWays_NodeRef_flist
hadoopReduce2[NodeId, NodeRef, DLF, PS_SCALE; . feed .. itHashJoin[NodeId, NodeRef] ]
CityWayTags_WayIdInTag_flist
hadoopReduce2[WayId, WayIdInTag, DLO, CLUSTER_SIZE; . sortby[WayId, NodeCounter]
nest[WayId; NodeList]
extend[Curve: .NodeList afeed projecttransformstream[Pos] collect_line[TRUE]]
.. nest[WayIdInTag; WayInfo]
itHashJoin[WayId, WayIdInTag]
extend[Box: bbox(.Curve) scale[1000000.0]]
sortby[Box]
remove[Box]
consume ]
# 3:20min (200.108sec)
# check the result:
# query Ways_WayId_dlo hadoopMap[DLF; . feed count feed transformstream] collect[] consume
# 34 secs
# even distribution
# query Ways_WayId_dlo hadoopMap[DLF; . feed count feed transformstream] collect[] sum[Elem]
# 39 secs
# 752888 correct
# query Ways_WayId_dlo hadoopMap[DLF; . feed project[WayId, Curve] head[500] ]
# collect[] consume
# 38 secs
# 6000 Ways appear in Javagui
# Q4 parallel
let Ways_WayId_Curve_rtree_dlo = Ways_WayId_dlo
hadoopMap[DLO; . feed addid projectextend[TID; Box: bbox(.Curve)] bulkloadrtree[Box] ]
# 41.93 secs
# query Ways feed addid projectextend[TID; Box: bbox(.Curve)] bulkloadrtree[Box]
# query nodes(Ways feed addid projectextend[TID; Box: bbox(.Curve)] bulkloadrtree[Box]) consume
# Q5 parallel
let Roads_WayId_dlo = Ways_WayId_dlo hadoopMap[DLO; . feed filter[.WayInfo afeed filter[.WayTagKey = "highway"] count > 0] consume ]
# 54.72 secs
# check the result:
# query Roads_WayId_dlo hadoopMap[DLF; . feed count feed transformstream] collect[] consume
# ok
# query Roads_WayId_dlo hadoopMap[DLF; . feed count feed transformstream] collect[] sum[Elem]
# 242556 correct
# Q6 parallel
let Roads_WayId_Curve_rtree_dlo = Roads_WayId_dlo
hadoopMap[DLO; . feed addid projectextend[TID; Box: bbox(.Curve)] bulkloadrtree[Box] ]
# 33.52 secs
# Q7 parallel
let RoadsWayId_WayId_flist = Roads_WayId_dlo hadoopMap[DLF; . feed project[WayId] ]
# 34.78 secs
let Nodes_WayId_flist =
CityWays_NodeRef_flist
CityWays_NodeRef_flist
hadoopReduce2[NodeRef, NodeRef, DLF, PS_SCALE; . .. {h2}
itHashJoin[NodeRef, NodeRef_h2] filter[.WayId # .WayId_h2] ]
CityNodesNew_NodeId_dlo
hadoopReduce2[NodeRef, NodeId, DLF, PS_SCALE; . .. feed itHashJoin[NodeRef, NodeId] ]
RoadsWayId_WayId_flist
hadoopReduce2[WayId, WayId, DLF, PS_SCALE; . .. {r1} itHashJoin[WayId, WayId_r1] ]
RoadsWayId_WayId_flist
hadoopReduce2[WayId_h2, WayId, DLF, PS_SCALE; . .. {r2}
itHashJoin[WayId_h2, WayId_r2]
project[WayId, NodeCounter, NodeIdNew, Pos] ]
Roads_WayId_dlo
hadoopMap[DLF, FALSE; . feed
projectextend[WayId; Node: .NodeList afeed filter[.NodeCounter = 0] aconsume]
unnest[Node]
project[WayId, NodeCounter, NodeIdNew, Pos] ]
hadoopReduce2[WayId, WayId, DLF, PS_SCALE; . .. concat ]
Roads_WayId_dlo
hadoopMap[DLF, FALSE; . feed extend[HighNodeNo: (.NodeList afeed count) - 1]
projectextend[WayId; Node: fun(t: TUPLE)
attr(t, NodeList) afeed filter[.NodeCounter = attr(t, HighNodeNo)] aconsume]
unnest[Node]
project[WayId, NodeCounter, NodeIdNew, Pos] ]
hadoopReduce2[WayId, WayId, DLF, PS_SCALE; . .. concat
sortby[WayId, NodeCounter]
rdup ]
# 5:45 mins (344 secs)
# 722693 correct
# query Nodes_WayId_flist hadoopMap[DLF; . count feed transformstream] collect[] sum[Elem]
# Q8 parallel
# Split the above query
let Nodes_WayId_dlo = Nodes_WayId_flist
hadoopReduce[WayId, DLO, CLUSTER_SIZE; . nest[WayId; SectionNodes]
projectextend[WayId; Sections: .SectionNodes afeed
extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0,
SourcePos: ..Pos::[const point value undef],
TargetPos: .Pos::[const point value undef],
SourceNodeCounter: ..NodeCounter::0,
TargetNodeCounter: .NodeCounter::0]
filter[.Source # 0]
project[Source, Target, SourcePos, TargetPos,
SourceNodeCounter, TargetNodeCounter]
aconsume] consume]
# 1:27min (86.9874sec)
let EdgesUp_WayId_flist = Nodes_WayId_dlo
hadoopMap[DLF, TRUE; . feed
para(Roads_WayId_dlo) feed {r}
itHashJoin[WayId, WayId_r]
projectextend[WayId; Sections: fun(t:TUPLE)
attr(t, Sections) afeed
extend[Curve: fun(u: TUPLE)
attr(t, NodeList_r) afeed
filter[.NodeCounter_r between[attr(u, SourceNodeCounter),
attr(u, TargetNodeCounter)] ]
projecttransformstream[Pos_r]
collect_sline[TRUE],
RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"] extract
[WayTagValue_r],
RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"] extract
[WayTagValue_r],
MaxSpeed: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "maxspeed"] extract
[WayTagValue_r]
]
aconsume ]
unnest[Sections] ]
# 49.78 secs
# 480137 correct
# query EdgesUp_WayId_flist hadoopMap[DLF; . count feed transformstream] collect[] sum[Elem]
# Q9 parallel
let NodesDesc_WayId_dlo = Nodes_WayId_flist
hadoopReduce[WayId, DLO, CLUSTER_SIZE; . nest[WayId; SectionNodes]
projectextend[WayId; Sections: .SectionNodes afeed sortby[NodeCounter desc]
extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0,
SourcePos: ..Pos::[const point value undef],
TargetPos: .Pos::[const point value undef],
SourceNodeCounter: ..NodeCounter::0,
TargetNodeCounter: .NodeCounter::0]
filter[.Source # 0]
project[Source, Target, SourcePos, TargetPos,
SourceNodeCounter, TargetNodeCounter]
aconsume] consume]
# 46.9466sec
let EdgesDown_WayId_flist = NodesDesc_WayId_dlo
hadoopMap[DLF; . feed para(Roads_WayId_dlo) feed
filter[.WayInfo afeed filter[.WayTagKey = "oneway"]
filter[(.WayTagValue = "yes")] count = 0] {r}
itHashJoin[WayId, WayId_r]
projectextend[WayId; Sections: fun(t:TUPLE)
attr(t, Sections) afeed extend[Curve: fun(u: TUPLE)
attr(t, NodeList_r) afeed sortby[NodeCounter_r desc]
filter[.NodeCounter_r between[attr(u, TargetNodeCounter),
attr(u, SourceNodeCounter)] ]
projecttransformstream[Pos_r]
collect_sline[TRUE],
RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"] extract
[WayTagValue_r],
RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"] extract
[WayTagValue_r],
MaxSpeed: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "maxspeed"] extract
[WayTagValue_r]
]
aconsume ]
unnest[Sections] ]
# 53.3496sec
# 453944 correct
# query EdgesDown_WayId_flist hadoopMap[DLF; . count feed transformstream ] collect[] sum[Elem]
# correct
# Q10 parallel
# correct way (CLUSTER_SIZE parameter omitted, is default)
let Edges_WayId_dlo = EdgesUp_WayId_flist EdgesDown_WayId_flist
hadoopReduce2[WayId, WayId, DLO; . .. concat
projectextend[Source, Target, SourcePos, TargetPos, SourceNodeCounter, TargetNodeCounter, Curve, RoadName,
RoadType, MaxSpeed; WayId: .WayId]
oconsume[Source, Target] ]
# 50.6366sec
# 934081 correct
# query Edges_WayId_dlo hadoopMap[DLF; . feed count feed transformstream ] collect[] sum[Elem]
let Edges1 = Edges_WayId_dlo hadoopMap[DLF; . feed] collect[] consume
# 3:18min (198.265sec) China
let Edges2 = Edges1 feed sortby[Source, Target] oconsume[Source, Target]
# 3:20min (199.584sec)
let Edges = Edges_WayId_dlo hadoopMap[DLF; . feed] collect[]
sortby[Source, Target] sortby[Source, Target] oconsume[Source, Target]
# takes forever (memory exceeded, swapping)
# query Edges_WayId_dlo hadoopMap[DLF; . feed] collect[] count
# let EdgeIndex = Edges feed projectextend[Source, Target, Curve, SourcePos; Box: bbox(.Curve)]
# filter[isdefined(.Box)]
# extend[Box2: bbox(.SourcePos) scale[1000000.0]]
# sortby[Box2]
# project[Source, Target, Curve, Box]
# consume
# 99 secs
# query nodes(EdgeIndex feed addid bulkloadrtree[Box]) consume
# derive EdgeIndex_Box_rtree = EdgeIndex feed addid bulkloadrtree[Box]
# 9 secs
# Queries
#
# Get edges from a restricted area:
# let hombruch = [const rect value (7.419 7.457 51.462 51.484)]
# query EdgeIndex_Box_rtree EdgeIndex windowintersects[hombruch] remove[Box]
# loopsel[Edges orange[.Source, .Target; .Source, .Target]] consume
# 2.17 secs, 3.83, 2.02
# query Ways_Curve_rtree windowintersectsS[hombruch] Ways gettuples remove[NodeList] consume
# 5.08 secs, 4.03, 5.66
# Runtime for Scripts/OrderedRelationGraphFromFullOSMImport.SEC: Times (elapsed / cpu):
# 84:56min (5095.72sec) /2510.84sec = 2.02949
#Runtime for Scripts/OrderedRelationGraphFromFullOSMImportParallelOnly.SEC: Times (elapsed / cpu): 25:23min (1522.81sec) /0.04sec = 38070.3
#File Scripts/OrderedRelationGraphFromFullOSMImportParallelOnly.SEC successful processed.