Files
secondo/bin/Scripts/SimilarityClustering.sec
2026-01-23 17:03:45 +08:00

172 lines
3.8 KiB
Plaintext

# Similarity Clustering as in paper.
# Database must be open.
# Distributed array T with attribute Pos of type point must be present.
# Its tuples must also have an attribute Osm_id, used below as point identifier.
# Workers relation must be present.
# restore Workers from WorkersNewton
# let S = 'S' ffeed5 consume
# let T = S feed ddistribute3["T", 160, TRUE, Workers];
# Variable myPort must be set below to an exclusively used port.
# Step 1
# Total number of tuples in T: count each slot, fetch the per-slot counts
# and add them up.
let Card = T dmap["", . count] getValue tie[. + ..];
# Make Card available on the workers. The sampling query below refers to it.
query share("Card", TRUE, Workers);
# Sample SS: every (Card div 10000)-th tuple of each slot of T, summarized
# into one relation, i.e. roughly 10000 tuples overall.
# NOTE(review): Card div 10000 is 0 if T holds fewer than 10000 tuples -
# confirm how feedNth behaves in that case.
# NOTE(review): SS is not referenced again in this file. Presumably it is the
# input of Scripts/SimilarityPartitioning.sec - confirm.
let SS = T dmap["", . feedNth[Card div 10000, FALSE]] dsummarize consume;
# Step 2
# Run the partitioning sub-script.
# NOTE(review): PC is not created anywhere in this file. Presumably the
# sub-script creates it (from the sample SS) - confirm. The steps below
# read the attributes N and Pos of PC.
@&Scripts/SimilarityPartitioning.sec;
# n = number of tuples in PC. Used below as the number of slots when
# partitioning (Step 4) and as the size of ControlN (Step 5).
let n = PC count;
# Parameters passed to dbscanM in Step 5. Eps is also the search radius
# for neighbors in Step 6.
# NOTE(review): Eps is presumably in meters (distances are computed on gk()
# coordinates and via the geoid) - confirm.
let MinPts = 10;
let Eps = 100.0;
# Geoid used for distance computations on Pos.
let wgs84 = create_geoid("WGS1984");
# Port passed to collect2, dmap2 and dmap3 below. Must be used exclusively
# by this script.
let myPort = 1238;
# Step 3
# Copy the objects referenced inside the distributed queries below to the
# workers listed in Workers.
# NOTE(review): the second argument TRUE presumably allows overwriting an
# existing object of the same name - confirm.
query share("PC", TRUE, Workers);
query share("MinPts", TRUE, Workers);
query share("Eps", TRUE, Workers);
query share("wgs84", TRUE, Workers);
query share("n", TRUE, Workers);
# Step 4
# Clear main memory on the master and on the workers of T, then initialize
# the workers' main memory.
# NOTE(review): 3600 is presumably the memory size in MB - confirm.
query memclear();
query T dcommand['query memclear()'] consume;
query T dcommand['query meminit(3600)'] consume;
# On the workers of T: load the shared relation PC into memory as PCm and
# build an M-tree over its Pos attribute, using the geoid wgs84.
query T dlet["PCm", 'PC feed mconsume'] consume;
query T dlet["PCm_Pos_mtree", 'PCm mcreatemtree[Pos, wgs84]'] consume;
# V: assign every tuple of T to partitions.
#  - First loopjoin: the nearest PC tuple (mdistScan ... head[1]). Keeps its
#    N and the distance Dist between the tuple's Pos and that PC tuple's Pos.
#  - Second loopjoin: all PC tuples within Dist + 2 * Eps. Their N becomes
#    N2, so a tuple appears once for every such N2.
#  - The result is partitioned by N2 into n slots and collected as V.
# Tuples with N = N2 are the ones lying in the partition of their nearest
# PC tuple. Steps 6 and 11 rely on this comparison.
let V = T
  dmap["", . feed
    loopjoin[fun(t: TUPLE) PCm_Pos_mtree PCm mdistScan[attr(t, Pos)] head[1]
      projectextend[N; Dist: distance(attr(t, Pos), .Pos, wgs84)]]
    loopjoin[fun(u: TUPLE) PCm_Pos_mtree PCm mdistRange[attr(u, Pos),
      attr(u, Dist) + (2 * Eps)] projectextend[; N2: .N]]
  ]
  partition["", .N2, n]
  collect2["V", myPort];
# Step 5
# ControlN: int darray with n slots over Workers.
# NOTE(review): its slot values are presumably the slot numbers, so that $2
# below identifies the slot - confirm against createintdarray.
let ControlN = createintdarray("ControlN", Workers, n);
# X: local clustering per slot of V ($1 = slot of V, $2 = value of ControlN).
# Pos is mapped by gk() to Pos2, and dbscanM clusters on Pos2 with Eps and
# MinPts, writing the local cluster id to CID0.
# CID = CID0 * n + $2 combines the local id with the ControlN value, so that
# ids from different slots do not coincide (given distinct values below n).
# NOTE(review): the attribute IsCore read in Step 6 is not created explicitly
# in this file. Presumably dbscanM adds it - confirm.
let X = V ControlN
  dmap2["X", $1 feed extend[Pos2: gk(.Pos)] dbscanM[Pos2, CID0, Eps, MinPts]
    extend[CID: (.CID0 * n) + $2] consume, myPort
  ];
