# Secondo script for parallel contraction (contraction hierarchies).
#
# Overview:
#   1. partition the edges by a cs x cs grid and distribute them to workers
#   2. contract all non-border nodes of each slot on the workers
#   3. collect the remaining edges on the master and contract them locally
#   4. build the final graph and define a bidirectional shortest path function
#
# requires:
#   Relation: Edges
#   Relation: Speeds2
#   Relation: workers
#
# and of course the workers
#
# Formatting notes:
#   - commands are separated by empty lines (or terminated by ';'), hence a
#     command must not contain an empty line
#   - a '#' at the start of a line begins a comment, whereas a '#' inside an
#     expression is the "not equal" operator and must stay in mid-line
#
# {close database | delete database arnsberg}
# create database arnsberg

{close database | create database arnsberg | open database arnsberg}

query memclear();

query meminit(2000);

# alternative way to obtain the edges: restore Edges from 'Edges'
if not(isDBObject("Edges")) then
  let Edges = 'Edges' ffeed5 consume
endif

if not(isDBObject("workers")) then
  restore workers from workers
endif

if not(isDBObject("Speeds2")) then
  restore Speeds2 from Speeds2
endif


# ---------------------------------------------------------------------------
# Preparations
# ---------------------------------------------------------------------------

# number of cells in each direction
if isDBObject("cs") then delete cs endif

let cs = 2;

# number of darray slots
# this number should be smaller than or equal to cs * cs
if isDBObject("noSlots") then delete noSlots endif

let noSlots = cs * cs

# Compute the bounding box containing all edges of the Edges relation
if isDBObject("box") then delete box endif

let box = Edges feed
  projectextend[; B: bbox(.Curve)]
  transformstream
  collect_box[FALSE]

# Create a grid from this box: cs cells of equal size per dimension
if isDBObject("grid") then delete grid endif

let grid = createCellGrid2D(minD(box,1),minD(box,2),
                            (maxD(box,1)-minD(box,1))/cs,
                            (maxD(box,2)-minD(box,2))/cs,
                            cs)

# For each edge, determine the cell number of SourcePos and of TargetPos.
# Edges having an undefined or empty curve are filtered out.
# Edges whose source and target lie in different cells are stored in a
# separate relation. The remaining tuples are distributed over the workers.

# create an empty relation for the cell spanning edges
if isDBObject("CellSpanningEdges") then delete CellSpanningEdges endif

let CellSpanningEdges = Edges feed
  extend[SourceCell : 0, TargetCell : 0]
  head[0]
  consume

# function computing the cell number of a given point
# (if several cell numbers are returned, the smallest one is used)
if isDBObject("getCellNo") then delete getCellNo endif

let getCellNo = fun(p : point)
  cellnumber(bbox(p),grid) transformstream sorth extract[Elem]

# Distribute the edges. Edges crossing between different cells are not
# distributed but inserted into the relation CellSpanningEdges.
if isDBObject("dedges") then delete dedges endif

let dedges = Edges feed
  filter[isdefined(bbox(.Curve))]
  extend[SourceCell : getCellNo(.SourcePos),
         TargetCell : getCellNo(.TargetPos) ]
  filterinsert[.SourceCell = .TargetCell, CellSpanningEdges]
  dfdistribute4["Edges", .SourceCell, noSlots, workers]

# check slot sizes
query dedges dmap["", . feed count] dsummarize consume

# check size of CellSpanningEdges
query CellSpanningEdges count

# distribute the speeds relation
query share("Speeds2", TRUE, dedges)

# Assign costs to the distributed edges: curve length divided by
# (Speed / 3.6). NOTE(review): presumably travel time in seconds, assuming
# that Speed is given in km/h - confirm against Speeds2.
if isDBObject("dedgesb") then delete dedgesb endif

