# Similarity Clustering as in paper + balancing slots over workers.
#
# Database must be open.
#
# the following is done by the script:
## Distributed array T with attribute Pos of type point must be present.
## Workers relation must be present.
#
## Variable myPort must be set below to an exclusively used port.
## Parameter k below may be adapted (default 50)
## sample size in step 1 may be adapted
#
# Notes for maintainers:
# - After each step a row (db, label, elapsed time) is inserted into the
#   relation Results. The elapsed time is the sum of ElapsedTime over the
#   last k entries of SEC2COMMANDS (tail[k]), so every window depends on the
#   exact number and order of the commands in front of it. Adding, removing
#   or splitting a command requires adjusting the tail[...] arguments.
# - db (first argument of every inserttuple) and myPort are used but not
#   assigned in this file; presumably they are created beforehand or by
#   Scripts/DistCost.sec -- TODO confirm.
# - Commands that do not end with ';' are followed by an empty line here;
#   this assumes the script reader ends a command at ';' or at an empty
#   line -- TODO confirm before removing blank lines.

# NOTE(review): this assignment is commented out although the header asks
# for myPort to be set below; myPort is used by collect2, collectC, dmap2
# and dmap3 further down.
# let myPort = 61239;

# Per-slot tuple counts of T, combined with tie[. + ..].
query T dmap["", . feed count] getValue tie[. + ..]

## 315113976   (presumably the result seen by the author -- TODO confirm)

# prepare cost measurements
let ControlWorkers = createintdarray("ControlWorkers", Workers, Workers count)

@%Scripts/DistCost.sec

restore Results from Results;

# Step 1
# Stores size(T) as sizeT and shares it with the workers.
let sizeT = size(T);
query share("sizeT", TRUE, Workers)

# sample is determined beforehand to support comparisons
# let SS = T dmap["", . feed some[10000 div sizeT]] dsummarize consume

# NOTE(review): with the sampling command commented out only two commands
# of this step precede the query below, yet it sums over tail[3] -- confirm
# that the window is still intended.
query Results inserttuple[db, "Step 1", SEC2COMMANDS feed tail[3] sum[ElapsedTime]] consume

# Step 2
# Runs the partitioning script and sets the clustering parameters.
# PC is not created in this file; presumably it is produced by
# SimilarityPartitioning.sec -- TODO confirm.
# let k = 50;
@&Scripts/SimilarityPartitioning.sec;
# script has 9 commands

let n = PC count;
let MinPts = 10;
let Eps = 100.0;
let wgs84 = create_geoid("WGS1984");

# NOTE(review): 9 script commands + 4 let commands = 13; tail[14] matches
# only if "let k" is executed as well -- confirm.
query Results inserttuple[db, "Step 2", SEC2COMMANDS feed tail[14] sum[ElapsedTime]] consume

# Step 3
# Shares PC and the parameters with the workers (five commands, tail[5]).
query share("PC", TRUE, Workers);
query share("MinPts", TRUE, Workers);
query share("Eps", TRUE, Workers);
query share("wgs84", TRUE, Workers);
query share("n", TRUE, Workers);
query Results inserttuple[db, "Step 3", SEC2COMMANDS feed tail[5] sum[ElapsedTime]] consume

# Step 4a
# Clears main memory locally and on the workers, then creates on the workers
# the main-memory relation PCm (from PC) and an m-tree PCm_Pos_mtree over
# its Pos attribute.
query memclear();
query T dcommand['query memclear()'] consume;
query T dcommand['query meminit(3600)'] consume;
query T dlet["PCm", 'PC feed mconsume'] consume;
query T dlet["PCm_Pos_mtree", 'PCm mcreatemtree[Pos, wgs84]'] consume

# Va: for every tuple t of T
#   1. take the first result of mdistScan on t's Pos (presumably the nearest
#      element of PCm -- TODO confirm) and keep its N together with
#      Dist = distance(t.Pos, that element's Pos);
#   2. query PCm with mdistRange around t's Pos with radius Dist + 2 * Eps
#      and add the N of every hit as N2.
# The result is partitioned by N2 into n slots. A tuple therefore can occur
# once per N2 value; N = N2 marks the copy in its own partition (this
# distinction is used in Steps 6 and 11).
let Va = T dmap["", .
  feed loopjoin[fun(t: TUPLE) PCm_Pos_mtree PCm mdistScan[attr(t, Pos)] head[1]
    projectextend[N; Dist: distance(attr(t, Pos), .Pos, wgs84)]]
  loopjoin[fun(u: TUPLE) PCm_Pos_mtree PCm mdistRange[attr(u, Pos),
    attr(u, Dist) + (2 * Eps)] projectextend[; N2: .N]] ]
  partition["Va", .N2, n]

