# Distributed Similarity Clustering # # Distributed relation T with attribute Pos of type point must be present. Pos must be # in geographic coordinates. Also the relation Workers must exist in the database. # # The database must be open. # Step 1a let Card = T dmap["", . count] getValue tie[. + ..]; query share("Card", TRUE, Workers) let SS = T dmap["", . feedNth[Card div 10000, FALSE]] dsummarize consume # Step 1b @&Scripts/SimilarityPartitioning.sec; let n = PC count let MinPts = 10; let Eps = 100.0; let wgs84 = create_geoid("WGS1984") # Step 2 query share("PC", TRUE, Workers); query share("MinPts", TRUE, Workers); query share("Eps", TRUE, Workers); query share("wgs84", TRUE, Workers); query share("n", TRUE, Workers); # Step 3 query memclear(); query T dcommand['query meminit(3600)'] filter[.Ok] count; query T dlet["PCm", 'PC feed mconsume'] consume; query T dlet["PCm_Pos_mtree", 'PCm mcreatemtree[Pos, wgs84]'] consume let V = T dmap["", . feed loopjoin[fun(t: TUPLE) PCm_Pos_mtree PCm mdistScan[attr(t, Pos)] head[1] projectextend[N; Dist: distance(attr(t, Pos), .Pos, wgs84)]] loopjoin[fun(u: TUPLE) PCm_Pos_mtree PCm mdistRange[attr(u, Pos), attr(u, Dist) + (2 * Eps)] projectextend[; N2: .N]] ] partition["", .N2, n] collect2["V", 1238] # Step 4 let ControlN = createintdarray("ControlN", Workers, n); let X = V ControlN dmap2["X", $1 feed extend[Pos2: gk(.Pos)] dbscanM[Pos2, CID0, Eps, MinPts] extend[CID: (.CID0 * n) + $2] consume, 1238] # Step 5 query T dcommand['query memclear()'] consume; let Wm = X dmap["Wm", . feed filter[.N = .N2] mconsume]; let Wm_Pos_mtree = Wm dmap["Wm_Pos_mtree", . mcreatemtree[Pos, wgs84]]; let Neighbors = X Wm_Pos_mtree Wm dmap3["Neighbors", $1 feed filter[.N # .N2] loopsel[fun(t: TUPLE) $2 $3 mdistRange[attr(t, Pos), Eps] projectextend[; P: attr(t, Osm_id), PosP: attr(t, Pos), CID0: attr(t, CID0), CIDp: attr(t, CID), IsCoreP: attr(t, IsCore), Np: attr(t, N), Q: .Osm_id, QPos: .Pos]] consume, 1238] let NeighborsByP = Neighbors partition["", hashvalue(.P, 999997), 0] collect2["NeighborsByP", 1238]; let NeighborsByQ = Neighbors partition["", hashvalue(.Q, 999997), 0] collect2["NeighborsByQ", 1238]; # Step 6 query T dcommand['query memclear()'] filter[.Ok] count; let Pairs = NeighborsByQ NeighborsByP dmap2["Pairs", . feed {n1} .. feed {n2} itHashJoin[Q_n1, P_n2] mconsume, 1238] let Merge = Pairs dmap["Merge", . mfeed filter[.IsCoreP_n1 and .IsCoreP_n2] project[CIDp_n1, CIDp_n2] sort rdup consume] let Assignments = Pairs dmap["", . mfeed filter[.IsCoreP_n1 and not(.IsCoreP_n2)] projectextend[; P: .P_n2, N: .Np_n2, CID: .CIDp_n1] . mfeed filter[.IsCoreP_n2 and not(.IsCoreP_n1)] projectextend[; P: .P_n1, N: .Np_n1, CID: .CIDp_n2] concat sort krdup[P] consume] partition["", .N, 0] collect2["Assignments", 1238] # Step 7 let MergeM = Merge dsummarize sort rdup createmgraph2[CIDp_n1, CIDp_n2, 1.0]; let MaxCN = X dmap["", . feed max[CID] feed transformstream] dsummarize max[Elem]; # Step 8 let R = MergeM mg2connectedcomponents projectextend[; CID: .CIDp_n1, CIDnew: .CompNo + MaxCN] sort rdup consume # Step 9 query share("R", TRUE, Workers); # Step 10 query X Assignments dmap2["", $1 feed addid filter[.N = .N2] $2 feed sort krdup[P] {a} itHashJoin[Osm_id, P_a] $1 updatedirect2[TID; CID: .CID_a] count, 1238] getValue tie[. + ..] query X dmap["", $1 feed addid filter[.N = .N2] R feed sort krdup[CID] {a} itHashJoin[CID, CID_a] $1 updatedirect2[TID; CID: .CIDnew_a] count] getValue tie[. + ..]