# Step 6
# Clear main memory on the workers again and count the commands that
# succeeded.
query T dcommand['query memclear()'] filter[.Ok] count;
# Wm: per slot of X, the tuples with N = N2 as a main-memory relation,
# plus an M-tree over their Pos.
let Wm = X dmap["Wm", . feed filter[.N = .N2] mconsume];
let Wm_Pos_mtree = Wm dmap["Wm_Pos_mtree", . mcreatemtree[Pos, wgs84]];
# Neighbors: per slot, for every tuple t with N # N2 ('#' is "not equal"),
# find the Wm tuples within distance Eps of t.
#   P, PosP, CID0, CIDp, IsCoreP, Np  - taken from the Wm tuple found
#   Q, QPos                           - id and position of t
let Neighbors = X Wm_Pos_mtree Wm
  dmap3["Neighbors", $1 feed filter[.N # .N2]
    loopsel[fun(t: TUPLE) $2 $3 mdistRange[attr(t, Pos), Eps]
      projectextend[; P: .Osm_id, PosP: .Pos, CID0: .CID0, CIDp: .CID,
        IsCoreP: .IsCore, Np: .N, Q: attr(t, Osm_id), QPos: attr(t, Pos)]]
    consume, myPort
  ];
# Redistribute Neighbors twice, hashed on P and hashed on Q, so that Step 7
# can join tuples with Q = P slot by slot.
# NOTE(review): slot count 0 presumably means "keep the number of slots of
# the argument" - confirm against the partition operator.
let NeighborsByP = Neighbors partition["", hashvalue(.P, 999997), 0]
  collect2["NeighborsByP", myPort];
let NeighborsByQ = Neighbors partition["", hashvalue(.Q, 999997), 0]
  collect2["NeighborsByQ", myPort];
# Step 7
# Reset main memory on the workers.
query T dcommand['query memclear()'] filter[.Ok] count;
query T dcommand['query meminit(3600)'] consume;
# Pairs: per slot, join NeighborsByQ (renamed with suffix n1) with
# NeighborsByP (suffix n2) on Q_n1 = P_n2. Kept as a main-memory relation.
let Pairs = NeighborsByQ NeighborsByP
  dmap2["Pairs", . feed {n1} .. feed {n2} itHashJoin[Q_n1, P_n2] mconsume, myPort];
# Merge: pairs where both sides are core points, reduced to the distinct
# pairs of cluster ids (CIDp_n1, CIDp_n2). Step 8 builds a graph from them.
let Merge = Pairs
  dmap["Merge", . mfeed filter[.IsCoreP_n1 and .IsCoreP_n2]
    project[CIDp_n1, CIDp_n2] sort rdup consume
  ];
# Assignments: pairs where exactly one side is a core point. The non-core
# point P receives the cluster id CID of the core side. Both directions are
# concatenated and krdup[P] keeps one tuple per P. The result is partitioned
# by N, the partition number recorded for P.
let Assignments = Pairs
  dmap["",
    . mfeed filter[.IsCoreP_n1 and not(.IsCoreP_n2)]
      projectextend[; P: .P_n2, N: .Np_n2, CID: .CIDp_n1]
    . mfeed filter[.IsCoreP_n2 and not(.IsCoreP_n1)]
      projectextend[; P: .P_n1, N: .Np_n1, CID: .CIDp_n2]
    concat sort krdup[P] consume
  ]
  partition["", .N, 0]
  collect2["Assignments", myPort];
# Step 8
# MergeM: main-memory graph built from the distinct (CIDp_n1, CIDp_n2) pairs
# of all slots of Merge. 1.0 is the third argument of createmgraph2.
# NOTE(review): 1.0 is presumably the cost assigned to each edge - confirm.
let MergeM = Merge dsummarize sort rdup createmgraph2[CIDp_n1, CIDp_n2, 1.0];
# MaxCN: largest CID occurring in X (maximum per slot, then the maximum of
# those). Used in Step 9 as offset for the new cluster ids.
let MaxCN = X dmap["", . feed max[CID] feed transformstream] dsummarize max[Elem];
# Step 9
# R: renaming table for cluster ids. Connected components are computed on
# MergeM, and every CID (taken from CIDp_n1) is mapped to
# CIDnew = CompNo + MaxCN, the component number shifted by the largest CID
# in use.
let R = MergeM mg2connectedcomponents projectextend[; CID: .CIDp_n1,
  CIDnew: .CompNo + MaxCN] sort rdup consume;
# Step 10
# Copy the renaming table R to the workers. The second query of Step 11
# reads R inside a dmap.
query share("R", TRUE, Workers);
# Step 11
# First update: per slot, join the tuples of X with N = N2 to Assignments
# (one tuple per P) on Osm_id = P_a, and overwrite CID in the stored slot
# relation ($1) with the assigned CID_a. addid supplies the TID used by
# updatedirect2. The per-slot counts are summed up.
query X Assignments
  dmap2["", $1 feed addid filter[.N = .N2] $2 feed sort krdup[P] {a}
    itHashJoin[Osm_id, P_a] $1 updatedirect2[TID; CID: .CID_a] count, myPort
  ]
  getValue tie[. + ..];
# Second update: per slot, join the tuples of X with N = N2 to the renaming
# table R (one tuple per CID) on CID = CID_a, and overwrite CID with
# CIDnew_a. The per-slot counts are summed up.
query X
  dmap["", $1 feed addid filter[.N = .N2] R feed sort krdup[CID] {a}
    itHashJoin[CID, CID_a] $1 updatedirect2[TID; CID: .CIDnew_a] count
  ]
  getValue tie[. + ..];