Files
secondo/bin/Scripts/PregelConnectedComponentsPart2.psec
2026-01-23 17:03:45 +08:00

165 lines
3.6 KiB
Plaintext

/*
//paragraph [10] Title: [{\Large \bf ] [}]
//[@] [\verb+@+]
//[%] [\%]
//[&] [\&]
//[ue] [\"{u}]
[10] Computing Strongly Connected Components in Pregel
Part 2
Ralf Hartmut G[ue]ting, March 12, 2019
Run this script with ~SecondoTTYBDB~ and prefix [@][%] or [@][&].
This part can be called repeatedly.
4 Computing Connected Components
4.1 Initialize MM Structures on Master
*/
query memclear()
query meminit(3000)
let Nodes = NodesPersistent feed mconsume;
let Forward = EdgesForward feed
createmgraph3[Source, Target, Cost, Size];
let Backward = EdgesBackward feed
createmgraph3[Source, Target, Cost, Size];
/*
4.2 Set Up Pregel
*/
query setupPregel(Workers);
query remotePregelCommand('query meminit(3000)');
query remotePregelCommand('let Nodes = NodesPersistent feed mconsume;');
query remotePregelCommand(
'let Forward = EdgesForward feed createmgraph3[Source, Target, Cost, Size]'
);
query remotePregelCommand(
'let Backward = EdgesBackward feed createmgraph3[Source, Target, Cost, Size]'
);
query setPregelFunction(Compute, NodePartition);
/*
4.3 Computation
*/
query remotePregelCommand(
'if isDBObject("NActive") then
update NActive := Nodes mfeed filter[.Active] count
else
let NActive = Nodes mfeed filter[.Active] count
endif')
if isDBObject("N") then
update N := createSDArray("NActive", Workers) getValue tie[. + ..]
else
let N = createSDArray("NActive", Workers) getValue tie[. + ..]
endif
while N > 0 do
{
query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "checkSingle", Value: 0]
initPregelMessagesWorker')
|
query startPregel(-1)
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "resetColor", Value: 0]
initPregelMessagesWorker')
|
query startPregel(-1)
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "maxBack", Value: 0]
initPregelMessagesWorker')
|
query startPregel(-1)
|
query remotePregelCommand(
'update NActive := Nodes mfeed filter[.Active] count')
|
update N := createSDArray("NActive", Workers) getValue tie[. + ..]
|
query writeFile(nl + "Number of active nodes =" + num2string(N),
"NActive.txt", TRUE)
}
endwhile
/*
4.4 Overview of the Phases
----
# Phase 1: Remove Singleton Components
query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "checkSingle", Value: 0]
initPregelMessagesWorker')
query startPregel(-1)
# Phase 2: Spread Maximum Color
query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "resetColor", Value: 0]
initPregelMessagesWorker')
query startPregel(-1)
# Phase 3: Spread Back to Predecessors of Maximum
query remotePregelCommand('query Nodes mfeed filter[.Active]
projectextend[; NodeId: .Id, NodePartition: .Partition]
extend[Message: "maxBack", Value: 0]
initPregelMessagesWorker')
query startPregel(-1);
----
*/
/*
4.5 Checking the Results
*/
query remotePregelCommand(
'if isDBObject("NodesQ") then update NodesQ := Nodes mfeed consume
else let NodesQ = Nodes mfeed consume
endif')
query createSDArray("NodesQ", Workers) dsummarize head[20] consume;
if isDBObject("Colors") then
update Colors := createSDArray("NodesQ", Workers)
dsummarize sortby[Color] groupby[Color; Anzahl: group count] consume
else let Colors = createSDArray("NodesQ", Workers)
dsummarize sortby[Color] groupby[Color; Anzahl: group count] consume
endif