181 lines
4.0 KiB
Plaintext
181 lines
4.0 KiB
Plaintext
|
|
/*
|
||
|
|
//paragraph [10] title: [{\Large \bf ] [}]
|
||
|
|
//[Contents] [\tableofcontents]
|
||
|
|
//[ue] [\"{u}]
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
[10] Distributed Sorting of a dfarray
|
||
|
|
|
||
|
|
|
||
|
|
[Contents]
|
||
|
|
|
||
|
|
1 Overview
|
||
|
|
|
||
|
|
This script is for distributed sorting of a dfarray by OsmID.
|
||
|
|
|
||
|
|
It is a commented script suitable for viewing with ~pdview~. To be run with
|
||
|
|
|
||
|
|
---- @%Scripts/DistributedSortingDfarrayPD.sec or
|
||
|
|
@&Scripts/DistributedSortingDfarrayPD.sec
|
||
|
|
----
|
||
|
|
|
||
|
|
The algorithm is the following:
|
||
|
|
|
||
|
|
1) Each worker takes a sample from its partition and send it to the master.
|
||
|
|
|
||
|
|
2) Sort the samples on the master and create the boundaries by taking every kth element.
|
||
|
|
|
||
|
|
3) Share boundaries with all worker, create a main memory relation for them and create an AVL-Tree for this relation.
|
||
|
|
|
||
|
|
4) For each tuple (a) of each partition traverse the AVL-Tree and determine the tuple (b) of the AVL-Tree which is the greatest one that is smaller than the tuple (b).
|
||
|
|
|
||
|
|
5) Sort the tuples for each slot.
|
||
|
|
|
||
|
|
|
||
|
|
2 Preparations
|
||
|
|
|
||
|
|
Preparations:
|
||
|
|
|
||
|
|
* include the Distributed2Algebra in makefile.algebras and recompile secondo
|
||
|
|
|
||
|
|
* get the desired shape-file from download.geofabrik.de
|
||
|
|
|
||
|
|
* create and open a database
|
||
|
|
|
||
|
|
* import desired relation using the adapted script ~importGermanyOsm.psec~
|
||
|
|
|
||
|
|
* restore Workers in relation ~Worker~
|
||
|
|
|
||
|
|
* start the remoteMonitors for the workers
|
||
|
|
|
||
|
|
* distribute relation ~Buildings~ as ~BuildingsB1~
|
||
|
|
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
#restore Worker from WorkerNewton32
|
||
|
|
|
||
|
|
let SizeWorker = Worker count
|
||
|
|
|
||
|
|
let NrSlots = SizeWorker * 2
|
||
|
|
|
||
|
|
/*
|
||
|
|
determine the size of samples
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
let SizeSample = 1000
|
||
|
|
|
||
|
|
/*
|
||
|
|
Create a DArray ~ControlWorkers~
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
let ControlWorkers = intstream(0, SizeWorker - 1) transformstream
|
||
|
|
ddistribute3["ControlWorkers", SizeWorker, TRUE, Worker]
|
||
|
|
dloop["", . feed extract[Elem]]
|
||
|
|
|
||
|
|
/*
|
||
|
|
Run script DistCost.sec for cost measurements
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
@&Scripts/DistCost.sec
|
||
|
|
|
||
|
|
/*
|
||
|
|
3 Distribute relation
|
||
|
|
|
||
|
|
If yet not done distribute relation ~Buildings~ to worker as a dfarray.
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
let BuildingsB1 = Buildings feed
|
||
|
|
dfdistribute3["BuildingsB1", NrSlots, TRUE, Worker]
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
/*
|
||
|
|
4 Store size of relation ~Buildings~ and slotsize of the distributed array
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
let SizeBuildingsB1 = BuildingsB1 dmap["", . feed count] getValue tie[. + ..]
|
||
|
|
|
||
|
|
let SizeSlots = size(BuildingsB1)
|
||
|
|
|
||
|
|
/*
|
||
|
|
5 Determine the number of fractions, for sampling and share with worker
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
let Fraction = (SizeBuildingsB1 div SizeSlots) div SizeSample
|
||
|
|
|
||
|
|
query share("Fraction", TRUE, Worker)
|
||
|
|
|
||
|
|
/*
|
||
|
|
6 Determine borders and share with worker
|
||
|
|
|
||
|
|
From each partition take approximately 500 samples and send them to the master.
|
||
|
|
Sort them on the master and select every 500th element. The number of Boundaries should now be equal to the number of Slots.
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
let Boundaries = BuildingsB1 dmap["", . feed nth[Fraction, FALSE]
|
||
|
|
project[Osm_id]] dsummarize sort nth[SizeSample, TRUE] addcounter[D, 1]
|
||
|
|
consume
|
||
|
|
|
||
|
|
query share("Boundaries", TRUE, Worker)
|
||
|
|
|
||
|
|
/*
|
||
|
|
7 Create AVL-Trees
|
||
|
|
|
||
|
|
Create a main memory relation ~Boundaries~ and an AVL-Tree index on its attribute OsmId on each worker.
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
query ControlWorkers dloop["", Boundaries feed letmconsume["Boundaries"]
|
||
|
|
mcreateAVLtree[Osm_id] ]
|
||
|
|
|
||
|
|
/*
|
||
|
|
Create as well on the master
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
query Boundaries feed letmconsume["Boundaries"]
|
||
|
|
mcreateAVLtree[Osm_id]
|
||
|
|
|
||
|
|
/*
|
||
|
|
8 Determine corresponding slots and create a dfmatrix
|
||
|
|
|
||
|
|
Determine for each tuple (1) the tuple (2) indexed in the Boundaries-AVL-Tree whose OsmID is the greatest one that is smaller than the OsmID of the tuple (1).
|
||
|
|
Create a dfmatrix by partition the data.
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
let BuildingsB2 = BuildingsB1
|
||
|
|
partition["", "Boundaries_Osm_id" "Boundaries"
|
||
|
|
matchbelow[.Osm_id] extract[D], SizeSlots]
|
||
|
|
|
||
|
|
/*
|
||
|
|
9 Sort on each worker
|
||
|
|
Sort on each worker in parallel. Save costs for evaluation.
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
update LastCommand := distCostReset(ControlWorkers)
|
||
|
|
|
||
|
|
let BuildingsB2SortedOsm_id = BuildingsB2
|
||
|
|
areduce["", . feed sortby[Osm_id], 1238]
|
||
|
|
|
||
|
|
let CostsSorting = distCostSave(ControlWorkers)
|
||
|
|
|
||
|
|
/*
|
||
|
|
10 Show costs in javagui
|
||
|
|
|
||
|
|
Execute this query in the GUI. There will be a box for each worker representing the time for local sorting.
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
#query distCostBoxes(CostsSorting, 0.0, 5.0)
|
||
|
|
#query distCostUtil(CostsSorting)
|