244 lines
5.4 KiB
Plaintext
244 lines
5.4 KiB
Plaintext
|
|
/*
|
||
|
|
//paragraph [10] Title: [{\Large \bf ] [}]
|
||
|
|
//[@] [\verb+@+]
|
||
|
|
//[%] [\%]
|
||
|
|
//[&] [\&]
|
||
|
|
//[ue] [\"{u}]
|
||
|
|
|
||
|
|
[10] Computing Strongly Connected Components in Pregel
|
||
|
|
|
||
|
|
Part 2
|
||
|
|
|
||
|
|
Ralf Hartmut G[ue]ting, March 12, 2019
|
||
|
|
|
||
|
|
Run this script with ~SecondoTTYBDB~ and prefix [@][%] or [@][&].
|
||
|
|
|
||
|
|
This part can be called repeatedly.
|
||
|
|
|
||
|
|
4 Computing Connected Components
|
||
|
|
|
||
|
|
4.1 Initialize MM Structures on Master
|
||
|
|
|
||
|
|
*/
|
||
|
|
query memclear()
|
||
|
|
|
||
|
|
query meminit(3000)
|
||
|
|
|
||
|
|
let Nodes = NodesPersistent feed mconsume;
|
||
|
|
let Forward = EdgesForward feed
|
||
|
|
createmgraph3[Source, Target, Cost, Size];
|
||
|
|
let Backward = EdgesBackward feed
|
||
|
|
createmgraph3[Source, Target, Cost, Size];
|
||
|
|
|
||
|
|
/*
|
||
|
|
4.2 Set Up Pregel
|
||
|
|
|
||
|
|
*/
|
||
|
|
query setupPregel(Workers);
|
||
|
|
|
||
|
|
query remotePregelCommand('query meminit(3000)');
|
||
|
|
|
||
|
|
query remotePregelCommand('let Nodes = NodesPersistent feed mconsume;');
|
||
|
|
|
||
|
|
query remotePregelCommand(
|
||
|
|
'let Forward = EdgesForward feed createmgraph3[Source, Target, Cost, Size]'
|
||
|
|
);
|
||
|
|
|
||
|
|
query remotePregelCommand(
|
||
|
|
'let Backward = EdgesBackward feed createmgraph3[Source, Target, Cost, Size]'
|
||
|
|
);
|
||
|
|
|
||
|
|
query setPregelFunction(Compute, NodePartition);
|
||
|
|
|
||
|
|
|
||
|
|
/*
|
||
|
|
4.3 Computation
|
||
|
|
|
||
|
|
4.3.1 Phase 1: Remove Singleton Components
|
||
|
|
|
||
|
|
*/
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition]
|
||
|
|
extend[Message: "checkSingle", Single: 0, Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
||
|
|
query startPregel(-1)
|
||
|
|
|
||
|
|
|
||
|
|
/*
|
||
|
|
|
||
|
|
4.3.2 Phase 2: Spread the Maximum Color to Neighborhood
|
||
|
|
|
||
|
|
*/
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition]
|
||
|
|
extend[Message: "resetColor", Single: 0, Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
||
|
|
query startPregel(20)
|
||
|
|
|
||
|
|
query clearPregelMessages()
|
||
|
|
|
||
|
|
/*
|
||
|
|
4.3.3 Phase 3: Confirm Color
|
||
|
|
|
||
|
|
*/
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Color, NodePartition: PartFun(.Color),
|
||
|
|
Message: "confirmColor", Single: .Id, Value: .Color]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
||
|
|
query startPregel(-1)
|
||
|
|
|
||
|
|
|
||
|
|
/*
|
||
|
|
4.3.4 Phase 4: Create Edges in Top Level Network
|
||
|
|
|
||
|
|
*/
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition,
|
||
|
|
Message: "getSuccColor", Single: 0, Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
||
|
|
query startPregel(1)
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
query remotePregelCommand('{delete Nodes3 | let Nodes3 =
|
||
|
|
Nodes mfeed consume}')
|
||
|
|
|
||
|
|
{delete Nodes3 | let Nodes3 = createSDArray("Nodes3", Workers) dsummarize
|
||
|
|
consume}
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
query remotePregelCommand('{delete Edges | let Edges =
|
||
|
|
Forward mg3feed consume}')
|
||
|
|
|
||
|
|
{delete Edges | let Edges = createSDArray("Edges", Workers) dsummarize
|
||
|
|
consume}
|
||
|
|
|
||
|
|
query Edges feed {e} Nodes3 feed filter[.Color = .Id] {n1}
|
||
|
|
itHashJoin[Source_e, Id_n1] Nodes3 feed filter[.Color = .Id] {n2}
|
||
|
|
itHashJoin[Target_e, Id_n2] project[Source_e, Pos_n1, Target_e, Pos_n2]
|
||
|
|
extend[E: create_sline(.Pos_n1, .Pos_n2)] consume
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
query remotePregelCommand(
|
||
|
|
'if isDBObject("NActive") then
|
||
|
|
update NActive := Nodes mfeed filter[.Active] count
|
||
|
|
else
|
||
|
|
let NActive = Nodes mfeed filter[.Active] count
|
||
|
|
endif')
|
||
|
|
|
||
|
|
if isDBObject("N") then
|
||
|
|
update N := createSDArray("NActive", Workers) getValue tie[. + ..]
|
||
|
|
else
|
||
|
|
let N = createSDArray("NActive", Workers) getValue tie[. + ..]
|
||
|
|
endif
|
||
|
|
|
||
|
|
while N > 0 do
|
||
|
|
{
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition]
|
||
|
|
extend[Message: "checkSingle", Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
|
||
|
|
query startPregel(-1)
|
||
|
|
|
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition]
|
||
|
|
extend[Message: "resetColor", Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
|
||
|
|
query startPregel(-1)
|
||
|
|
|
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition]
|
||
|
|
extend[Message: "maxBack", Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
|
||
|
|
query startPregel(-1)
|
||
|
|
|
|
||
|
|
query remotePregelCommand(
|
||
|
|
'update NActive := Nodes mfeed filter[.Active] count')
|
||
|
|
|
|
||
|
|
update N := createSDArray("NActive", Workers) getValue tie[. + ..]
|
||
|
|
|
|
||
|
|
query writeFile(nl + "Number of active nodes =" + num2string(N),
|
||
|
|
"NActive.txt", TRUE)
|
||
|
|
}
|
||
|
|
endwhile
|
||
|
|
|
||
|
|
/*
|
||
|
|
4.4 Overview of the Phases
|
||
|
|
|
||
|
|
----
|
||
|
|
# Phase 1: Remove Singleton Components
|
||
|
|
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition]
|
||
|
|
extend[Message: "checkSingle", Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
||
|
|
query startPregel(-1)
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
# Phase 2: Spread Maximum Color
|
||
|
|
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition]
|
||
|
|
extend[Message: "resetColor", Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
||
|
|
query startPregel(-1)
|
||
|
|
|
||
|
|
# Phase 3: Spread Back to Predecessors of Maximum
|
||
|
|
|
||
|
|
query remotePregelCommand('query Nodes mfeed filter[.Active]
|
||
|
|
projectextend[; NodeId: .Id, NodePartition: .Partition]
|
||
|
|
extend[Message: "maxBack", Value: 0]
|
||
|
|
initPregelMessagesWorker')
|
||
|
|
|
||
|
|
query startPregel(-1);
|
||
|
|
|
||
|
|
----
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
|
||
|
|
/*
|
||
|
|
4.5 Checking the Results
|
||
|
|
|
||
|
|
*/
|
||
|
|
|
||
|
|
query remotePregelCommand(
|
||
|
|
'if isDBObject("NodesQ") then update NodesQ := Nodes mfeed consume
|
||
|
|
else let NodesQ = Nodes mfeed consume
|
||
|
|
endif')
|
||
|
|
|
||
|
|
query createSDArray("NodesQ", Workers) dsummarize head[20] consume;
|
||
|
|
|
||
|
|
let NodesQ = createSDArray("NodesQ", Workers) dsummarize consume;
|
||
|
|
|
||
|
|
if isDBObject("Colors") then
|
||
|
|
update Colors := createSDArray("NodesQ", Workers)
|
||
|
|
dsummarize sortby[Color] groupby[Color; Anzahl: group count] consume
|
||
|
|
else let Colors = createSDArray("NodesQ", Workers)
|
||
|
|
dsummarize sortby[Color] groupby[Color; Anzahl: group count] consume
|
||
|
|
endif
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
|