/* //paragraph [10] title: [{\Large \bf ] [}] //[Contents] [\tableofcontents] //[ue] [\"{u}] [10] Parallel Construction of a Road Network Graph [10] from OpenStreetMap Data Ralf Hartmut G[ue]ting, August 2016 [Contents] 1 Overview This script constructs the edges of a road network from OpenStreetMap data. It is a parallel version of the script ~OrderedRelationGraphFromFullOsmImport.sec~, based on the ~Distributed2Algebra~. It is a commented script suitable for viewing with ~pdview~. To be run with ---- @%Scripts/importGermanyOsm.sec or @&Scripts/importGermanyOsm.sec ---- 0 Preparations Preparations: * get the OSM-file and put it to the correct position on each computer having workers, adapt the path below. Note that the path must be the same for all workers. * create and open database on master * restore Workers */ # create database arnsberg5 # open database arnsberg5 restore WorkersNewton from WorkersNewton /* 1 Set Parameters */ let Workers = WorkersNewton let NWorkers = Workers count let NSlots = 160 query share("NSlots", TRUE, Workers) let File = '/home/ralf/Daten/germany-latest.osm' query share("File", TRUE, Workers) let Results = [const rel(tuple([ Object: string, Cardinality: int, ElapsedTime: real])) value () ] let WorkerInfo = showWorkers() consume query Results inserttuple["Start", NSlots, SEC2COMMANDS feed tail[9] sum[ElapsedTime]] consume /* 1 Divide Osm-File */ let ControlSlots = intstream(0, NSlots - 1) transformstream ddistribute3["ControlSlots", NSlots, TRUE, Workers] dloop["", . feed extract[Elem]] let Division = ControlSlots dloop["Division", divide_osm3(File, NSlots, .)] query Results inserttuple["Division", NSlots, SEC2COMMANDS feed tail[2] sum[ElapsedTime]] consume /* 2 Import OSM Data on Workers */ let Import = ControlSlots dloop["Import", fullosmimport(File + "_" + num2string(.), "City", .)] query Results inserttuple["Import", NSlots, SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume /* 6 Create DArrays on Top of Distributed Objects */ (let CityNodes_type = ( (rel (tuple ((NodeId longint) (Lat real) (Lon real)))) () ) ) (let CityNodeTags_type = ( (rel (tuple ((NodeIdInTag longint) (NodeTagKey text) (NodeTagValue text)))) () ) ) (let CityWays_type = ( (rel (tuple ((WayId longint) (NodeCounter int) (NodeRef longint)))) () ) ) (let CityWayTags_type = ( (rel (tuple ((WayIdInTag longint) (WayTagKey text) (WayTagValue text)))) () ) ) (let CityRelations_type = ( (rel (tuple ((RelId longint) (RefCounter int) (MemberType text) (MemberRef longint) (MemberRole text)))) () ) ) (let CityRelationTags_type = ( (rel (tuple ((RelIdInTag longint) (RelTagKey text) (RelTagValue text)))) () ) ) let CityNodesB0 = Workers feed createDArray["CityNodes", NSlots, CityNodes_type, TRUE] let CityNodeTagsB0 = Workers feed createDArray["CityNodeTags", NSlots, CityNodeTags_type, TRUE] let CityWaysB0 = Workers feed createDArray["CityWays", NSlots, CityWays_type, TRUE] let CityWayTagsB0 = Workers feed createDArray["CityWayTags", NSlots, CityWayTags_type, TRUE] let CityRelationsB0 = Workers feed createDArray["CityRelations", NSlots, CityRelations_type, TRUE] let CityRelationTagsB0 = Workers feed createDArray["CityRelationTags", NSlots, CityRelationTags_type, TRUE] query Results inserttuple["Create DArrays", NSlots, SEC2COMMANDS feed tail[12] sum[ElapsedTime]] consume /* 7 Prepare Cost Measurements */ let ControlWorkers = intstream(0, NWorkers - 1) transformstream ddistribute3["ControlWorkers", NWorkers, TRUE, Workers] dloop["", . feed extract[Elem]] @%Scripts/DistCost.sec /* 8 Redistribute Nodes, Ways, and WayTags */ update LastCommand := distCostReset(ControlWorkers) let CityNodesB1_NodeId = CityNodesB0 partitionF["", . feed, hashvalue(..NodeId, 999997), NSlots] collect2["CityNodesB1", 1238] let NCityNodes = CityNodesB1_NodeId dloop["", . feed count] getValue tie[. + ..] query Results inserttuple["CityNodesB1_NodeId", NCityNodes, SEC2COMMANDS feed tail[2] sum[ElapsedTime]] consume let Cost1 = distCostSave(ControlWorkers); update LastCommand := distCostReset(ControlWorkers) let CityWaysB1_NodeRef = CityWaysB0 partitionF["", . feed, hashvalue(..NodeRef, 999997), NSlots] collect2["CityWaysB1", 1238] query Results inserttuple["CityWaysB1_NodeRef", CityWaysB1_NodeRef dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume let Cost2 = distCostSave(ControlWorkers); update LastCommand := distCostReset(ControlWorkers) let CityWayTagsB1_WayIdInTag = CityWayTagsB0 partitionF["", . feed, hashvalue(..WayIdInTag, 999997), NSlots] collect2["CityWayTagsB1", 1238] query Results inserttuple["CityWayTagsB1_WayIdInTag", CityWayTagsB1_WayIdInTag dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume let Cost3 = distCostSave(ControlWorkers); update LastCommand := distCostReset(ControlWorkers) /* 9 Assign New NodeIds to Nodes Clustered Spatially per Slot */ query CityNodesB1_NodeId dloop["", . feed count] getValue # all values about 4.8 million, all less than 5 million. update LastCommand := distCostReset(ControlWorkers) /* Disjoint numbering of nodes. */ let MaxNodesPerSlot = (NCityNodes div NSlots) * 3 query share("MaxNodesPerSlot", TRUE, Workers) let CityNodesNewB1_NodeId = CityNodesB1_NodeId ControlSlots dmap2["CityNodesNewB1", . feed extend[Easting: .Lon * 1000000, Northing: .Lat * 1000000] extend[Box: rectangle2(.Easting, .Easting, .Northing, .Northing)] sortby[Box] projectextend[NodeId; Pos: makepoint(.Lon, .Lat)] addcounter[NodeIdNew, (.. * MaxNodesPerSlot) + 1] , 1238] query Results inserttuple["CityNodesNewB1_NodeId", CityNodesNewB1_NodeId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[3] sum[ElapsedTime]] consume let Cost5 = distCostSave(ControlWorkers); update LastCommand := distCostReset(ControlWorkers) /* 10 Construct Ways and Roads ---- let Ways = CityNodesNew feed CityWays feed itHashJoin[NodeId, NodeRef] sortby[WayId, NodeCounter] nest[WayId; NodeList] extend[Curve : .NodeList afeed projecttransformstream[Pos] collect_line[TRUE]] CityWayTags feed nest[WayIdInTag; WayInfo] itHashJoin[WayId, WayIdInTag] extend[Box: bbox(.Curve) scale[1000000.0]] sortby[Box] remove[Box] consume ---- */ let WaysB1_WayId = CityNodesNewB1_NodeId CityWaysB1_NodeRef dmap2["", . feed .. feed itHashJoin[NodeId, NodeRef], 1238] partitionF["", . feed, hashvalue(..WayId, 999997), 0] collect2["WaysB1", 1238] query Results inserttuple["WaysB1_WayId", WaysB1_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume # let Cost6 = distCostSave(ControlWorkers); update LastCommand := distCostReset(ControlWorkers) let WaysB3_WayId = WaysB1_WayId CityWayTagsB1_WayIdInTag dmap2["", . feed sortby[WayId, NodeCounter] nest[WayId; NodeList] extend[Curve : .NodeList afeed projecttransformstream[Pos] collect_line[TRUE]] .. feed sortby[WayIdInTag] nest[WayIdInTag; WayInfo] itHashJoin[WayId, WayIdInTag] extend[Box: bbox(.Curve) scale[1000000.0]] sortby[Box] remove[Box] consume, 1238] query Results inserttuple["WaysB3_WayId", WaysB3_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume let Cost7 = distCostSave(ControlWorkers); update LastCommand := distCostReset(ControlWorkers) /* ---- let Roads = Ways feed filter[.WayInfo afeed filter[.WayTagKey = "highway"] count > 0] consume ---- */ let RoadsB1_WayId = WaysB3_WayId dloop["RoadsB1_WayId", . feed filter[.WayInfo afeed filter[.WayTagKey = "highway"] count > 0] consume] query Results inserttuple["RoadsB1_WayId", RoadsB1_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume /* 11 Construct Nodes ---- let Nodes = CityWays feed CityWays feed {h2} itHashJoin[NodeRef, NodeRef_h2] filter[.WayId # .WayId_h2] CityNodesNew feed itHashJoin[NodeRef, NodeId] Roads feed {r1} itHashJoin[WayId, WayId_r1] Roads feed {r2} itHashJoin[WayId_h2, WayId_r2] project[WayId, NodeCounter, NodeIdNew, Pos] Roads feed projectextend[WayId; Node: .NodeList afeed filter[.NodeCounter = 0] aconsume] unnest[Node] project[WayId, NodeCounter, NodeIdNew, Pos] concat Roads feed extend[HighNodeNo: (.NodeList afeed count) - 1] projectextend[WayId; Node: fun(t: TUPLE) attr(t, NodeList) afeed filter[.NodeCounter = attr(t, HighNodeNo)] aconsume] unnest[Node] project[WayId, NodeCounter, NodeIdNew, Pos] concat sortby[WayId, NodeCounter] rdup consume ---- */ let NodesB1 = CityWaysB1_NodeRef CityNodesNewB1_NodeId dmap2["", . feed . feed {h2} itHashJoin[NodeRef, NodeRef_h2] filter[.WayId # .WayId_h2] .. feed itHashJoin[NodeRef, NodeId] project[WayId, NodeCounter, NodeIdNew, Pos, WayId_h2], 1238 ] partitionF["", . feed, hashvalue(..WayId, 999997), NSlots] collect2["", 1238] RoadsB1_WayId dmap2["", . feed .. feed project[WayId] {r1} itHashJoin[WayId, WayId_r1], 1238] partitionF["", . feed, hashvalue(..WayId_h2, 999997), NSlots] collect2["", 1238] RoadsB1_WayId dmap2["", . feed .. feed project[WayId] {r2} itHashJoin[WayId_h2, WayId_r2] project[WayId, NodeCounter, NodeIdNew, Pos], 1238] query Results inserttuple["NodesB1", NodesB1 dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume let NodesB2 = RoadsB1_WayId dloop["", fun(r: DARRAYELEM) r feed projectextend[WayId; Node: .NodeList afeed filter[.NodeCounter = 0] aconsume] unnest[Node] project[WayId, NodeCounter, NodeIdNew, Pos] r feed extend[HighNodeNo: (.NodeList afeed count) - 1] projectextend[WayId; Node: fun(t: TUPLE) attr(t, NodeList) afeed filter[.NodeCounter = attr(t, HighNodeNo)] aconsume] unnest[Node] project[WayId, NodeCounter, NodeIdNew, Pos] concat consume ] query Results inserttuple["NodesB2", NodesB2 dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume let NodesB3_WayId = NodesB1 NodesB2 dmap2["", . feed .. feed concat, 1238] partitionF["", . feed, hashvalue(..WayId, 999997), NSlots] collect2["", 1238] dmap["", . feed sortby[WayId, NodeCounter] rdup consume] query Results inserttuple["NodesB3_WayId", NodesB3_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume /* 12 Construct Edges 12.1 EdgesUp ---- let EdgesUp = Nodes feed nest[WayId; SectionNodes] projectextend[WayId; Sections: .SectionNodes afeed extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0, SourcePos: ..Pos::[const point value undef], TargetPos: .Pos::[const point value undef], SourceNodeCounter: ..NodeCounter::0, TargetNodeCounter: .NodeCounter::0] filter[.Source # 0] project[Source, Target, SourcePos, TargetPos, SourceNodeCounter, TargetNodeCounter] aconsume] Roads feed {r} itHashJoin[WayId, WayId_r] projectextend[WayId; Sections: fun(t:TUPLE) attr(t, Sections) afeed extend[ Curve: fun(u: TUPLE) attr(t, NodeList_r) afeed filter[.NodeCounter_r between[attr(u, SourceNodeCounter), attr(u, TargetNodeCounter)] ] projecttransformstream[Pos_r] collect_sline[TRUE], RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"] extract[WayTagValue_r], RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"] extract[WayTagValue_r] ] aconsume ] unnest[Sections] consume ---- */ let NodesB4_WayId = NodesB3_WayId dloop["", . feed nest[WayId; SectionNodes] projectextend[WayId; Sections: .SectionNodes afeed extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0, SourcePos: ..Pos::[const point value undef], TargetPos: .Pos::[const point value undef], SourceNodeCounter: ..NodeCounter::0, TargetNodeCounter: .NodeCounter::0] filter[.Source # 0] project[Source, Target, SourcePos, TargetPos, SourceNodeCounter, TargetNodeCounter] aconsume] consume] query Results inserttuple["NodesB4_WayId", NodesB4_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume let EdgesUpB1_WayId = NodesB4_WayId RoadsB1_WayId dloop2["", . feed .. feed {r} itHashJoin[WayId, WayId_r] projectextend[WayId; Sections: fun(t:TUPLE) attr(t, Sections) afeed extend[ Curve: fun(u: TUPLE) attr(t, NodeList_r) afeed filter[.NodeCounter_r between[attr(u, SourceNodeCounter), attr(u, TargetNodeCounter)] ] projecttransformstream[Pos_r] collect_sline[TRUE], RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"] extract[WayTagValue_r], RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"] extract[WayTagValue_r] ] aconsume ] unnest[Sections] consume] query Results inserttuple["EdgesUpB1_WayId", EdgesUpB1_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume /* 12.2 EdgesDown ---- let EdgesDown = Nodes feed nest[WayId; SectionNodes] projectextend[WayId; Sections: .SectionNodes afeed sortby[NodeCounter desc] extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0, SourcePos: ..Pos::[const point value undef], TargetPos: .Pos::[const point value undef], SourceNodeCounter: ..NodeCounter::0, TargetNodeCounter: .NodeCounter::0] filter[.Source # 0] project[Source, Target, SourcePos, TargetPos, SourceNodeCounter, TargetNodeCounter] aconsume] Roads feed filter[.WayInfo afeed filter[.WayTagKey = "oneway"] filter[(.WayTagValue = "yes")] count = 0] {r} itHashJoin[WayId, WayId_r] projectextend[WayId; Sections: fun(t:TUPLE) attr(t, Sections) afeed extend[Curve: fun(u: TUPLE) attr(t, NodeList_r) afeed sortby[NodeCounter_r desc] filter[.NodeCounter_r between[attr(u, TargetNodeCounter), attr(u, SourceNodeCounter)] ] projecttransformstream[Pos_r] collect_sline[TRUE], RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"] extract[WayTagValue_r], RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"] extract[WayTagValue_r] ] aconsume ] unnest[Sections] consume ---- */ let NodesB5_WayId = NodesB3_WayId dloop["", . feed nest[WayId; SectionNodes] projectextend[WayId; Sections: .SectionNodes afeed sortby[NodeCounter desc] extend_last[Source: ..NodeIdNew::0, Target: .NodeIdNew::0, SourcePos: ..Pos::[const point value undef], TargetPos: .Pos::[const point value undef], SourceNodeCounter: ..NodeCounter::0, TargetNodeCounter: .NodeCounter::0] filter[.Source # 0] project[Source, Target, SourcePos, TargetPos, SourceNodeCounter, TargetNodeCounter] aconsume] consume] query Results inserttuple["NodesB5_WayId", NodesB5_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume let EdgesDownB1_WayId = NodesB5_WayId RoadsB1_WayId dloop2["", . feed .. feed filter[.WayInfo afeed filter[.WayTagKey = "oneway"] filter[(.WayTagValue = "yes")] count = 0] {r} itHashJoin[WayId, WayId_r] projectextend[WayId; Sections: fun(t:TUPLE) attr(t, Sections) afeed extend[Curve: fun(u: TUPLE) attr(t, NodeList_r) afeed sortby[NodeCounter_r desc] filter[.NodeCounter_r between[attr(u, TargetNodeCounter), attr(u, SourceNodeCounter)] ] projecttransformstream[Pos_r] collect_sline[TRUE], RoadName: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "name"] extract[WayTagValue_r], RoadType: attr(t, WayInfo_r) afeed filter[.WayTagKey_r = "highway"] extract[WayTagValue_r] ] aconsume ] unnest[Sections] consume] query Results inserttuple["EdgesDownB1_WayId", EdgesDownB1_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume /* 12.3 Edges ---- let Edges = EdgesUp feed EdgesDown feed concat projectextend[Source, Target, SourcePos, TargetPos, SourceNodeCounter, TargetNodeCounter, Curve, RoadName, RoadType; WayId: .WayId] oconsume[Source, Target] ---- */ let EdgesB1_WayId = EdgesUpB1_WayId EdgesDownB1_WayId dloop2["", . feed .. feed concat projectextend[Source, Target, SourcePos, TargetPos, SourceNodeCounter, TargetNodeCounter, Curve, RoadName, RoadType; WayId: .WayId] consume ] query Results inserttuple["EdgesB1_WayId", EdgesB1_WayId dloop["", . feed count] getValue tie[. + ..], SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume /* 13 Test */ let hombruch = [const rect value (7.419 7.457 51.462 51.484)] query share("hombruch", TRUE, Workers) let hombruchEdges = EdgesB1_WayId dloop["", . feed filter[bbox(.Curve) intersects hombruch] consume] dsummarize consume let Commands = SEC2COMMANDS feed consume let TotalTime = Commands feed filter[not(.CmdStr contains "Results")] filter[not(.CmdStr contains "distCost")] sum[ElapsedTime]