query Results inserttuple[db, "Va", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

# Vb: Va collected without load balancing; below it is only used to obtain
# the slot sizes.
let Vb = Va collect2["Vb", myPort]

query Results inserttuple[db, "Vb", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

# NOTE(review): nine commands precede this query in Step 4a but the window
# is tail[8], which leaves out the leading "query memclear()" -- confirm.
query Results inserttuple[db, "Step 4a", SEC2COMMANDS feed tail[8] sum[ElapsedTime]] consume

# Step 4b Load Balancing
# wc      : number of workers
# reserve : ((wc - 1) div 20) + 1 workers kept back for the second pass
# W       : one tuple (Worker, Load = 0.0) per worker number 0 .. wc - 1
let wc = Workers count;
let reserve = ((wc - 1) div 20) + 1;
let W = intstream(0, wc - 1) namedtransformstream[Worker] extend[Load: 0.0] consume

# Sizes: tuple count of every slot of Vb, numbered by Slot starting at 0.
let Sizes = Vb dmap["", . feed count] dsummarize namedtransformstream[Size] addcounter[Slot, 0] consume

# Slots: Sizes with Size multiplied by 1.0, ordered by descending Size.
let Slots = Sizes feed replaceAttr[Size: .Size * 1.0] sortby[Size desc] consume

# TargetSize: sum of all slot sizes divided by the number of workers.
let TargetSize = (Slots feed sum[Size]) / wc;
query memclear();

# First pass: a priority queue over the first wc - reserve workers, ordered
# by Load. The queue stream is paired one by one (obojoin) with the slots in
# descending size order; each dequeued worker is re-inserted with
# Load + Size. This looks like a greedy "largest slot to least loaded
# worker" assignment -- TODO confirm against the operator documentation.
let PQ = W feed head[wc - reserve] mcreatepqueue[Load];
let Assignment = PQ mfeedpq Slots feed obojoin extend[Ok: PQ minserttuplepqprojectU[., .Load + .Size, Load; Worker, Load]] consume

# Second pass: a new queue over the last `reserve` workers. Assignments are
# taken in descending order of Load + Size, deleted from Assignment, paired
# with workers from the new queue and inserted back into Assignment. cancel
# ends the stream as soon as Load + 2 * Size exceeds TargetSize * 1.03.
delete PQ;
let PQ = W feed tail[reserve] mcreatepqueue[Load];
query PQ mfeedpq
  Assignment feed addid extend[LoadAfter: .Load + .Size] sortby[LoadAfter desc]
  project[TID] Assignment deletebyid2[TID] project[Size, Slot]
  obojoin
  extend[Ok: PQ minserttuplepqprojectU[., .Load + .Size, Load; Worker, Load]]
  Assignment insert
  cancel[(.Load + (2 * .Size)) > (TargetSize * 1.03)]
  count

# AssignmentV: the Worker values of Assignment ordered by Slot, as a vector.
let AssignmentV = Assignment feed sortby[Slot] project[Worker] transformstream collect_vector

# V: Va collected with AssignmentV as the slot-to-worker mapping.
let V = Va collectC["V", myPort, AssignmentV]

query Results inserttuple[db, "V", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

# The 15 commands from "let wc" up to the "V" row above.
query Results inserttuple[db, "Step 4b", SEC2COMMANDS feed tail[15] sum[ElapsedTime]] consume

# Step 5
# Local clustering per slot: Pos2 = gk(.Pos) (presumably a projection to
# planar coordinates -- TODO confirm), dbscanM on Pos2 with Eps and MinPts
# writing CID0, then CID = (CID0 * n) + $2. $2 is presumably the slot
# number, which would make CID unique across slots -- TODO confirm.
update LastCommand := distCostReset(ControlWorkers)

let X = V dmap["X", $1 feed extend[Pos2: gk(.Pos)]
  dbscanM[Pos2, CID0, Eps, MinPts]
  extend[CID: (.CID0 * n) + $2] consume ]

query Results inserttuple[db, "Step 5, X", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

let Cost1 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)

# Step 6
# Resets main memory on the workers, then builds per slot:
#   Wm           : main-memory relation of the tuples with N # N2
#                  ('#' inside filter[...] is the inequality operator,
#                  not a comment)
#   Wm_Pos_mtree : m-tree over Pos of Wm
query T dcommand['query memclear()'] filter[.Ok] count;
query T dcommand['query meminit(3600)'] filter[.Ok] count;
let Wm = X dmap["Wm", . feed filter[.N # .N2] mconsume];
query Results inserttuple[db, "Wm", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

let Wm_Pos_mtree = Wm dmap["Wm_Pos_mtree", . mcreatemtree[Pos, wgs84]];
query Results inserttuple[db, "Wm_Pos_mtree", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

# Neighbors: for every tuple t with N = N2, all tuples of Wm in the same
# slot within distance Eps of t's Pos. P... attributes come from t,
# Q and QPos from the Wm tuple found.
let Neighbors = X Wm_Pos_mtree Wm dmap3["Neighbors", $1 feed filter[.N = .N2]
  loopsel[fun(t: TUPLE) $2 $3 mdistRange[attr(t, Pos), Eps]
    projectextend[; P: attr(t, Osm_id), PosP: attr(t, Pos),
      CID0: attr(t, CID0), CIDp: attr(t, CID), IsCoreP: attr(t, IsCore),
      Np: attr(t, N), Q: .Osm_id, QPos: .Pos]] , myPort ]

query Results inserttuple[db, "Neighbors", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

let Cost2 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)

# Neighbors is redistributed twice, by hashvalue of P and by hashvalue of Q,
# so that Step 7 can join Q of one copy with P of the other slot by slot.
query T dcommand['query memclear()'] filter[.Ok] count;
let NeighborsByP = Neighbors partition["", hashvalue(.P, 999997), 0] collect2["NeighborsByP", myPort];
let NeighborsByQ = Neighbors partition["", hashvalue(.Q, 999997), 0] collect2["NeighborsByQ", myPort];
query Results inserttuple[db, "NeighborsByP, NeighborsByQ", SEC2COMMANDS feed tail[2] sum[ElapsedTime]] consume

# NOTE(review): 14 commands of Step 6 precede this query; tail[15] also
# covers the distCostReset issued at the end of Step 5 -- confirm.
query Results inserttuple[db, "Step 6", SEC2COMMANDS feed tail[15] sum[ElapsedTime]] consume

let Cost3 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)

# Step 7
# Merge: pairs (CIDp_n1, CIDp_n2) taken from symmetric neighbor pairs
# (Q_n1 = P_n2 and P_n1 = Q_n2) in which both P sides are core points;
# duplicates are removed per slot with krduph.
let Merge = NeighborsByQ NeighborsByP dmap2["Merge", . feed {n1} .. feed {n2}
  itHashJoin[Q_n1, P_n2] filter[.P_n1 = .Q_n2]
  filter[.IsCoreP_n1 and .IsCoreP_n2]
  project[CIDp_n1, CIDp_n2] krduph[CIDp_n1, CIDp_n2] consume, myPort ]

query Results inserttuple[db, "Merge", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

let Cost4 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)

# Assignments: same join, but the n1 side is a core point and the n2 side is
# not. The result holds P and N of the n2 side together with the CID of the
# n1 side, one tuple per P (krduph[P]), repartitioned by N.
let Assignments = NeighborsByQ NeighborsByP dmap2["", . feed {n1} .. feed {n2}
  itHashJoin[Q_n1, P_n2] filter[.P_n1 = .Q_n2]
  filter[.IsCoreP_n1 and not(.IsCoreP_n2)]
  projectextend[; P: .P_n2, N: .Np_n2, CID: .CIDp_n1]
  krduph[P] consume, myPort ]
  partition["", .N, 0]
  collect2["Assignments", myPort]

query Results inserttuple[db, "Assignments", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

query Results inserttuple[db, "Step 7", SEC2COMMANDS feed tail[6] sum[ElapsedTime]] consume

let Cost5 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)

# Step 8
# MergeM: main-memory graph built from the distinct pairs of Merge, every
# edge with cost 1.0. MaxCN: the largest CID occurring in X.
let MergeM = Merge dsummarize sort rdup createmgraph2[CIDp_n1, CIDp_n2, 1.0];
let MaxCN = X dmap["", . feed max[CID] feed transformstream] dsummarize max[Elem];
query Results inserttuple[db, "Step 8", SEC2COMMANDS feed tail[2] sum[ElapsedTime]] consume

# Step 9
# Renumber: maps each CID in a connected component of MergeM to
# CIDnew = CompNo + MaxCN; adding MaxCN presumably keeps the new numbers
# apart from existing CIDs -- TODO confirm.
let Renumber = MergeM mg2connectedcomponents projectextend[; CID: .CIDp_n1, CIDnew: .CompNo + MaxCN] sort rdup consume

query Results inserttuple[db, "Step 9, Renumber", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

# Step 10
# Shares Renumber with the workers.
query share("Renumber", TRUE, Workers);
query Results inserttuple[db, "Step 10", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

# Step 11
# First update: tuples of X with N = N2 whose Osm_id occurs as P in
# Assignments get the CID stored in Assignments. The per-slot counts are
# added up with tie.
update LastCommand := distCostReset(ControlWorkers)

query X Assignments dmap2["", $1 feed addid filter[.N = .N2]
  $2 feed sort krdup[P] {a} itHashJoin[Osm_id, P_a]
  $1 updatedirect2[TID; CID: .CID_a] count, myPort ]
  getValue tie[. + ..]

query Results inserttuple[db, "X Update Assignments", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

let Cost6 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)

# Second update: tuples of X with N = N2 whose CID occurs in Renumber get
# CIDnew as their CID.
query X dmap["", $1 feed addid filter[.N = .N2]
  Renumber feed sort krdup[CID] {a} itHashJoin[CID, CID_a]
  $1 updatedirect2[TID; CID: .CIDnew_a] count ]
  getValue tie[. + ..]

query Results inserttuple[db, "X Update Renumber", SEC2COMMANDS feed tail[1] sum[ElapsedTime]] consume

query Results inserttuple[db, "Step 11", SEC2COMMANDS feed tail[7] sum[ElapsedTime]] consume

let Cost7 = distCostSave(ControlWorkers);
update LastCommand := distCostReset(ControlWorkers)

# Save the timing rows and keep a copy of the command log.
save Results to Results;
let Commands = SEC2COMMANDS feed consume;