492 lines
11 KiB
Plaintext
492 lines
11 KiB
Plaintext
/*
|
|
//paragraph [10] title: [{\Large \bf ] [}]
|
|
//[Contents] [\tableofcontents]
|
|
//[ue] [\"{u}]
|
|
|
|
|
|
|
|
[10] Distributed Clustering
|
|
|
|
|
|
[Contents]
|
|
|
|
1 Overview
|
|
|
|
This script is for Distributed Clustering with DBScan.
|
|
|
|
It is a commented script suitable for viewing with ~pdview~. To be run with
|
|
|
|
---- @%Scripts/DistributedClusteringPD.sec or
|
|
@&Scripts/DistributedClusteringPD.sec
|
|
----
|
|
|
|
2 Preparations
|
|
|
|
Preparations:
|
|
|
|
* include the Distributed2Algebra, NearestNeighborAlgebra, DBScanAlgebra and MainMemory2Algebra in makefile.algebras and recompile secondo
|
|
|
|
* get the desired shape-file from download.geofabrik.de
|
|
|
|
* create and open a database
|
|
|
|
* import relation ~Pois~ using the script ~importGermanyOsm.psec~
|
|
|
|
* restore Workers in relation ~Worker~
|
|
|
|
* start the remoteMonitors for the workers
|
|
|
|
If not done yet, restore the relation ~Worker~.
|
|
|
|
*/
|
|
|
|
#restore Worker from WorkerNewton82
|
|
|
|
let SizeWorker = Worker count
|
|
|
|
/*
|
|
Create a darray ~ControlWorkers~
|
|
|
|
*/
|
|
|
|
# ControlWorkers: the integers 0 .. SizeWorker - 1 are distributed into
# SizeWorker slots over the workers listed in relation Worker; the dloop then
# reduces every slot to its single Elem value.
# NOTE(review): presumably this yields exactly one slot per worker, so that a
# dloop over ControlWorkers runs a command once on each worker - confirm the
# slot-to-worker mapping of ddistribute3.
let ControlWorkers = intstream(0, SizeWorker - 1) transformstream
|
|
ddistribute3["WorkerControl", SizeWorker, TRUE, Worker]
|
|
dloop["", . feed extract[Elem]]
|
|
|
|
/*
|
|
Run script ~DistCost.sec~ for cost measurements
|
|
|
|
*/
|
|
|
|
@&Scripts/DistCost.sec
|
|
|
|
/*
|
|
Initialize ~eps~ (epsilon) and ~minPts~
|
|
|
|
*/
|
|
|
|
let eps = 2500.0
|
|
|
|
let minPts = 150
|
|
|
|
/*
|
|
Share ~eps~ and ~minPts~ to worker
|
|
|
|
*/
|
|
|
|
query share("eps", TRUE, Worker)
|
|
|
|
query share("minPts", TRUE, Worker)
|
|
|
|
/*
|
|
3 Determine SmallCore
|
|
|
|
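Determine a set of core points. The command below runs the script ~DeterminingSmallCoresPD.sec~ (this text originally named ~DistancePartitioning.sec~; confirm which script is intended). Afterwards the core points are available as relation ~SmallCore~.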
|
|
|
|
HINT: The number of SamplePoint in the script ~DistancePartitioning.sec~ may need to be adapted, because it is crucial for the number of SmallCores.
|
|
|
|
*/
|
|
|
|
@&Scripts/DeterminingSmallCoresPD.sec
|
|
|
|
/*
|
|
Store number of SmallCoreTuples
|
|
|
|
*/
|
|
|
|
let NrSmallCore = SmallCore count
|
|
|
|
/*
|
|
4 Distribute ~Pois~ to worker
|
|
|
|
Distribute the relation ~Pois~ as fast as possible to workers using round robin for example.
|
|
|
|
HINT: Don't forget to start the monitors for the workers first.
|
|
|
|
*/
|
|
|
|
let PoisP1 = Pois feed dfdistribute3["PoisP1", NrSmallCore, TRUE, Worker]
|
|
|
|
/*
|
|
Extend ~PoisP1~ by attribute CenterGK and rename attribute GeoData to Center
|
|
|
|
*/
|
|
|
|
let PoisP1Ex = PoisP1 dmap["", . feed extend[CenterGK: gk(.GeoData)]
|
|
renameattr[Center: GeoData]]
|
|
|
|
/*
|
|
5 Share ~SmallCore~ with all worker
|
|
|
|
Before sharing ~SmallCore~ to all workers initialize a new counter N. Furthermore just OsmID, Center and N will be needed.
|
|
|
|
*/
|
|
|
|
let SmallCore2 = SmallCore feed remove[N]
|
|
addcounter[N, 0]
|
|
project[Osm_id, Center, N] consume
|
|
|
|
#Insert a dummy in ~SmallCore2~. This is necessary due to an indexfault
|
|
#in the MainMemoryAlgebra2 which isn't fixed at the moment.
|
|
#query SmallCore2
|
|
# inserttuple["0" , makepoint(0.0 , 0.0) , -1] consume
|
|
|
|
/*
|
|
Share ~SmallCore2~ to all worker
|
|
|
|
*/
|
|
|
|
query share("SmallCore2", TRUE, Worker)
|
|
|
|
/*
|
|
6 Create main memory relations and MMTrees for ~SmallCores~
|
|
|
|
Delete all main memory objects on the worker
|
|
|
|
*/
|
|
|
|
query ControlWorkers dloop["", memclear()]
|
|
|
|
/*
|
|
Delete all main memory objects on the master
|
|
|
|
*/
|
|
|
|
query memclear()
|
|
|
|
/*
|
|
Create a main memory relation ~SmallCore2~ on each worker
|
|
|
|
*/
|
|
|
|
let S1 = ControlWorkers dloop["", SmallCore2 feed letmconsume["SmallCore2"] ]
|
|
|
|
/*
|
|
Create a mtree for ~SmallCore2~ on each worker
|
|
|
|
*/
|
|
|
|
let S2 = S1 dloop["", . mcreateMtree[Center, "SmallCore2_Center"]]
|
|
|
|
/*
|
|
Create a main memory relation ~SmallCore2~ on the master as well
|
|
|
|
*/
|
|
|
|
query SmallCore2 feed letmconsume["SmallCore2"]
|
|
|
|
/*
|
|
Create a mtree for ~SmallCore2~ on the master as well
|
|
|
|
*/
|
|
|
|
query "SmallCore2" mcreateMtree[Center, "SmallCore2_Center"]
|
|
|
|
/*
|
|
7 Assign each POI to its closest element in ~SmallCore2~
|
|
|
|
On each worker, assign to each element of ~PoisP1Ex~ its closest element in ~SmallCore2~, using the main memory index ~SmallCore2Center~ built in the previous step.
|
|
Store the distance in D and create new dfarray ~PoisMembers~.
|
|
|
|
HINT: The input tuples for a loopjoin must have different attribute names, hence renaming is applied to one input stream (marked here with {s}).
|
|
The difference to dfarray ~PoisP1~ is that we now have the ID (N) of the assigned SmallCore and the distance to this SmallCore. The elements are still on the initial worker.
|
|
|
|
*/
|
|
|
|
# PoisMembers: for every tuple t of a PoisP1Ex slot, take the first tuple
# returned by the distance scan on the main-memory M-tree "SmallCore2_Center"
# around t's Center, compute D as the distance between the gk-projected
# centers, and append that tuple's N and D to t.
# Requires the main-memory relation "SmallCore2" and the index
# "SmallCore2_Center" on every worker (created via S1 and S2).
# NOTE(review): presumably mdistScan delivers tuples by increasing distance, so
# head[1] is the nearest small core - confirm.
let PoisMembers = PoisP1Ex dmap["PoisMembers", . feed
|
|
loopjoin[fun(t: TUPLE)
|
|
"SmallCore2_Center" "SmallCore2" mdistScan[attr(t, Center)] head[1] {s}
|
|
extend[D: distance(gk(attr(t, Center)), gk(.Center_s))]
|
|
project[N_s, D] renameattr[N: N_s] ] ]
|
|
|
|
/*
|
|
8 Repartition ~PoisMembers~ by N (SmallCoreNumber)
|
|
|
|
Repartition ~PoisMembers~ created in previous step by N in dfarray ~PoisByCore~.
|
|
|
|
*/
|
|
|
|
# PoisByCore: repartition PoisMembers by attribute N into NrSmallCore slots and
# collect the partitions as dfarray "PoisByCore".
# NOTE(review): 1238 is presumably the port used for the data transfer between
# the workers; the same literal is hard-coded in several statements of this
# script - confirm it is free on all worker hosts.
let PoisByCore = PoisMembers partition["", .N, NrSmallCore]
|
|
collect2["PoisByCore", 1238]
|
|
|
|
let LokalPoisByCore = PoisByCore dsummarize consume
|
|
|
|
/*
|
|
9 Determine the radius
|
|
|
|
For each slot of ~PoisByCore~ determine the maximum value of D (the distance between the tuple and its SmallCore) and let this be the radius.
|
|
|
|
*/
|
|
|
|
let RadiusMax = PoisByCore dmap["", . feed kbiggest[1; D] extract[D]]
|
|
|
|
|
|
/*
|
|
Create ~SmallCore3~ by joining ~SmallCore2~ with ~RadiusMax~.
|
|
|
|
*/
|
|
|
|
#Delete dummy from ~SmallCore2~
|
|
#query SmallCore2 feed filter[.N = -1] SmallCore2 deletedirect consume
|
|
|
|
# SmallCore3: SmallCore2 extended by attribute Radius. The per-slot maxima
# stored in RadiusMax are turned into one-attribute tuples (Radius), summarized
# on the master and combined with the SmallCore2 tuples by obojoin.
# NOTE(review): obojoin presumably pairs the i-th tuples of both streams, so
# this relies on the tuple order of SmallCore2 (counter N) matching the slot
# order of RadiusMax - confirm.
let SmallCore3 = SmallCore2 feed
|
|
RadiusMax dloop["", . feed namedtransformstream[Radius] consume] dsummarize
|
|
obojoin consume
|
|
|
|
|
|
/*
|
|
Extend ~SmallCore3~ by CenterGK and Radius2
|
|
|
|
*/
|
|
|
|
let SmallCore4 = SmallCore3 feed
|
|
extend[CenterGK: gk(.Center), Radius2: .Radius + eps] consume
|
|
|
|
/*
|
|
Share ~SmallCore4~ to all worker
|
|
|
|
*/
|
|
|
|
query share("SmallCore4", TRUE, Worker)
|
|
|
|
|
|
/*
|
|
Create a main memory relation ~SmallCore4~ on each worker
|
|
|
|
*/
|
|
|
|
let S4 = ControlWorkers dloop["", SmallCore4 feed letmconsume["SmallCore4"] ]
|
|
|
|
/*
|
|
Create a mtree for ~SmallCore4~ on each worker
|
|
|
|
*/
|
|
|
|
let S5 = S4 dloop["", . mcreateMtree[Center, "SmallCore4_Center"]]
|
|
|
|
/*
|
|
Create a main memory relation ~SmallCore4~ on the master
|
|
|
|
*/
|
|
|
|
query SmallCore4 feed letmconsume["SmallCore4"]
|
|
|
|
/*
|
|
Create a mtree for ~SmallCore4~ on the master
|
|
|
|
*/
|
|
|
|
query "SmallCore4" mcreateMtree[Center, "SmallCore4_Center"]
|
|
|
|
|
|
/*
|
|
10 Determine Neighbors
|
|
|
|
Load relation ~PoisMembers~ in memory and create a MMM-Tree over it. Search on each partition, for each corepoint within radius + eps for neighbors. Repartition the result by NN.
|
|
Result is, each partition contains the tuple corresponding to the respective smallcore and the neighbors from the other partitions which are within the circle of radius + eps from the smallcore.
|
|
|
|
*/
|
|
|
|
let Numbers = intstream(0, NrSmallCore - 1) transformstream
|
|
ddistribute3["Numbers", NrSmallCore, TRUE, Worker]
|
|
dloop["", . feed extract[Elem]]
|
|
|
|
/*
|
|
Create main memory relation for ~PoisMembers~
|
|
|
|
*/
|
|
|
|
# PoisMembersMain: load every slot of PoisMembers into a main-memory relation
# on its worker. The object name is "PoisMembersMain_" followed by the value of
# the corresponding Numbers slot (num2string(..)).
# NOTE(review): presumably that value is the slot number, which gives every
# slot its own object name even if several slots share a worker - confirm.
let PoisMembersMain = PoisMembers Numbers dmap2["PoisMembersMain", . feed
|
|
letmconsumeflob["PoisMembersMain" + "_" + num2string(..)]
|
|
, 1238 ]
|
|
|
|
/*
|
|
Create MMM-Tree for ~PoisMembers~
|
|
|
|
*/
|
|
|
|
let PoisMembersMMMTree = PoisMembersMain Numbers
|
|
dmap2["PoisMembersMMMTree",
|
|
. mfeed addid
|
|
mcreateMtree2[CenterGK, TID, "PoisMembersMMMTree" + "_" + num2string(..)]
|
|
, 1238]
|
|
|
|
/*
|
|
Determine neighbors within radius + eps (which is Radius2) for each partition. Relation ~NeighborsByCore~ is of type dfarray.
|
|
|
|
*/
|
|
|
|
# NeighborsByCore: per slot, index is the M-tree and indexedrel the
# main-memory relation of that slot. For every SmallCore4 tuple t all slot
# tuples within distance Radius2 of t's CenterGK are fetched and tagged with
# NN = N of t. The result is repartitioned by NN into NrSmallCore slots and
# collected as dfarray "NeighborsByCore".
let NeighborsByCore = PoisMembersMMMTree PoisMembersMain dmap2["",
|
|
fun(index: ARRAYFUNARG1, indexedrel: ARRAYFUNARG2) SmallCore4 feed
|
|
loopsel[fun(t: TUPLE) index
|
|
mdistRange2[attr(t, CenterGK), attr(t, Radius2)] indexedrel
|
|
gettuples extend[NN: attr(t, N)] ]
|
|
, 1238]
|
|
partition["", .NN, NrSmallCore]
|
|
collect2["NeighborsByCore", 1238]
|
|
|
|
/*
|
|
|
|
11 Classify ~PoisByCore~
|
|
|
|
Load relation ~PoisByCore~ and ~NeighborsByCore~ in memory and create a MMM-Tree over it. For each point in ~PoisByCore~ search on ~NeighborsByCore~ with a circle of size eps.
|
|
If the result set has size > minPts, extend p by attribute IsCorePoint = true, else false. Furthermore determine whether p is a border point or not. Save the result in dfarray ~PoisByCoreClassified~.
|
|
|
|
Create main memory relations (NeighborsByCoreMain1, PoisByCoreMain1) on each worker
|
|
|
|
*/
|
|
|
|
# NeighborsByCoreMainW1: load every slot of NeighborsByCore into a main-memory
# relation named "NeighborsByCoreMain1" on its worker.
# NOTE(review): unlike PoisMembersMain, the object name carries no slot suffix.
# If one worker processes several slots, they would all use the same name -
# confirm that this is intended (e.g. at most one slot per worker).
let NeighborsByCoreMainW1 = NeighborsByCore
|
|
dmap["NeighborsByCoreMain1", . feed
|
|
letmconsumeflob["NeighborsByCoreMain1"]]
|
|
|
|
let PoisByCoreMainW1 = PoisByCore
|
|
dmap["PoisByCoreMain1", . feed
|
|
letmconsumeflob["PoisByCoreMain1"]]
|
|
|
|
/*
|
|
Create mtrees (NeighborsByCoreMain1, PoisByCoreMain1) on each worker
|
|
|
|
*/
|
|
|
|
let NeighborsByCoreMMMTreeW1 = NeighborsByCoreMainW1
|
|
dloop["NeighborsByCoreMMMTree1", .
|
|
mcreateMtree[CenterGK, "NeighborsByCoreMMMTree1"]]
|
|
|
|
let PoisByCoreMMMTreeW1 = PoisByCoreMainW1
|
|
dloop["PoisByCoreMMMTree1", .
|
|
mcreateMtree[CenterGK, "PoisByCoreMMMTree1"]]
|
|
|
|
/*
|
|
Create main memory relations (NeighborsByCoreMain1 and PoisByCoreMain1) on the master as well
|
|
Therefore create first an empty dummy-relation.
|
|
|
|
*/
|
|
|
|
let NeighborsByCoreMain1 = NeighborsByCore dmap["", . feed head[0]]
|
|
dsummarize consume
|
|
|
|
let PoisByCoreMain1 = PoisByCore dmap["", . feed head[0]]
|
|
dsummarize consume
|
|
|
|
query NeighborsByCoreMain1 feed letmconsume["NeighborsByCoreMain1"]
|
|
|
|
query PoisByCoreMain1 feed letmconsume["PoisByCoreMain1"]
|
|
|
|
/*
|
|
Create mtrees (NeighborsByCoreMain1, PoisByCoreMain1) on the master as well
|
|
|
|
*/
|
|
|
|
query "NeighborsByCoreMain1" mcreateMtree[CenterGK, "NeighborsByCoreMMMTree1"]
|
|
|
|
query "PoisByCoreMain1" mcreateMtree[CenterGK, "PoisByCoreMMMTree1"]
|
|
|
|
/*
|
|
Determine CorePoints and BorderPoints for each partition ~PoisByCore~.
|
|
|
|
*/
|
|
|
|
# PoisByCoreClassified: classify every tuple of each PoisByCore slot.
#   Anz           - number of tuples of the main-memory relation
#                   "NeighborsByCoreMain1" within distance eps of the point
#                   (range search on M-tree "NeighborsByCoreMMMTree1").
#   IsCorePoint   - TRUE if Anz > minPts.
#   IsBorderPoint - TRUE if at least one tuple within eps has N # NN, i.e. it
#                   is assigned to a different small core than the one whose
#                   neighborhood it was copied into.
# The search point is the stored attribute CenterGK. It holds gk of the
# original GeoData (renamed to Center) and is the attribute the M-tree was
# built on, so gk(.Center) need not be recomputed twice per tuple.
let PoisByCoreClassified = PoisByCore dmap["", . feed
  extend[Anz: "NeighborsByCoreMMMTree1" "NeighborsByCoreMain1"
    mdistRange[.CenterGK, eps] count]
  extend[IsCorePoint: .Anz > minPts]
  extend[IsBorderPoint: "NeighborsByCoreMMMTree1" "NeighborsByCoreMain1"
    mdistRange[.CenterGK, eps] filter[.N # .NN] count > 0 ] ]
|
|
|
|
/*
|
|
12 Perform Clustering
|
|
|
|
Perform clustering with parameters eps and minPts on ~PoisByCoreClassified~. Assuming each partition yields fewer than 100 clusters, multiply each SmallCore number N by 100 and add the local cluster number.
|
|
So we have clusters with distinct numbers globally.
|
|
|
|
*/
|
|
|
|
# Cluster: run dbscanR with eps and minPts on every slot of
# PoisByCoreClassified, using the gk-projected bounding box of Center (Box) as
# the clustering attribute; the cluster number is written to CId.
# Positive CIds are replaced by N * 100 + CId, all other values by -2.
# NOTE(review): the new ids are only distinct across slots if every slot has
# fewer than 100 clusters; -2 presumably marks points without a cluster
# (noise) - confirm against dbscanR.
let Cluster = PoisByCoreClassified dloop["", . feed
|
|
extend[Box: gk(bbox(.Center))]
|
|
dbscanR[Box, CId, eps, minPts]
|
|
replaceAttr[CId: ifthenelse(.CId > 0, (.N * 100) + .CId, -2)]
|
|
remove[Box] consume]
|
|
|
|
/*
|
|
13 Determine borderpairs
|
|
|
|
Join each tuple which was classified as a borderpoint with relation ~NeighborsByCore~.
|
|
|
|
*/
|
|
|
|
# BorderPairs: for every tuple flagged IsBorderPoint, find all tuples of the
# main-memory relation "NeighborsByCoreMain1" within distance eps that belong
# to a different small core (N # NN) and pair them with the border point; the
# attributes of the neighbor carry the suffix _neighbor.
# The search point is the stored attribute CenterGK (gk of the original
# geometry, the attribute the M-tree was built on), so gk(.Center) is not
# recomputed for every border point.
let BorderPairs = Cluster dmap["", . feed
  filter[.IsBorderPoint]
  loopjoin["NeighborsByCoreMMMTree1" "NeighborsByCoreMain1"
    mdistRange[.CenterGK, eps] filter[.N # .NN] {neighbor}]
  project[Osm_id, N, D, Anz, IsCorePoint, IsBorderPoint,
    CId, Osm_id_neighbor, N_neighbor, D_neighbor, NN_neighbor]]
|
|
|
|
/*
|
|
Collect the border pairs on the master
|
|
|
|
*/
|
|
|
|
let BorderPairsCollected = BorderPairs dsummarize consume
|
|
|
|
/*
|
|
14 Determine cluster merge tasks
|
|
|
|
*/
|
|
|
|
# MergeTask: pairs (CId1, CId2) of cluster numbers that have to be merged.
# A border pair (a, b) is joined with the reverse pair (b, a): PairKey1 is
# built as a_b from the first stream, PairKey2 as (neighbor)_(point) from the
# second stream, so CId1 is the cluster of a and CId2 the cluster of b.
# The "_" separator keeps different id pairs from producing the same key
# (without it "12" + "345" and "123" + "45" would both give "12345").
# Tuples without a cluster (CId <= 0) are dropped before the join; this gives
# the same result as filtering CId1 and CId2 afterwards but joins less data.
# NOTE(review): the key must fit into a Secondo string - confirm for the
# Osm_id lengths in use.
let MergeTask = BorderPairsCollected feed
  filter[.CId > 0]
  projectextend[; PairKey1: .Osm_id + "_" + .Osm_id_neighbor, CId1: .CId]
  BorderPairsCollected feed
  filter[.CId > 0]
  projectextend[; PairKey2: .Osm_id_neighbor + "_" + .Osm_id, CId2: .CId]
  itHashJoin[PairKey1, PairKey2]
  sort rdup
  consume
|
|
|
|
/*
|
|
Construct graph
|
|
|
|
*/
|
|
|
|
let MergeGraph = MergeTask feed
|
|
MergeTask feed replaceAttr[CId1: .CId2, CId2: .CId1]
|
|
concat
|
|
constgraph[CId1, CId2, 1.0]
|
|
|
|
/*
|
|
Create table for renumbering
|
|
|
|
*/
|
|
|
|
# RenumberingClusters: mapping OldCId -> NewCId. Every connected component of
# MergeGraph receives one NewCId (counter starting at 1); the keys of the
# component's vertices are collected in the nested attribute Nodes and
# unnested, so each old cluster number appears with the NewCId of its
# component.
let RenumberingClusters = connectedcomponents(MergeGraph)
|
|
addcounter[NewCId, 1]
|
|
extend[Nodes: vertices(.Graph)
|
|
projectextend[; CId: get_key(.Vertex)] aconsume]
|
|
remove[Graph]
|
|
unnest[Nodes]
|
|
project[CId, NewCId] renameattr[OldCId: CId]
|
|
consume
|
|
|
|
/*
|
|
Share ~RenumberingClusters~ with worker
|
|
|
|
*/
|
|
|
|
query share("RenumberingClusters", TRUE, Worker)
|
|
|
|
|
|
/*
|
|
15 Apply renumbering on each partition
|
|
|
|
*/
|
|
|
|
# Apply the renumbering on each partition: every tuple of the slot (with its
# TID) is joined with RenumberingClusters on CId = OldCId, and the merged
# cluster number NewCId is written back into the stored relation.
# CId must be set to NewCId: OldCId is identical to the CId that was joined
# on, so assigning it would leave the relation unchanged.
# NOTE(review): NewCId starts at 1 (addcounter[NewCId, 1]) while unmerged
# clusters keep N * 100 + CId; for N = 0 the two ranges may overlap - confirm.
query Cluster dloop["", . feed addid RenumberingClusters feed
  itHashJoin[CId, OldCId]
  . updatedirect2[TID; CId: .NewCId] consume]
|
|
|
|
|
|
/*
|
|
The End
|
|
|
|
*/ |