let dedgesb = dedges dmap["dedgesb",
  Speeds2 feed {s}
  . feed filter[isdefined(.Curve)]
  itHashJoin[RoadType_s, RoadType]
  extend[Cost : size(.Curve,[const geoid value WGS1984])/(.Speed_s/3.6),
         Middle : 0,
         NEdges : 1.0]
  project[Source, Target, SourcePos, TargetPos, Cost, Middle, NEdges]
  filter[isdefined(.Cost)] ]

# do the same with the cell spanning edges
if isDBObject("CellSpanningEdgesB") then delete CellSpanningEdgesB endif

let CellSpanningEdgesB =
  Speeds2 feed {s}
  CellSpanningEdges feed filter[isdefined(.Curve)]
  itHashJoin[RoadType_s, RoadType]
  extend[Cost: size(.Curve,[const geoid value WGS1984]) / (.Speed_s / 3.6),
         Middle: 0,
         NEdges: 1.0]
  project[Source, Target, SourcePos, TargetPos, Cost, Middle, NEdges]
  filter[isdefined(.Cost)]
  consume

# Extract border nodes, i.e., nodes that are part of a cell spanning edge.
# Such nodes are excluded from the contraction on the workers (they are
# removed from nodesA below) and are contracted later on the master.
if isDBObject("borderNodes") then delete borderNodes endif

let borderNodes =
  CellSpanningEdges feed project[Source] renameattr[Node: Source]
  CellSpanningEdges feed project[Target] renameattr[Node : Target]
  concat
  sort
  rdup
  consume

# check size of borderNodes
query borderNodes count

# share borderNodes with the workers
query share("borderNodes", TRUE, dedgesb)


# ---------------------------------------------------------------------------
# Main memory structures on the workers
# ---------------------------------------------------------------------------

# on each worker, we clean up the memory
query dedgesb dmap["", memclear() ]

query dedgesb dmap["", meminit(2000)]

# We create a darray containing the names of the memory graphs.
# Note: Secondo crashes if just using a constant in the function,
#       hence the concatenation with an empty string.
if isDBObject("mg2names") then delete mg2names endif

let mg2names = dedgesb dmap["mg2names", "dummy" + ""]

# overwrite the dummy entry of each slot by a slot specific name
query intstream(0,noSlots - 1) transformstream
  extend[N : "EdgesD_" + num2string(.Elem)]
  extend[S : size(put(mg2names , .Elem, .N))]
  consume

# create a graph on each worker using the names from mg2names
# NOTE(review): the trailing 1238 of dmap2/dmap4 is presumably the port used
# for transferring slots between workers - confirm.
if isDBObject("graphs") then delete graphs endif

let graphs = dedgesb mg2names dmap2["",
  fun( r : frel(tuple([Source :int , Target : int, SourcePos : point,
                       TargetPos: point, Cost : real,Middle: int,
                       NEdges : real])),
       na : string)
    r feed createmgraph2m[Source, Target, .Cost, na, FALSE],
  1238]

# create names for ContractionX in the same way as for the graphs
if isDBObject("contractNames") then delete contractNames endif

let contractNames = dedgesb dmap["contractnames", "dummy" + ""]

query intstream(0,noSlots - 1) transformstream
  extend[N : "ContractionX_" + num2string(.Elem)]
  extend[S : size(put(contractNames , .Elem, .N))]
  consume

# create empty memory relations for storing the contracted edges in each slot
if isDBObject("contractionX") then delete contractionX endif

let contractionX = graphs contractNames dmap2["",
  . mg2feed head[0] letmconsume[..],
  1238 ]

# create a list of nodes on each slot, filtering out border nodes
if isDBObject("nodesA") then delete nodesA endif;

let nodesA = graphs dmap["nodesA",
  . mg2feed project[Source, MG_Source, SourcePos]
    renameattr[Orig : Source, NodeIdNew : MG_Source, Pos : SourcePos]
  . mg2feed project[Target, MG_Target, TargetPos]
    renameattr[Orig : Target, NodeIdNew : MG_Target, Pos : TargetPos]
  concat
  sortby[Orig]
  krdup[Orig]
  borderNodes feed renameattr[Orig : Node]
  kmergediff ]

