# Test file for Distributed Algebra 2 ################################################################## # 6.1 Random Partitioning # query deleteRemoteObjects(RoadsB1) # delete RoadsB1 let RoadsB1 = Roads feed dfdistribute3["RoadsB1", 50, TRUE, Workers14] ################################################################## # 6.2 Hash Partitioning # query deleteRemoteObjects(RoadsB2) # delete RoadsB2 let RoadsB2 = Roads feed ddistribute4["RoadsB2", hashvalue(.Osm_id, 999997), 50, Workers14] ################################################################## # 6.3 Range Partitioning delete S let S = Roads feed filter[isdefined(.Name)] nth[113, FALSE] project[Name] sortby[Name] consume delete Boundaries let Boundaries = S feedproject[Name] nth[101, TRUE] addcounter[D, 1] project[Name, D] consume query Boundaries inserttuple["", 0] consume query memclear() query Boundaries feed letmconsume["Boundaries"] mcreateAVLtree[Name] query deleteRemoteObjects(RoadsB3) delete RoadsB3 let RoadsB3 = Roads feed filter[isdefined(.Name)] ddistribute4["RoadsB3", "Boundaries_Name" "Boundaries" matchbelow[.Name] extract[D], 50, Workers14] # Creating the full sorted order query deleteRemoteObjects(RoadsB3S) delete RoadsB3S let RoadsB3S = RoadsB3 dmap["RoadsB3S", . feed sortby[Name]] ################################################################## # 6.4 Spatial Partitioning delete grid # grid for nrw let grid = [const cellgrid2d value (5.8 50.3 0.2 0.2 20)] query share("grid", TRUE, Workers14) query deleteRemoteObjects(RoadsB4) delete RoadsB4 let RoadsB4 = Roads feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)] extend[Original: .Cell = cellnumber(bbox(.GeoData), grid) transformstream extract[Elem] ] dfdistribute2["RoadsB4", Cell, 50, Workers14] # query deleteRemoteObjects(BuildingsB1) # delete BuildingsB1 let BuildingsB1 = Buildings feed dfdistribute3["BuildingsB1", 50, TRUE, Workers14] # 712 sec = 11:52 min query deleteRemoteObjects(BuildingsB4) delete BuildingsB4 # enlarge bounding boxes of buildings by 300 meters so they can be found in # distributed distance joins by 300 meters distance. let BuildingsB4a = BuildingsB1 partitionF["", . feed extend[EnlargedBox: bbox(.GeoData) extendGeo[300]] extendstream[Cell: cellnumber(.EnlargedBox, grid)] extend[Original: .Cell = cellnumber(.EnlargedBox, grid) transformstream extract[Elem] ], ..Cell, 0] collect2["BuildingsB4a", 1238] # 3:19 min # the following creates a darray which can be indexed let BuildingsB4c = BuildingsB4a dloop["Buildings4", . feed consume] # 6:15 min # result size 6991440 let BuildingsB4 = Buildings feed extend[EnlargedBox: bbox(.GeoData) extendGeo[300]] extendstream[Cell: cellnumber(.EnlargedBox, grid)] extend[Original: .Cell = cellnumber(.EnlargedBox, grid) transformstream extract[Elem] ] ddistribute2["BuildingsB4", Cell, 50, Workers14] # 57:49 min # result size 6991440 ################################################################## # 6.5 Replication query share("Roads", TRUE, Workers14) # 19:47 min ################################################################## # 7 Querying # # 7.1 Selection # # 7.1.1 By Scanning query RoadsB1 dmap["", . feed filter[.Name starts "Universitätsstraße"] count] getValue tie[. + ..] # 5.58 seconds # result 357 delete eichlinghofen let eichlinghofen = [const region value ( ( ( (7.419515247680575 51.47332155746125) (7.394967670776298 51.47332155746125) (7.394967670776298 51.48716614802665) (7.419515247680575 51.48716614802665)))) ] query share("eichlinghofen", TRUE, Workers14) query BuildingsB4 dmap["", . feed filter[.Original] filter[.GeoData intersects eichlinghofen] ] dsummarize consume count # 114 seconds # 2263 objects ################################################################## # 7.1.2 Creating a Standard Index query deleteRemoteObjects(RoadsB2_Name) delete RoadsB2_Name let RoadsB2_Name = RoadsB2 dloop["RoadsB2_Name", . createbtree[Name] ] ################################################################## # 7.1.3 Using a Standard Index query RoadsB2_Name RoadsB2 dloop2["", . .. range["Universitätsstraße", "Universitätsstraße"++] count] getValue tie[. + ..] # 4.38 seconds # result 357 ################################################################## # 7.1.4 Creating a Spatial Index query deleteRemoteObjects(BuildingsB4_GeoData) delete BuildingsB4_GeoData let BuildingsB4_GeoData = BuildingsB4 dloop["", . feed addid extend[Box: scalerect(.EnlargedBox, 1000000.0, 1000000.0)] sortby[Box] remove[Box] bulkloadrtree[EnlargedBox] ] # 44.74 sec ################################################################## # 7.1.5 Using a Spatial Index query BuildingsB4_GeoData BuildingsB4 dmap2["", . .. windowintersects[eichlinghofen] filter[.Original] filter[.GeoData intersects eichlinghofen], 1238 ] dsummarize remove[EnlargedBox] consume count # 10 seconds, 24.52 # 2263 objects ################################################################## # 7.2 Join # # 7.2.1 Equijoin # # (1) Distributed by Join Attribute query deleteRemoteObjects(NaturalB2) delete NaturalB2 let NaturalB2 = Natural feed filter[isdefined(.Name)] ddistribute4["NaturalB2", hashvalue(.Name, 999997), 50, Workers14] # 23.89 seconds query NaturalB2 dmap["", . feed {n1} . feed {n2} itHashJoin[Name_n1, Name_n2] filter[.Osm_id_n1 < .Osm_id_n2]] dsummarize consume count # 10.74 seconds, 12.22 # result 6284 ################################################################## # (2) Arbitrary Distribution query deleteRemoteObjects(NaturalB1) delete NaturalB1 let NaturalB1 = Natural feed dfdistribute3["NaturalB1", 50, TRUE, Workers14] # 31 sec query NaturalB1 partitionF["", . feed filter[isdefined(.Name)], hashvalue(..Name, 999997), 0] collect2["", 1238] dmap["", . feed {n1} . feed {n2} itHashJoin[Name_n1, Name_n2] filter[.Osm_id_n1 < .Osm_id_n2]] dsummarize consume count # 43 seconds, 51.5 # result 6284 ################################################################## # 7.2.2 Spatial Join # # (1) Both arguments are distributed by spatial attributes query deleteRemoteObjects(WaterwaysB4) delete WaterwaysB4 let WaterwaysB4 = Waterways feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)] dfdistribute2["WaterwaysB4", Cell, 50, Workers14] # 16.16 seconds query RoadsB4 WaterwaysB4 dmap2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 4:49 min # result 61579 query RoadsB4 WaterwaysB4 dmap2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] count, 1238 ] getValue tie[. + ..] # 9.45 sec # Result 1479958 ################################################################## # (2) Not distributed by spatial attributes query deleteRemoteObjects(WaterwaysB1) delete WaterwaysB1 let WaterwaysB1 = Waterways feed dfdistribute3["WaterwaysB1", 50, TRUE, Workers14] # 3.27 seconds, 9.62 query RoadsB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)], ..Cell, 0] WaterwaysB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)], ..Cell, 0] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 309 seconds, 305 seconds # result 61579 # nur bbox-Vergleich query RoadsB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)], ..Cell, 0] WaterwaysB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)], ..Cell, 0] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] count, 1238 ] getValue tie[. + ..] # 33.91 sec # result 1479958 # alternative implementation with collect2, dmap2 query RoadsB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)], ..Cell, 0] collect2["", 1238] WaterwaysB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)], ..Cell, 0] collect2["", 1238] dmap2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 368 seconds = 6:08 min # result 61579 # util = 64.7 %, Cost8 # alternative implementation with dproduct query RoadsB1 WaterwaysB1 dproduct["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] count, 1238 ] getValue tie[. + ..] # 28.96 sec, 28.73 # result 1479958 query RoadsB1 WaterwaysB1 dproduct["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 4:04 min, 4:05 min, 4:13 min = 253 seconds # result 61579 # Cost7, util = 82.7 % query BuildingsB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)], ..Cell, 0] RoadsB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid)], ..Cell, 0] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 518 seconds # result 72668 # Cost9, util = 76.9 % let grid2 = [const cellgrid2d value (5.8 50.3 0.02 0.02 200)] # using grid2: query BuildingsB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid2)], ..Cell, 0] RoadsB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid2)], ..Cell, 0] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid2, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 485 seconds # result 72668 # Costs11.png, util = 90.89 % let hist1 = Roads feed projectextendstream[; Cell: cellnumber(bbox(.GeoData), grid)] extend[Partition: int2real(.Cell mod 50)] create_histogram1d[Partition, realstream(0.0, 50.0, 1.0) set_histogram1d] let grid2 = [const cellgrid2d value (5.8 50.3 0.02 0.02 200)] let hist2 = Roads feed projectextendstream[; Cell: cellnumber(bbox(.GeoData), grid2)] extend[Partition: int2real(.Cell mod 50)] create_histogram1d[Partition, realstream(0.0, 50.0, 1.0) set_histogram1d] # now using grid2 query RoadsB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid2)], ..Cell, 0] WaterwaysB1 partitionF["", . feed extendstream[Cell: cellnumber(bbox(.GeoData), grid2)], ..Cell, 0] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid2, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 277 seconds # result 61579 # Cost10.png, util = 88.1% # Balanced partitioning. The set of rectangles PartitionNRW must exist and be shared. query RoadsB1 partitionF["", PartitionNRW feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 48] WaterwaysB1 partitionF["", PartitionNRW feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 48] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[(.TLClass_r = 3) or (.TLClass_w = 3) or ((.TLClass_r = 2) and (.TLClass_w = 1)) or ((.TLClass_r = 1) and (.TLClass_w = 2)) ] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 248 seconds # result 61579 # Cost12, util = 88.1 % query RoadsB1 partitionF["", Fields3 feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 90] WaterwaysB1 partitionF["", Fields3 feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 90] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[(.TLClass_r = 3) or (.TLClass_w = 3) or ((.TLClass_r = 2) and (.TLClass_w = 1)) or ((.TLClass_r = 1) and (.TLClass_w = 2)) ] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 255 seconds # result 61579 # Cost16.png, util = 90% query RoadsB1 partitionF["", PartitionNRW feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 48] collect2["", 1238] WaterwaysB1 partitionF["", PartitionNRW feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 48] collect2["", 1238] dmap2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[(.TLClass_r = 3) or (.TLClass_w = 3) or ((.TLClass_r = 2) and (.TLClass_w = 1)) or ((.TLClass_r = 1) and (.TLClass_w = 2)) ] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 384 seconds # result 61579 # Cost 17.png, util = 63.5 % query BuildingsB1 partitionF["", PartitionNRW feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 48] RoadsB1 partitionF["", PartitionNRW feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 48] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[(.TLClass_r = 3) or (.TLClass_w = 3) or ((.TLClass_r = 2) and (.TLClass_w = 1)) or ((.TLClass_r = 1) and (.TLClass_w = 2)) ] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 425 seconds, 412 seconds # result 72668 # Cost13, util = 86.5 % # use an unbalanced partition with small fields at the end. Replace PartitionNRW by Fields3. query share("Fields3", TRUE, Workers14) query BuildingsB1 partitionF["", Fields3 feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 90] RoadsB1 partitionF["", Fields3 feed . feed itSpatialJoin[Field, GeoData] extend[TLClass: topleftclass(.Field, bbox(.GeoData))] remove[Field], ..N, 90] areduce2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[(.TLClass_r = 3) or (.TLClass_w = 3) or ((.TLClass_r = 2) and (.TLClass_w = 1)) or ((.TLClass_r = 1) and (.TLClass_w = 2)) ] filter[.GeoData_r intersects .GeoData_w] count, 1238 ] getValue tie[. + ..] # 405 seconds, 432 seconds || 402 seconds # result 72668 # Cost14, util = 92.6 % || Cost 15, util = 91.2 % ################################################################## # Expressions in the Select-Clause query RoadsB4 WaterwaysB4 dmap2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] filter[.GeoData_r intersects .GeoData_w], 1238 ] dsummarize projectextend[Osm_id_r, Name_r, GeoData_r, Osm_id_w, Name_w, GeoData_w; BridgePosition: crossings(.GeoData_r, .GeoData_w)] consume count # 26.6 seconds # result size 99 query RoadsB4 WaterwaysB4 dmap2["", . feed {r} .. feed {w} itSpatialJoin[GeoData_r, GeoData_w] filter[.Cell_r = .Cell_w] filter[gridintersects(grid, bbox(.GeoData_r), bbox(.GeoData_w), .Cell_r)] filter[.GeoData_r intersects .GeoData_w] projectextend[Osm_id_r, Name_r, Osm_id_w, Name_w; BridgePosition: crossings(.GeoData_r, .GeoData_w)], 1238 ] dsummarize consume count ################################################################## # 7.2.3 General Join query share("Waterways", FALSE, Workers14) query RoadsB1 dmap["", . feed filter[isdefined(.Name)] filter[.Type contains "pedestrian"] {r} Waterways feed filter[.Type contains "river"] {w} symmjoin[.Name_r contains ..Name_w] ] dsummarize consume count ################################################################## # 7.2.4 Index-Based Equijoin query deleteRemoteObjects(RoadsB3_Name) delete RoadsB3_Name let RoadsB3_Name = RoadsB3 dloop["RoadsB3_Name", . createbtree[Name]] query RoadsB3 RoadsB3_Name dmap2["", . feed filter[.Type contains "raceway"] {r1} loopjoin[.. exactmatchS[.Name_r1]], 1238] RoadsB3 dmap2["", . feed .. gettuples filter[.Osm_id_r1 < .Osm_id], 1238 ] dsummarize consume count ################################################################## # 7.2.5 Index-Based Spatial Join query RoadsB4 BuildingsB4_GeoData dmap2["", . feed filter[.Type contains "raceway"] {r} loopjoin[.. windowintersectsS[.GeoData_r]], 1238] BuildingsB4 dmap2["", . feed .. gettuples2[Id, b] filter[.Cell_r = .Cell_b] filter[gridintersects(grid, bbox(.GeoData_r), .EnlargedBox_b, .Cell_r)] filter[distance(gk(.GeoData_r), gk(.GeoData_b)) < 500] count, 1238 ] getValue tie[. + ..] ################################################################## # 7.3 Aggregation # # 7.3.1 Counting query RoadsB1 dmap["", . feed sortby[Type] groupby[Type; Cnt: group count] ] dsummarize sortby[Type] groupby[Type; Cnt: group feed sum[Cnt]] consume count # 9.33 seconds # result 73 (groups) # total number of group members is 1505462, correct. ################################################################## # 7.3 Aggregation # # 7.3.2 Sum, Average query WaterwaysB1 dmap["", . feed filter[.Width between[0, 10000]] sortby[Type] groupby[Type; Cnt: group count, SWidth: group feed sum[Width]] ] dsummarize sortby[Type] groupby[Type; SumWidth: group feed sum[SWidth], SumCnt: group feed sum[Cnt]] extend[AWidth: .SumWidth / .SumCnt] project[Type, AWidth] consume # using groupby2 query WaterwaysB1 dmap["", . feed filter[.Width between[0, 10000]] groupby2[Type; Cnt: fun(t: TUPLE, agg:int) agg + 1::0, SWidth: fun(t2: TUPLE, agg2:int) agg2 + attr(t2, Width)::0] ] dsummarize sortby[Type] groupby[Type; AWidth: group feed sum[SWidth] / group feed sum[Cnt]] consume