165 lines
3.6 KiB
Plaintext
165 lines
3.6 KiB
Plaintext
|
|
/*
//paragraph [10] Title: [{\Large \bf ] [}]
//[@] [\verb+@+]
//[%] [\%]
//[&] [\&]
//[ue] [\"{u}]

[10] Computing Strongly Connected Components in Pregel

Part 2

Ralf Hartmut G[ue]ting, March 12, 2019

Run this script with ~SecondoTTYBDB~ and prefix [@][%] or [@][&].

This part can be called repeatedly.

4 Computing Strongly Connected Components

4.1 Initialize MM Structures on Master

*/
|
||
|
|
# Master side: clear the main-memory state and (re)build the in-memory
# structures from the persistent objects NodesPersistent, EdgesForward and
# EdgesBackward.
# NOTE(review): meminit(3000) presumably sets the memory budget (in MB?) -
# confirm against the main-memory algebra documentation.
query memclear()

query meminit(3000)

# Nodes: node relation loaded via mconsume.
let Nodes = NodesPersistent feed mconsume;
# Forward / Backward: graphs built by createmgraph3 from the forward and the
# backward edge relation, using the attributes Source, Target and Cost.
# NOTE(review): the last argument (Size) is presumably the number of nodes -
# confirm.
let Forward = EdgesForward feed
  createmgraph3[Source, Target, Cost, Size];
let Backward = EdgesBackward feed
  createmgraph3[Source, Target, Cost, Size];
|
||
|
|
|
||
|
|
/*
4.2 Set Up Pregel

*/
|
||
|
|
# Set up Pregel for the workers described by the object Workers.
query setupPregel(Workers);

# Send the same initialization commands that were run on the master to the
# workers; each command is passed as a quoted string to remotePregelCommand.
# NOTE(review): unlike on the master, no memclear() is sent to the workers -
# confirm that a repeated run does not fail on already existing objects
# Nodes, Forward and Backward there.
query remotePregelCommand('query meminit(3000)');

query remotePregelCommand('let Nodes = NodesPersistent feed mconsume;');

query remotePregelCommand(
  'let Forward = EdgesForward feed createmgraph3[Source, Target, Cost, Size]'
);

query remotePregelCommand(
  'let Backward = EdgesBackward feed createmgraph3[Source, Target, Cost, Size]'
);

# Register the function Compute as the Pregel function.
# NOTE(review): NodePartition is presumably the message attribute that
# determines the target worker - confirm against the Pregel algebra.
query setPregelFunction(Compute, NodePartition);
|
||
|
|
|
||
|
|
|
||
|
|
/*
4.3 Computation

*/
|
||
|
|
|
||
|
|
# On each worker, store the number of nodes with Active = TRUE in the object
# NActive: updated if the object already exists, created otherwise.
# Continuation lines of quoted commands are kept flush left so that the
# command text sent to the workers is unchanged.
query remotePregelCommand(
'if isDBObject("NActive") then
update NActive := Nodes mfeed filter[.Active] count
else
let NActive = Nodes mfeed filter[.Active] count
endif')

# On the master, N is the sum (tie[. + ..]) of the NActive values collected
# from the workers; again update-or-create so the script can be rerun.
if isDBObject("N") then
  update N := createSDArray("NActive", Workers) getValue tie[. + ..]
else
  let N = createSDArray("NActive", Workers) getValue tie[. + ..]
endif

# Main loop, repeated while N > 0. Each iteration runs three rounds; in each
# round every active node gets one initial message (NodeId, NodePartition,
# Message, Value) via initPregelMessagesWorker and then startPregel(-1) is
# called. The message names are, in order: "checkSingle", "resetColor",
# "maxBack". Afterwards NActive is recomputed on the workers, N is recomputed
# on the master, and the current N is written to the file NActive.txt.
# NOTE(review): startPregel(-1) presumably runs until no messages are left,
# and the TRUE argument of writeFile presumably means "append" - confirm.
# Inside the braces the commands are separated by a line holding a single "|".
while N > 0 do
{
  query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "checkSingle", Value: 0]
initPregelMessagesWorker')
  |
  query startPregel(-1)
  |
  query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "resetColor", Value: 0]
initPregelMessagesWorker')
  |
  query startPregel(-1)
  |
  query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "maxBack", Value: 0]
initPregelMessagesWorker')
  |
  query startPregel(-1)
  |
  query remotePregelCommand(
    'update NActive := Nodes mfeed filter[.Active] count')
  |
  update N := createSDArray("NActive", Workers) getValue tie[. + ..]
  |
  query writeFile(nl + "Number of active nodes =" + num2string(N),
    "NActive.txt", TRUE)
}
endwhile
|
||
|
|
|
||
|
|
/*
4.4 Overview of the Phases

Each iteration of the loop in Section 4.3 runs the following three phases,
shown here as separate commands.

----
# Phase 1: Remove Singleton Components

query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "checkSingle", Value: 0]
initPregelMessagesWorker')

query startPregel(-1)

# Phase 2: Spread Maximum Color

query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "resetColor", Value: 0]
initPregelMessagesWorker')

query startPregel(-1)

# Phase 3: Spread Back to Predecessors of Maximum

query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "maxBack", Value: 0]
initPregelMessagesWorker')

query startPregel(-1);

----

*/
|
||
|
|
|
||
|
|
|
||
|
|
/*
4.5 Checking the Results

*/
|
||
|
|
|
||
|
|
# On each worker, copy the current contents of Nodes into the object NodesQ
# (updated if it already exists, created otherwise).
# The quoted command is kept flush left so that the text sent to the workers
# is unchanged.
query remotePregelCommand(
'if isDBObject("NodesQ") then update NodesQ := Nodes mfeed consume
else let NodesQ = Nodes mfeed consume
endif')

# Show the first 20 tuples of the NodesQ objects collected from the workers.
query createSDArray("NodesQ", Workers) dsummarize head[20] consume;

# Colors: number of nodes per value of the attribute Color over all workers;
# the count attribute is named Anzahl (German for "count").
# NOTE(review): presumably each Color value identifies one strongly connected
# component - confirm against the Compute function.
if isDBObject("Colors") then
  update Colors := createSDArray("NodesQ", Workers)
    dsummarize sortby[Color] groupby[Color; Anzahl: group count] consume
else
  let Colors = createSDArray("NodesQ", Workers)
    dsummarize sortby[Color] groupby[Color; Anzahl: group count] consume
endif
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|