# check the number of nodes in slots
query nodesA dmap["", . feed count] dsummarize consume

# create names for the priority queues
if isDBObject("pqNames") then delete pqNames endif

let pqNames = dedgesb dmap["pqnames", "dummy" + ""]

query intstream(0,noSlots - 1) transformstream
  extend[N : "NodesX_" + num2string(.Elem)]
  extend[S : size(put(pqNames , .Elem, .N))]
  consume

# create a priority queue from the nodes at each slot
# (all nodes start with priority 0.0)
if isDBObject("nodesX") then delete nodesX endif

let nodesX = nodesA pqNames dmap2[ "mempqueue",
  . feed
    projectextend[Orig; Node: .NodeIdNew, Pos: .Pos, Prio: 0.0]
    mcreatepqueue2[Prio, ..],
  1238 ]

# check sizes of the queues
query nodesX dmap["", size(.) ] dsummarize consume

# create main memory relations for the deleted nodes
if isDBObject("dnNames") then delete dnNames endif

let dnNames = dedgesb dmap["dnNames", "dummy" + ""]

query intstream(0,noSlots - 1) transformstream
  extend[N : "deletedNodes_" + num2string(.Elem)]
  extend[S : size(put(dnNames , .Elem, .N))]
  consume

if isDBObject("deletedNodes") then delete deletedNodes endif

let deletedNodes = nodesA dnNames dmap2["deletedNodes",
  . feed
    projectextend[Orig; Node: .NodeIdNew, Pos: .Pos, Prio: 0.0,
                  In: 0, Out: 0, CEdges: 0, Deleted: 0]
    head[0]
    letmconsume[..],
  1238 ]


# ---------------------------------------------------------------------------
# Contraction on the workers
# ---------------------------------------------------------------------------

# Define a function determining the contraction edges for a certain node:
#   node10  : node (graph internal number) to be contracted
#   maxHops : passed on to mtMinPathCosts1
#   g       : the memory graph
# Each predecessor edge (suffix _i) is combined with each successor edge
# (suffix _o); per neighbour only the first edge in (node, cost) order is
# kept. Pairs whose source equals the target are dropped (the '#' inside the
# filter is the "not equal" operator). A combination is kept only if the
# minimum path cost MinPC delivered by mtMinPathCosts1 is not smaller than
# the cost of the path via node10.
if isDBObject("mtcreationEdges") then delete mtcreationEdges endif

