Files
secondo/bin/Scripts/importGermanyOsm.sec

662 lines
17 KiB
Plaintext
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
/*
//paragraph [10] title: [{\Large \bf ] [}]
//[Contents] [\tableofcontents]
//[ue] [\"{u}]
[10] Parallel Construction of a Road Network Graph
[10] from OpenStreetMap Data
Ralf Hartmut G[ue]ting, August 2016
[Contents]
1 Overview
This script constructs the edges of a road network from OpenStreetMap data. It is a parallel version of the script ~OrderedRelationGraphFromFullOsmImport.sec~, based on the ~Distributed2Algebra~.
It is a commented script suitable for viewing with ~pdview~. To be run with
---- @%Scripts/importGermanyOsm.sec or
@&Scripts/importGermanyOsm.sec
----
0 Preparations
Preparations:
* get the OSM-file and put it to the correct position on each computer having workers, adapt the path below. Note that the path must be the same for all workers.
* create and open database on master
* restore Workers
*/
# create database arnsberg5
# open database arnsberg5
restore WorkersNewton from WorkersNewton
/*
1 Set Parameters
*/
let Workers = WorkersNewton
let NWorkers = Workers count
let NSlots = 160
query share("NSlots", TRUE, Workers)
let File = '/home/ralf/Daten/germany-latest.osm'
query share("File", TRUE, Workers)
let Results = [const rel(tuple([
Object: string,
Cardinality: int,
ElapsedTime: real]))
value () ]
let WorkerInfo = showWorkers() consume
query Results inserttuple["Start", NSlots,
SEC2COMMANDS feed tail[9] sum[ElapsedTime]] consume
/*
1 Divide Osm-File
*/
let ControlSlots = intstream(0, NSlots - 1) transformstream
ddistribute3["ControlSlots", NSlots, TRUE, Workers]
dloop["", . feed extract[Elem]]
let Division = ControlSlots
dloop["Division", divide_osm3(File, NSlots, .)]
query Results inserttuple["Division", NSlots,
SEC2COMMANDS feed tail[2] sum[ElapsedTime]] consume
/*
2 Import OSM Data on Workers
*/
let Import = ControlSlots
dloop["Import", fullosmimport(File + "_" + num2string(.), "City", .)]
query Results inserttuple["Import", NSlots,
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
/*
6 Create DArrays on Top of Distributed Objects
*/
(let CityNodes_type = (
(rel (tuple ((NodeId longint) (Lat real) (Lon real))))
() )
)
(let CityNodeTags_type = (
(rel (tuple ((NodeIdInTag longint) (NodeTagKey text)
(NodeTagValue text))))
() )
)
(let CityWays_type = (
(rel (tuple ((WayId longint) (NodeCounter int) (NodeRef longint))))
() )
)
(let CityWayTags_type = (
(rel (tuple ((WayIdInTag longint) (WayTagKey text)
(WayTagValue text))))
() )
)
(let CityRelations_type = (
(rel (tuple ((RelId longint) (RefCounter int) (MemberType text)
(MemberRef longint) (MemberRole text))))
() )
)
(let CityRelationTags_type = (
(rel (tuple ((RelIdInTag longint) (RelTagKey text)
(RelTagValue text))))
() )
)
let CityNodesB0 = Workers feed
createDArray["CityNodes", NSlots, CityNodes_type, TRUE]
let CityNodeTagsB0 = Workers feed
createDArray["CityNodeTags", NSlots, CityNodeTags_type, TRUE]
let CityWaysB0 = Workers feed
createDArray["CityWays", NSlots, CityWays_type, TRUE]
let CityWayTagsB0 = Workers feed
createDArray["CityWayTags", NSlots, CityWayTags_type, TRUE]
let CityRelationsB0 = Workers feed
createDArray["CityRelations", NSlots, CityRelations_type, TRUE]
let CityRelationTagsB0 = Workers feed
createDArray["CityRelationTags", NSlots, CityRelationTags_type, TRUE]
query Results inserttuple["Create DArrays", NSlots,
SEC2COMMANDS feed tail[12] sum[ElapsedTime]] consume
/*
7 Prepare Cost Measurements
*/
let ControlWorkers = intstream(0, NWorkers - 1) transformstream
ddistribute3["ControlWorkers", NWorkers, TRUE, Workers]
dloop["", . feed extract[Elem]]
@%Scripts/DistCost.sec
/*
8 Redistribute Nodes, Ways, and WayTags
*/
update LastCommand := distCostReset(ControlWorkers)
let CityNodesB1_NodeId = CityNodesB0 partitionF["", . feed,
hashvalue(..NodeId, 999997), NSlots]
collect2["CityNodesB1", 1238]
let NCityNodes = CityNodesB1_NodeId dloop["", . feed count] getValue tie[. + ..]
query Results inserttuple["CityNodesB1_NodeId", NCityNodes,
SEC2COMMANDS feed tail[2] sum[ElapsedTime]] consume
let Cost1 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)
let CityWaysB1_NodeRef = CityWaysB0 partitionF["", . feed,
hashvalue(..NodeRef, 999997), NSlots]
collect2["CityWaysB1", 1238]
query Results inserttuple["CityWaysB1_NodeRef",
CityWaysB1_NodeRef dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
let Cost2 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)
let CityWayTagsB1_WayIdInTag = CityWayTagsB0 partitionF["", . feed,
hashvalue(..WayIdInTag, 999997), NSlots]
collect2["CityWayTagsB1", 1238]
query Results inserttuple["CityWayTagsB1_WayIdInTag",
CityWayTagsB1_WayIdInTag dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
let Cost3 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)
/*
9 Assign New NodeIds to Nodes Clustered Spatially per Slot
*/
query CityNodesB1_NodeId dloop["", . feed count] getValue
# all values about 4.8 million, all less than 5 million.
update LastCommand := distCostReset(ControlWorkers)
/*
Disjoint numbering of nodes.
*/
let MaxNodesPerSlot = (NCityNodes div NSlots) * 3
query share("MaxNodesPerSlot", TRUE, Workers)
let CityNodesNewB1_NodeId =
CityNodesB1_NodeId ControlSlots dmap2["CityNodesNewB1",
. feed
extend[Easting: .Lon * 1000000, Northing: .Lat * 1000000]
extend[Box: rectangle2(.Easting, .Easting, .Northing, .Northing)]
sortby[Box]
projectextend[NodeId; Pos: makepoint(.Lon, .Lat)]
addcounter[NodeIdNew, (.. * MaxNodesPerSlot) + 1] , 1238]
query Results inserttuple["CityNodesNewB1_NodeId",
CityNodesNewB1_NodeId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[3] sum[ElapsedTime]] consume
let Cost5 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)
/*
10 Construct Ways and Roads
----
let Ways =
CityNodesNew feed
CityWays feed itHashJoin[NodeId, NodeRef]
sortby[WayId, NodeCounter] nest[WayId; NodeList]
extend[Curve : .NodeList afeed projecttransformstream[Pos]
collect_line[TRUE]]
CityWayTags feed nest[WayIdInTag; WayInfo] itHashJoin[WayId, WayIdInTag]
extend[Box: bbox(.Curve) scale[1000000.0]]
sortby[Box]
remove[Box]
consume
----
*/
let WaysB1_WayId = CityNodesNewB1_NodeId CityWaysB1_NodeRef dmap2["",
. feed
.. feed itHashJoin[NodeId, NodeRef], 1238]
partitionF["", . feed, hashvalue(..WayId, 999997), 0]
collect2["WaysB1", 1238]
query Results inserttuple["WaysB1_WayId",
WaysB1_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
# let Cost6 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)
let WaysB3_WayId = WaysB1_WayId CityWayTagsB1_WayIdInTag dmap2["",
. feed sortby[WayId, NodeCounter] nest[WayId; NodeList]
extend[Curve : .NodeList afeed projecttransformstream[Pos]
collect_line[TRUE]]
.. feed sortby[WayIdInTag] nest[WayIdInTag; WayInfo]
itHashJoin[WayId, WayIdInTag]
extend[Box: bbox(.Curve) scale[1000000.0]]
sortby[Box]
remove[Box]
consume, 1238]
query Results inserttuple["WaysB3_WayId",
WaysB3_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
let Cost7 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)
/*
----
let Roads = Ways feed
filter[.WayInfo afeed filter[.WayTagKey = "highway"] count > 0] consume
----
*/
let RoadsB1_WayId = WaysB3_WayId
dloop["RoadsB1_WayId", . feed
filter[.WayInfo afeed filter[.WayTagKey = "highway"] count > 0]
consume]
query Results inserttuple["RoadsB1_WayId",
RoadsB1_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
/*
11 Construct Nodes
----
let Nodes =
CityWays feed
CityWays feed {h2}
itHashJoin[NodeRef, NodeRef_h2]
filter[.WayId # .WayId_h2]
CityNodesNew feed
itHashJoin[NodeRef, NodeId]
Roads feed {r1} itHashJoin[WayId, WayId_r1]
Roads feed {r2} itHashJoin[WayId_h2, WayId_r2]
project[WayId, NodeCounter, NodeIdNew, Pos]
Roads feed
projectextend[WayId; Node: .NodeList afeed filter[.NodeCounter = 0]
aconsume]
unnest[Node]
project[WayId, NodeCounter, NodeIdNew, Pos]
concat
Roads feed
extend[HighNodeNo: (.NodeList afeed count) - 1]
projectextend[WayId; Node: fun(t: TUPLE)
attr(t, NodeList) afeed filter[.NodeCounter = attr(t, HighNodeNo)]
aconsume]
unnest[Node]
project[WayId, NodeCounter, NodeIdNew, Pos]
concat
sortby[WayId, NodeCounter]
rdup
consume
----
*/
let NodesB1 =
CityWaysB1_NodeRef CityNodesNewB1_NodeId dmap2["",
. feed
. feed {h2}
itHashJoin[NodeRef, NodeRef_h2]
filter[.WayId # .WayId_h2]
.. feed
itHashJoin[NodeRef, NodeId]
project[WayId, NodeCounter, NodeIdNew, Pos, WayId_h2], 1238
]
partitionF["", . feed, hashvalue(..WayId, 999997), NSlots]
collect2["", 1238]
RoadsB1_WayId dmap2["", . feed .. feed project[WayId] {r1}
itHashJoin[WayId, WayId_r1], 1238]
partitionF["", . feed, hashvalue(..WayId_h2, 999997), NSlots]
collect2["", 1238]
RoadsB1_WayId dmap2["", . feed .. feed project[WayId] {r2}
itHashJoin[WayId_h2, WayId_r2]
project[WayId, NodeCounter, NodeIdNew, Pos], 1238]
query Results inserttuple["NodesB1",
NodesB1 dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
let NodesB2 =
RoadsB1_WayId dloop["", fun(r: DARRAYELEM) r feed
projectextend[WayId; Node: .NodeList afeed filter[.NodeCounter = 0]
aconsume]
unnest[Node]
project[WayId, NodeCounter, NodeIdNew, Pos]
r feed
extend[HighNodeNo: (.NodeList afeed count) - 1]
projectextend[WayId; Node: fun(t: TUPLE)
attr(t, NodeList) afeed filter[.NodeCounter = attr(t, HighNodeNo)]
aconsume]
unnest[Node]
project[WayId, NodeCounter, NodeIdNew, Pos]
concat consume
]
query Results inserttuple["NodesB2",
NodesB2 dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
let NodesB3_WayId = NodesB1 NodesB2 dmap2["",
. feed .. feed concat, 1238]
partitionF["", . feed, hashvalue(..WayId, 999997), NSlots]
collect2["", 1238]
dmap["", . feed sortby[WayId, NodeCounter] rdup consume]
query Results inserttuple["NodesB3_WayId",
NodesB3_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
/*
12 Construct Edges
12.1 EdgesUp
----
let EdgesUp =
Nodes feed nest[WayId; SectionNodes]
projectextend[WayId; Sections: .SectionNodes afeed
extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0,
SourcePos: ..Pos::[const point value undef],
TargetPos: .Pos::[const point value undef],
SourceNodeCounter: ..NodeCounter::0,
TargetNodeCounter: .NodeCounter::0]
filter[.Source # 0]
project[Source, Target, SourcePos, TargetPos,
SourceNodeCounter, TargetNodeCounter]
aconsume]
Roads feed {r}
itHashJoin[WayId, WayId_r]
projectextend[WayId; Sections: fun(t:TUPLE)
attr(t, Sections) afeed
extend[
Curve: fun(u: TUPLE)
attr(t, NodeList_r) afeed
filter[.NodeCounter_r between[attr(u, SourceNodeCounter),
attr(u, TargetNodeCounter)] ]
projecttransformstream[Pos_r]
collect_sline[TRUE],
RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"]
extract[WayTagValue_r],
RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"]
extract[WayTagValue_r]
]
aconsume ]
unnest[Sections]
consume
----
*/
let NodesB4_WayId = NodesB3_WayId dloop["",
. feed nest[WayId; SectionNodes]
projectextend[WayId; Sections: .SectionNodes afeed
extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0,
SourcePos: ..Pos::[const point value undef],
TargetPos: .Pos::[const point value undef],
SourceNodeCounter: ..NodeCounter::0,
TargetNodeCounter: .NodeCounter::0]
filter[.Source # 0]
project[Source, Target, SourcePos, TargetPos,
SourceNodeCounter, TargetNodeCounter]
aconsume]
consume]
query Results inserttuple["NodesB4_WayId",
NodesB4_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
let EdgesUpB1_WayId = NodesB4_WayId RoadsB1_WayId dloop2["",
. feed .. feed {r}
itHashJoin[WayId, WayId_r]
projectextend[WayId; Sections: fun(t:TUPLE)
attr(t, Sections) afeed
extend[
Curve: fun(u: TUPLE)
attr(t, NodeList_r) afeed
filter[.NodeCounter_r between[attr(u, SourceNodeCounter),
attr(u, TargetNodeCounter)] ]
projecttransformstream[Pos_r]
collect_sline[TRUE],
RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"]
extract[WayTagValue_r],
RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"]
extract[WayTagValue_r]
]
aconsume ]
unnest[Sections] consume]
query Results inserttuple["EdgesUpB1_WayId",
EdgesUpB1_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
/*
12.2 EdgesDown
----
let EdgesDown =
Nodes feed nest[WayId; SectionNodes]
projectextend[WayId; Sections: .SectionNodes afeed
sortby[NodeCounter desc]
extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0,
SourcePos: ..Pos::[const point value undef],
TargetPos: .Pos::[const point value undef],
SourceNodeCounter: ..NodeCounter::0,
TargetNodeCounter: .NodeCounter::0]
filter[.Source # 0]
project[Source, Target, SourcePos, TargetPos,
SourceNodeCounter, TargetNodeCounter]
aconsume]
Roads feed
filter[.WayInfo afeed filter[.WayTagKey = "oneway"]
filter[(.WayTagValue = "yes")] count = 0] {r}
itHashJoin[WayId, WayId_r]
projectextend[WayId; Sections: fun(t:TUPLE)
attr(t, Sections) afeed extend[Curve: fun(u: TUPLE)
attr(t, NodeList_r) afeed sortby[NodeCounter_r desc]
filter[.NodeCounter_r between[attr(u, TargetNodeCounter),
attr(u, SourceNodeCounter)] ]
projecttransformstream[Pos_r]
collect_sline[TRUE],
RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"]
extract[WayTagValue_r],
RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"]
extract[WayTagValue_r]
]
aconsume ]
unnest[Sections]
consume
----
*/
let NodesB5_WayId = NodesB3_WayId dloop["",
. feed nest[WayId; SectionNodes]
projectextend[WayId; Sections: .SectionNodes afeed
sortby[NodeCounter desc]
extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0,
SourcePos: ..Pos::[const point value undef],
TargetPos: .Pos::[const point value undef],
SourceNodeCounter: ..NodeCounter::0,
TargetNodeCounter: .NodeCounter::0]
filter[.Source # 0]
project[Source, Target, SourcePos, TargetPos,
SourceNodeCounter, TargetNodeCounter]
aconsume]
consume]
query Results inserttuple["NodesB5_WayId",
NodesB5_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
let EdgesDownB1_WayId = NodesB5_WayId RoadsB1_WayId dloop2["",
. feed
.. feed filter[.WayInfo afeed filter[.WayTagKey = "oneway"]
filter[(.WayTagValue = "yes")] count = 0] {r}
itHashJoin[WayId, WayId_r]
projectextend[WayId; Sections: fun(t:TUPLE)
attr(t, Sections) afeed extend[Curve: fun(u: TUPLE)
attr(t, NodeList_r) afeed sortby[NodeCounter_r desc]
filter[.NodeCounter_r between[attr(u, TargetNodeCounter),
attr(u, SourceNodeCounter)] ]
projecttransformstream[Pos_r]
collect_sline[TRUE],
RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"]
extract[WayTagValue_r],
RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"]
extract[WayTagValue_r]
]
aconsume ]
unnest[Sections]
consume]
query Results inserttuple["EdgesDownB1_WayId",
EdgesDownB1_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
/*
12.3 Edges
----
let Edges = EdgesUp feed EdgesDown feed concat
projectextend[Source, Target, SourcePos, TargetPos, SourceNodeCounter,
TargetNodeCounter, Curve, RoadName,
RoadType; WayId: .WayId]
oconsume[Source, Target]
----
*/
let EdgesB1_WayId = EdgesUpB1_WayId EdgesDownB1_WayId dloop2["",
. feed .. feed concat
projectextend[Source, Target, SourcePos, TargetPos, SourceNodeCounter,
TargetNodeCounter, Curve, RoadName,
RoadType; WayId: .WayId]
consume ]
query Results inserttuple["EdgesB1_WayId",
EdgesB1_WayId dloop["", . feed count] getValue tie[. + ..],
SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume
/*
13 Test
*/
let hombruch = [const rect value (7.419 7.457 51.462 51.484)]
query share("hombruch", TRUE, Workers)
let hombruchEdges = EdgesB1_WayId dloop["", . feed
filter[bbox(.Curve) intersects hombruch] consume]
dsummarize consume
let Commands = SEC2COMMANDS feed consume
let TotalTime = Commands feed filter[not(.CmdStr contains "Results")]
filter[not(.CmdStr contains "distCost")] sum[ElapsedTime]