# The following objects must be present in the database:
#
# Relations Buildings, SmallCore, Workers14
#
#
# Clustering parameters; there must be n neighbors within distance eps

let eps = 100;

let n = 10;

query share("eps", TRUE, Workers14)

query share("n", TRUE, Workers14)

# Now the implementation:
# The algorithm is the following:
# Let S (= Buildings) be the given set of objects.

###################
# 1 Determine a set of core points C using the script SimilarityPartitioning.
#   Core points are now available as the set C = SmallCore.

###################
# 2 Distribute S = Buildings as fast as possible to the workers, using round
#   robin, for example. The set BuildingsB1 has been built in this way.

let BuildingsB1 = Buildings feed dfdistribute3["BuildingsB1", 50, TRUE, Workers14]
# 14:28 min

###################
# 3 Share C = SmallCore with all workers.

let SmallCore2 = SmallCore feed remove[N] addcounter[N, 0]
  project[Osm_id, Center, N] consume

query share("SmallCore2", TRUE, Workers14)
# 1.3 sec, 1.45

###################
# 4 On each worker, create a main memory relation for SmallCore and a main
#   memory M-tree over SmallCore_Center.

let Control14 = intstream(0, 13) transformstream
  ddistribute3["Control14", 14, TRUE, Workers14]
  dloop["", . feed extract[Elem]]

query Control14 dloop["", memclear()]

query memclear()

delete S1;

let S1 = Control14 dloop["", SmallCore2 feed letmconsume["SmallCore2"] ]

query S1 dloop["", . mfeed count] getValue
# result 92 for each of the fields 0 .. 13

delete S2;

let S2 = S1 dloop["", . mcreateMtree[Center, "SmallCore2_Center"]]
# ok

query S2 dloop["", memgetcatalog()
  project[UsedMB, Name, ObjectType, ObjSizeInMB] consume]
  dsummarize addcounter[No, 0] consume

# We can see that SmallCore2 and SmallCore2_Center have been loaded into the
# memories of the 14 workers.

# Inform the master about these memory objects by loading them as well:

query SmallCore2 feed letmconsume["SmallCore2"]

query "SmallCore2" mcreateMtree[Center, "SmallCore2_Center"]

# Now the state of memory is the same on the master and on the workers.

###################
# 5a Assign to each element in S (BuildingsB1) its closest element in
#    C (SmallCore2), using the main memory index SmallCore2_Center built in
#    step 4.

let BuildingsMembers = BuildingsB1 dmap["BuildingsMembers", . feed
  extend[Center: center(bbox(.GeoData))]
  loopjoin[fun(t: TUPLE)
    "SmallCore2_Center" "SmallCore2" mdistScan[attr(t, Center)] head[1] {s}
    extend[D: distance(gk(attr(t, Center)), gk(.Center_s))]
    project[N_s, D] renameattr[N: N_s] ]
]
# 4:41min (280.736sec)

# 5b Repartition BuildingsMembers by N.

query deleteRemoteObjects(BuildingsByCore);

delete BuildingsByCore;

let BuildingsByCore = BuildingsMembers partition["", .N, 92]
  collect2["BuildingsByCore", 1238]
# 2:32min (151.732sec)

query BuildingsByCore dmap["", . feed count] getValue tie[. + ..]
# result 6516159, correct

###################
# 5 Alternative to 5a/5b in a single step: repartition S (BuildingsB1) by
#   C (SmallCore2), call the result T (BuildingsByCore). Use the main memory
#   index SmallCore2_Center built in step 4. Partitions now correspond to
#   core points.

let BuildingsByCore = BuildingsB1 partitionF["BuildingsByCore", . feed
  extend[Center: center(bbox(.GeoData))]
  loopjoin[fun(t: TUPLE)
    "SmallCore2_Center" "SmallCore2" mdistScan[attr(t, Center)] head[1] {s}
    extend[D: distance(gk(attr(t, Center)), gk(.Center_s))]
    project[Osm_id_s, N_s, D] ],
  ..N_s, 92]
# 291 seconds

### remember distributed costs for this

let CostBuildingsByCore = Control14 dloop["", SEC2COMMANDS feed consume]

let CostRelBuildingsByCore = CostBuildingsByCore Control14
  dloop2["", . feed extend[Server: ..] consume] dsummarize consume
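### Conceptual sketch (Python, not part of the SECONDO script) of what steps
### 5a/5b compute: every point is assigned to its nearest core point together
### with the distance D to that core; grouping by the core id N then yields
### the partitions T_j. All names below are illustrative; the script answers
### the nearest-core query with the main memory M-tree SmallCore2_Center
### instead of the linear scan used here.
#
#   from collections import defaultdict
#   from math import hypot
#
#   def assign_to_cores(points, cores):
#       """points: [(point_id, x, y)], cores: [(core_id, x, y)].
#       Returns {core_id: [(point_id, D), ...]}, i.e. the partitions T_j."""
#       partitions = defaultdict(list)
#       for pid, px, py in points:
#           # nearest core by Euclidean distance
#           core_id, d = min(((cid, hypot(px - cx, py - cy))
#                             for cid, cx, cy in cores),
#                            key=lambda c: c[1])
#           partitions[core_id].append((pid, d))
#       return partitions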
### build cost diagram

query CostRelBuildingsByCore feed
  project[CmdNr, CmdStr, ElapsedTime, Server]
  addcounter[N, 0] filter[.ElapsedTime > 1.0] addcounter[N2, 0]
  nest[Server; Costs]
  extend[CostsB: fun(t: TUPLE) attr(t, Costs) afeed addcounter[N4, 1]
    extend[Below: fun(t2: TUPLE) attr(t, Costs) afeed addcounter[N5, 1]
      filter[.N5 < attr(t2, N4)] sum[ElapsedTime]]
    extend[CostBox: rectangle2(attr(t, Server) * 50.0,
      (attr(t, Server) * 50.0) + 40.0, .Below, .Below + .ElapsedTime)]
    aconsume]
  remove[Costs] consume

let BuildingsByCoreB = BuildingsByCore areduce["", . feed, 1238]

let CostBuildingsByCoreB = Control14 dloop["", SEC2COMMANDS feed consume]

let CostRelBuildingsByCoreB = CostBuildingsByCoreB Control14
  dloop2["", . feed extend[Server: ..] consume] dsummarize consume
# 58.47 seconds, 58.36

query CostRelBuildingsByCoreB feed
  project[CmdNr, CmdStr, ElapsedTime, Server]
  addcounter[N, 0] filter[.ElapsedTime > 1.0] addcounter[N2, 0]
  nest[Server; Costs]
  extend[CostsB: fun(t: TUPLE) attr(t, Costs) afeed addcounter[N4, 1]
    extend[Below: fun(t2: TUPLE) attr(t, Costs) afeed addcounter[N5, 1]
      filter[.N5 < attr(t2, N4)] sum[ElapsedTime]]
    extend[CostBox: rectangle2(attr(t, Server) * 50.0,
      (attr(t, Server) * 50.0) + 40.0, .Below, .Below + .ElapsedTime)]
    aconsume]
  remove[Costs] consume

query BuildingsByCoreB dmap["", . feed count] getValue tie[. + ..]
# result 6516159, correct

###################
# 6 For each partition T_i, determine the maximum value of D. Let this be
#   Radius_i.

### prepare time measurements

update LastCommand := Control14 dloop["", SEC2COMMANDS feed max[CmdNr]]

query deleteRemoteObjects(RadiusMembers);

delete RadiusMembers;

let RadiusMembers = BuildingsByCore dmap["", . feed kbiggest[1; D] extract[D]]
# 9.25 seconds, 9.23, 11.32

let CostRadiusMembers = Control14 LastCommand
  dloop2["", SEC2COMMANDS feed filter[.CmdNr > ..] consume]
  Control14 dloop2["", . feed extend[Server: ..] consume] dsummarize consume

###################
# 7 Load each partition S_i into memory and create a main memory M-tree index
#   S_Center over it. Then for each partition S_i, for each core point c_j in
#   C, search on S_Center with the radius Radius_j + epsilon. Repartition the
#   result set by N into U.

query Control14 dloop["", meminit(2000)]

let Numbers50 = intstream(0, 49) transformstream
  ddistribute3["Numbers50", 50, TRUE, Workers14]
  dloop["", . feed extract[Elem]]

query deleteRemoteObjects(BuildingsGK2);

delete BuildingsGK2;

let BuildingsGK2 = BuildingsMembers Numbers50 dmap2["BuildingsGK2", . feed
  extend[CenterGK: gk(center(bbox(.GeoData)))] remove[GeoData]
  letmconsumeflob["BuildingsGK2" + "_" + num2string(..)], 1238]
# 27 seconds, 26.77

query deleteRemoteObjects(BuildingsGK2_CenterGK);

delete BuildingsGK2_CenterGK;

let BuildingsGK2_CenterGK = BuildingsGK2 Numbers50
  dmap2["BuildingsGK2_CenterGK", . mfeed addid
    mcreateMtree2[CenterGK, TID, "BuildingsGK2_CenterGK" + "_" + num2string(..)],
  1238]
# 8.94 seconds

let SmallCore3 = SmallCore2 feed RadiusMembers
  dloop["", . feed namedtransformstream[Radius] consume]
  dsummarize obojoin consume

query share("SmallCore3", TRUE, Workers14)

query deleteRemoteObjects(NeighborsByCore);

delete NeighborsByCore;

update LastCommand := Control14 dloop["", SEC2COMMANDS feed max[CmdNr]]

let NeighborsByCore = BuildingsGK2_CenterGK BuildingsGK2
  dmap2["", fun(index: ARRAYFUNARG1, indexedrel: ARRAYFUNARG2)
    SmallCore3 feed extend[CenterGK: gk(.Center)] extend[Radius2: .Radius + eps]
    loopsel[fun(t: TUPLE)
      index mdistRange2[attr(t, CenterGK), attr(t, Radius2)]
      indexedrel gettuples extend[NN: attr(t, N)] ],
  1238]
  partition["", .N, 92] collect2["NeighborsByCore", 1238]
# 1:40min (99.9391sec)

let CostNeighborsByCore = Control14 LastCommand
  dloop2["", SEC2COMMANDS feed filter[.CmdNr > ..] consume]
  Control14 dloop2["", . feed extend[Server: ..] consume] dsummarize consume
# 6.33 seconds

query NeighborsByCore dmap["", . feed count] getValue
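### Conceptual sketch (Python, not part of the SECONDO script) of what steps
### 6 and 7 compute, before the summary below: Radius_j is the largest
### distance of a member of T_j to its core c_j, and U_j collects all points
### within Radius_j + eps around c_j (members plus neighbors). It reuses the
### illustrative names of the sketch after step 5; the script answers the
### circle query with mdistRange2 on the distributed M-tree
### BuildingsGK2_CenterGK instead.
#
#   from math import hypot
#
#   def neighbor_partitions(points, cores, partitions, eps):
#       """points: [(point_id, x, y)], cores: [(core_id, x, y)],
#       partitions: {core_id: [(point_id, D), ...]} as built by
#       assign_to_cores. Returns {core_id: set(point_id)}, the sets U_j."""
#       # step 6: maximum member distance per core
#       radius = {cid: max(d for _, d in members)
#                 for cid, members in partitions.items()}
#       # step 7: all points within Radius_j + eps of core c_j
#       neighbors = {}
#       for cid, cx, cy in cores:
#           r = radius.get(cid, 0.0) + eps
#           neighbors[cid] = {pid for pid, px, py in points
#                             if hypot(px - cx, py - cy) <= r}
#       return neighbors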
# We now have partitions S_i with an arbitrary subset of S, T_j with the
# points assigned to some core c_j, called the members of the partition, and
# U_j with the points within distance Radius_j + epsilon from core c_j (those
# that are not members are called the neighbors of the partition).

# 8 For each point p in T_j, search on U_j with a circle of radius epsilon.
#   If the result set has size >= n, extend p by an attribute
#   IsCorePoint = true, else false. Also, for each neighbor q encountered,
#   store the triple (p_id, IsCorePoint, q_id). Call the set of triples
#   Triple_j.

# 9 Perform clustering with parameters epsilon, n on T_j. Assuming we have
#   fewer than 100 core points, multiply each cluster number by 100 and add
#   the index j of the partition, so that cluster numbers are globally
#   distinct.

# 10 Extend the triples of Triple_j with the cluster numbers for p.

# On the master:

# 11 Collect all triples on the master.

# 12 For each pair of triples (p_id, IsCorePoint, ClusterNo_p, q_id) and
#    (q_id, IsCorePoint, ClusterNo_q, p_id) on the master: if IsCorePoint is
#    true in both triples, generate a task of merging clusters ClusterNo_p
#    and ClusterNo_q.

# 13 Construct a graph G over the tasks with pairs of cluster numbers
#    (c_a, c_b); the pairs are edges, the cluster numbers are nodes.

# 14 Compute the connected components of that graph and assign each component
#    a new number c_x. Create a global table R of renumberings.

# 15 Send the table R with the renumberings to all partitions.

# 16 For each partition T_u, apply the renumberings of R. Now all points have
#    received their correct cluster number.
#    (A conceptual sketch of steps 12 to 16 is given below.)

(1) # Set SmallCore is available.

(2) let BuildingsB1 = Buildings feed dfdistribute3["BuildingsB1", 50, TRUE, Workers14]

(3) query share("SmallCore", TRUE, Workers14)
# 1.3 sec

(4) let BuildingsB1_CenterGK = BuildingsB1
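### Conceptual sketch (Python, not part of the SECONDO script) of steps 12-16
### above: pairs of mutual triples between core points yield merge tasks, the
### merge tasks form the edges of a graph over cluster numbers, the connected
### components of that graph define the renumbering table R, and R is applied
### to the locally assigned cluster numbers. All names are illustrative;
### triples are assumed here as tuples (p_id, is_core_p, cluster_no_p, q_id).
#
#   def renumbering_table(triples):
#       by_pair = {(t[0], t[3]): t for t in triples}
#       # step 12: a merge task for every pair of triples whose points are
#       # both core points
#       edges = set()
#       for (p, q), (_, core_p, cl_p, _) in by_pair.items():
#           other = by_pair.get((q, p))
#           if other is not None and core_p and other[1]:
#               edges.add((cl_p, other[2]))
#       # steps 13/14: connected components of the cluster-number graph,
#       # computed with a small union-find
#       parent = {}
#       def find(c):
#           parent.setdefault(c, c)
#           while parent[c] != c:
#               parent[c] = parent[parent[c]]
#               c = parent[c]
#           return c
#       for a, b in edges:
#           parent[find(a)] = find(b)
#       # table R: old cluster number -> representative of its component
#       return {c: find(c) for c in parent}
#
#   def apply_renumbering(cluster_no, table):
#       # steps 15/16: executed per partition with the shared table R;
#       # clusters not mentioned in R keep their number
#       return table.get(cluster_no, cluster_no)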