let mtcreationEdges = fun(
    node10 : int,
    maxHops : int,
    g : mem (mgraph2 (tuple([Source :int, Target : int, SourcePos: point,
                             TargetPos :point,Cost: real,Middle: int,
                             NEdges : real, MG_Source : int,
                             MG_Target : int, MG_Cost : real]))) )
  pwrap(g) mg2predecessors[node10]
  sortbyh[MG_Source,MG_Cost]
  krdup[MG_Source] {i}
  loopjoin[ fun(t10 : TUPLE)
    (fun(node11: int) pwrap(g) mg2successors[node11])
    pwrap(g) mg2successors[node10]
      sortbyh[MG_Target,MG_Cost]
      krdup[MG_Target] {o}
    mtMinPathCosts1[MG_Target,attr(t10,MG_Source_i), MG_Target_o, MG_Cost, maxHops]
    filter[attr(t10,MG_Source_i) # .MG_Target_o]
    extend[Cost : attr(t10,MG_Cost_i) + .MG_Cost_o]
    filter[.MinPC >= .Cost]
    remove[MinPC]
  ]

# share this function with the workers
query share("mtcreationEdges",TRUE,workers)

# Define the main function.
#   nodesXa       : priority queue of the nodes still to be processed
#   edgesXa       : memory graph that is contracted in place
#   contractionXa : memory relation collecting the inserted contraction edges
#   deletedNodesa : memory relation logging the contracted nodes in order
#   maxprio, minBlockSize : control the reset condition of the counter Num
#   maxHops       : passed on to mtcreationEdges
#
# For each node t taken from the queue, In and Out are its numbers of
# predecessors and successors.
#   * If In * Out > Prio, the node is re-inserted into the queue with the
#     priority In * Out.
#   * Otherwise its contraction edges are computed (g count of them). If
#     In * Out + (g count) - (In + Out) <= Prio, or g count <= In + Out,
#     the contraction edges are inserted into the graph and into
#     contractionXa, and CEdges is their number. Otherwise the node is
#     re-inserted with the priority In * Out + (g count) - (In + Out).
# Re-inserted nodes get a negative CEdges (bool2int(..) - 10) and are
# dropped by the filter; they come back later from the queue. All other
# nodes are disconnected from the graph and appended to deletedNodesa.
# NOTE(review): whenever the counter Num is 0, the priorities in the queue
# are updated to 0.0 via mpqreorderupdate - confirm the exact semantics of
# addModCounter / mpqreorderupdate in the operator documentation.
# The result is the number of contracted nodes.
if isDBObject("mtcontract") then delete mtcontract endif

let mtcontract = fun(
    nodesXa : mem (mpqueue (tuple ([Orig : int, Node : int,Pos : point,
                                    Prio : real]))),
    edgesXa : mem (mgraph2 (tuple ([Source : int,Target: int,
                                    SourcePos : point, TargetPos : point,
                                    Cost : real, Middle : int,
                                    NEdges : real, MG_Source : int,
                                    MG_Target : int, MG_Cost : real]))),
    contractionXa : mem (rel (tuple ([Source : int, Target : int,
                                      SourcePos: point, TargetPos : point,
                                      Cost : real, Middle : int,
                                      NEdges : real, MG_Source : int,
                                      MG_Target : int, MG_Cost : real]))),
    deletedNodesa : mem (rel (tuple ([Orig : int, Node : int, Pos : point,
                                      Prio : real,In : int, Out: int,
                                      CEdges : int, Deleted : int]))),
    maxprio: int,
    minBlockSize : int,
    maxHops : int)
  nodesXa mfeedpq
  extend[ In : pwrap(edgesXa) mg2numpredecessors[.Node],
          Out: pwrap(edgesXa) mg2numsuccessors[.Node] ]
  extend[CEdges: fun(t: TUPLE)
    ifthenelse( (attr(t, In) * attr(t, Out)) <= attr(t, Prio),
      mtcreationEdges(attr(t,Node), maxHops, edgesXa)
      groupby[; Added: fun(g: GROUP)
        ifthenelse(
          (((attr(t, In) * attr(t, Out)) + ((g count) - (attr(t, In) + attr(t, Out)))) <= attr(t, Prio))
            or ( (g count) <= attr(t,In) + attr(t,Out)),
          g feed
            projectextend[; Source: .Source_i,
                            Target: .Target_o,
                            SourcePos: .SourcePos_i,
                            TargetPos: .TargetPos_o,
                            Cost: .Cost_i + .Cost_o,
                            Middle: attr(t, Orig),
                            NEdges: .NEdges_i + .NEdges_o,
                            MG_Source : .MG_Source_i,
                            MG_Target : .MG_Target_o,
                            MG_Cost : .MG_Cost_i + .MG_Cost_o ]
            mg2insert[pwrap(edgesXa)]
            minsert[pwrap(contractionXa)]
            count,
          bool2int(pwrap(nodesXa) minserttuplepqprojectU[t,
            ((attr(t, In) * attr(t, Out)) + ((g count) - (attr(t, In) + attr(t, Out)))) * 1.0,
            Prio;Orig, Node, Pos, Prio]) - 10
        )]
      extractDef[Added, 0] ,
      bool2int(pwrap(nodesXa) minserttuplepqprojectU[t,
        (attr(t, In) * attr(t, Out)) * 1.0,
        Prio; Orig, Node, Pos, Prio]) - 10
    )
  ]
  filter[.CEdges >= 0]
  addModCounter[Num,1, (.Prio > maxprio) and (.. > minBlockSize), 0]
  extend[ Dummy : ifthenelse(.Num = 0,pwrap(nodesXa) mpqreorderupdate[ 0.0, Prio ],0)]
  remove[Num,Dummy]
  extend[Deleted: bool2int(pwrap(edgesXa) mg2disconnect[.Node]) ]
  minsert[pwrap(deletedNodesa)]
  count

# share this function
query share("mtcontract", TRUE, workers)

# do the contraction by calling the function on each worker
# (maxprio = 30, minBlockSize = 200, maxHops = 8)
query nodesX graphs contractionX deletedNodes
  dmap4["", mtcontract($1,$2,$3,$4, 30, 200, 8) , 1238]


# ---------------------------------------------------------------------------
# Collect the results of the workers
# ---------------------------------------------------------------------------

# make the main memory structures persistent at each worker
# we need:
#   * graphs       : contain remaining edges, note not all nodes are contracted
#   * deletedNodes : to determine the deletion order
#   * contractionX : the contracted edges
if isDBObject("pgraphs") then delete pgraphs endif

let pgraphs = graphs dmap["pgraphs", . mg2feed ]

if isDBObject("pdeletedNodes") then delete pdeletedNodes endif

let pdeletedNodes = deletedNodes dmap["pdeletedNodes",
  . mfeed addcounter[DelNum, 1] ]

if isDBObject("pcontractionX") then delete pcontractionX endif

let pcontractionX = contractionX dmap["pcontractionX", . mfeed]

# merge the remaining edges on the master
# we remove MG_... because these are not unique
if isDBObject("remainingEdges") then delete remainingEdges endif

let remainingEdges = pgraphs dsummarize
  remove[MG_Source, MG_Target, MG_Cost]
  consume

# check number of remaining edges
query remainingEdges count

# merge the deletedNodes, sort and renumber them
if isDBObject("alldeletedNodes") then delete alldeletedNodes endif

let alldeletedNodes = pdeletedNodes dsummarize
  sortby[DelNum]
  remove[DelNum]
  addcounter[DelNum,1]
  consume

# get the contraction edges
if isDBObject("allContraction") then delete allContraction endif

let allContraction = pcontractionX dsummarize
  remove[MG_Source, MG_Target, MG_Cost]
  consume

query allContraction count

# store contraction edges between border nodes
if isDBObject("borderContractions") then delete borderContractions endif

let borderContractions =
  borderNodes feed {s}
  borderNodes feed
  allContraction feed
  itHashJoin[Node, Target]
  itHashJoin[Node_s,Source]
  project[Source, Target, SourcePos, TargetPos, Cost, Middle, NEdges]
  consume

query borderContractions count


# ---------------------------------------------------------------------------
# Contraction of the remaining graph on the master
# ---------------------------------------------------------------------------

# build a graph from the remaining edges and the cell spanning edges
if isDBObject("remainingGraph") then delete remainingGraph endif

let remainingGraph =
  remainingEdges feed
  CellSpanningEdgesB feed
  concat
  borderContractions feed
  concat
  consume

query remainingGraph count

if isDBObject("localGraph") then delete localGraph endif

let localGraph = remainingGraph feed
  createmgraph2m[Source,Target,.Cost,"localGraph", FALSE]

# some statistics: number of vertices, number of edges,
# maximum number of edges per source and per target
query mg2numvertices(localGraph)

query localGraph mg2feed count

query localGraph mg2feed sortby[Source] groupby[Source; C : group count] max[C]

query localGraph mg2feed sortby[Target] groupby[Target; C : group count] max[C]

# extract the node information from this graph
if isDBObject("localNodesA") then delete localNodesA endif

let localNodesA =
  localGraph mg2feed project[Source, MG_Source, SourcePos]
    renameattr[Orig : Source, NodeIdNew : MG_Source, Pos : SourcePos]
  localGraph mg2feed project[Target, MG_Target, TargetPos]
    renameattr[Orig : Target, NodeIdNew : MG_Target, Pos : TargetPos]
  concat
  sortby[Orig]
  krdup[Orig]
  consume

# create memory object for contraction edges
if isDBObject("localContractionX") then delete localContractionX endif

let localContractionX = localGraph mg2feed
  head[0]
  letmconsume["localContractionX"]

# create memory object for deleted nodes
if isDBObject("localDeletedNodes") then delete localDeletedNodes endif

let localDeletedNodes = localNodesA feed
  projectextend[Orig; Node: .NodeIdNew, Pos: .Pos, Prio: 0.0,
                In: 0, Out: 0, CEdges: 0, Deleted: 0]
  head[0]
  letmconsume["localDeletedNodes"]

# insert the localNodesA into a priority queue
if isDBObject("localNodesX") then delete localNodesX endif

let localNodesX = localNodesA feed
  projectextend[Orig; Node: .NodeIdNew, Pos: .Pos, Prio: 0.0]
  mcreatepqueue2[Prio, "localNodesX"]

# define function computing contraction edges for a node
# (same as mtcreationEdges, but working on the object localGraph;
#  the '#' inside the filter is the "not equal" operator)
if isDBObject("localmtcreationEdges") then delete localmtcreationEdges endif

let localmtcreationEdges = fun(node10 : int, maxHops : int)
  pwrap(localGraph) mg2predecessors[node10]
  sortbyh[MG_Source,MG_Cost]
  krdup[MG_Source] {i}
  loopjoin[ fun(t10 : TUPLE)
    (fun(node11: int) pwrap(localGraph) mg2successors[node11])
    pwrap(localGraph) mg2successors[node10]
      sortbyh[MG_Target,MG_Cost]
      krdup[MG_Target] {o}
    mtMinPathCosts1[MG_Target,attr(t10,MG_Source_i), MG_Target_o, MG_Cost, maxHops]
    filter[attr(t10,MG_Source_i) # .MG_Target_o]
    extend[Cost : attr(t10,MG_Cost_i) + .MG_Cost_o]
    filter[.MinPC >= .Cost]
    remove[MinPC]
  ]

# define the local main function
# (same as mtcontract, but working on the objects localNodesX, localGraph,
#  localContractionX and localDeletedNodes)
if isDBObject("localmtcontract") then delete localmtcontract endif

let localmtcontract = fun( maxprio: int, minBlockSize : int, maxHops : int)
  localNodesX mfeedpq
  extend[ In : pwrap(localGraph) mg2numpredecessors[.Node],
          Out: pwrap(localGraph) mg2numsuccessors[.Node] ]
  extend[CEdges: fun(t: TUPLE)
    ifthenelse( (attr(t, In) * attr(t, Out)) <= attr(t, Prio),
      localmtcreationEdges(attr(t,Node), maxHops)
      groupby[; Added: fun(g: GROUP)
        ifthenelse(
          (((attr(t, In) * attr(t, Out)) + ((g count) - (attr(t, In) + attr(t, Out)))) <= attr(t, Prio))
            or ( (g count) <= attr(t,In) + attr(t,Out)),
          g feed
            projectextend[; Source: .Source_i,
                            Target: .Target_o,
                            SourcePos: .SourcePos_i,
                            TargetPos: .TargetPos_o,
                            Cost: .Cost_i + .Cost_o,
                            Middle: attr(t, Orig),
                            NEdges: .NEdges_i + .NEdges_o,
                            MG_Source : .MG_Source_i,
                            MG_Target : .MG_Target_o,
                            MG_Cost : .MG_Cost_i + .MG_Cost_o ]
            mg2insert[pwrap(localGraph)]
            minsert[pwrap(localContractionX)]
            count,
          bool2int(pwrap(localNodesX) minserttuplepqprojectU[t,
            ((attr(t, In) * attr(t, Out)) + ((g count) - (attr(t, In) + attr(t, Out)))) * 1.0,
            Prio;Orig, Node, Pos, Prio]) - 10
        )]
      extractDef[Added, 0] ,
      bool2int(pwrap(localNodesX) minserttuplepqprojectU[t,
        (attr(t, In) * attr(t, Out)) * 1.0,
        Prio; Orig, Node, Pos, Prio]) - 10
    )
  ]
  filter[.CEdges >= 0]
  addModCounter[Num,1, (.Prio > maxprio) and (.. > minBlockSize), 0]
  extend[ Dummy : ifthenelse(.Num = 0,pwrap(localNodesX) mpqreorderupdate[ 0.0, Prio ],0)]
  remove[Num,Dummy]
  extend[Deleted: bool2int(pwrap(localGraph) mg2disconnect[.Node]) ]
  minsert[pwrap(localDeletedNodes)]
  count

# contract the graph (maxprio = 30, minBlockSize = 200, maxHops = 4)
query localmtcontract(30,200,4)

# make results persistent
if isDBObject("localContraction") then delete localContraction endif

let localContraction = localContractionX mfeed
  remove[MG_Source, MG_Target, MG_Cost]
  consume

# the numbering of the locally deleted nodes continues the numbering of the
# nodes deleted on the workers
if isDBObject("localNodeOrder") then delete localNodeOrder endif

let localNodeOrder = localDeletedNodes mfeed
  addcounter[DelNum, alldeletedNodes count + 1]
  consume


# ---------------------------------------------------------------------------
# Final graph and shortest path function
# ---------------------------------------------------------------------------

# connect the local results with the remote results
if isDBObject("finalContractionX") then delete finalContractionX endif

let finalContractionX =
  localContraction feed
  allContraction feed
  concat
  consume

if isDBObject("finalNodeOrder") then delete finalNodeOrder endif

let finalNodeOrder =
  alldeletedNodes feed
  localNodeOrder feed
  concat
  consume

# Combine the original graph edges with the contraction edges.
# The cell spanning edges were never distributed, i.e., they are not part
# of dedgesb, and the contraction relations contain only newly created
# edges. Hence CellSpanningEdgesB (same schema as dedgesb) is added
# explicitly; otherwise the final graph would lack every original edge
# crossing a cell border.
if isDBObject("allEdges") then delete allEdges endif

let allEdges =
  dedgesb dsummarize
  CellSpanningEdgesB feed
  concat
  finalContractionX feed
  concat
  consume

# extend each edge by the deletion numbers of its source and its target
if isDBObject("numberedEdges") then delete numberedEdges endif

let numberedEdges =
  finalNodeOrder feed project [Orig,DelNum] {a}
  finalNodeOrder feed project [Orig,DelNum] {b}
  allEdges feed
  itHashJoin[Orig_b, Source, 9999997]
  remove[Orig_b]
  itHashJoin[Orig_a, Target, 9999997]
  remove[Orig_a]
  renameattr[SourceNum : DelNum_b, TargetNum : DelNum_a]
  consume

# release all memory objects on the master before building the final graph
query memclear();

if isDBObject("cgraph") then delete cgraph endif

let cgraph = numberedEdges feed
  createmgraph2m[Source, Target,.Cost,"cgraph",TRUE]

# forward search: only edges leading to a node that was deleted later
if isDBObject("forward") then delete forward endif

let forward = fun(n : int)
  cgraph mg2successors[n] filter[.SourceNum < .TargetNum]

# backward search: only edges coming from a node that was deleted later
if isDBObject("backward") then delete backward endif

let backward = fun(nb : int)
  cgraph mg2predecessors[nb] filter[.SourceNum > .TargetNum]

# shortest path from s to t by a bidirectional dijkstra using the
# restricted successor and predecessor functions
if isDBObject("sp") then delete sp endif

let sp = fun(s: int , t : int)
  forward backward gbidijkstra[MG_Target, MG_Source, s,t,.Cost]