/* //paragraph [10] Title: [{\Large \bf ] [}] //[@] [\verb+@+] //[%] [\%] //[&] [\&] //[ue] [\"{u}] [10] Computing Strongly Connected Components in Pregel Part 2 Ralf Hartmut G[ue]ting, March 12, 2019 Run this script with ~SecondoTTYBDB~ and prefix [@][%] or [@][&]. This part can be called repeatedly. 4 Computing Connected Components 4.1 Initialize MM Structures on Master */ query memclear() query meminit(3000) let Nodes = NodesPersistent feed mconsume; let Forward = EdgesForward feed createmgraph3[Source, Target, Cost, Size]; let Backward = EdgesBackward feed createmgraph3[Source, Target, Cost, Size]; /* 4.2 Set Up Pregel */ query setupPregel(Workers); query remotePregelCommand('query meminit(3000)'); query remotePregelCommand('let Nodes = NodesPersistent feed mconsume;'); query remotePregelCommand( 'let Forward = EdgesForward feed createmgraph3[Source, Target, Cost, Size]' ); query remotePregelCommand( 'let Backward = EdgesBackward feed createmgraph3[Source, Target, Cost, Size]' ); query setPregelFunction(Compute, NodePartition); /* 4.3 Computation 4.3.1 Phase 1: Remove Singleton Components */ query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition] extend[Message: "checkSingle", Single: 0, Value: 0] initPregelMessagesWorker') query startPregel(-1) /* 4.3.2 Phase 2: Spread the Maximum Color to Neighborhood */ query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition] extend[Message: "resetColor", Single: 0, Value: 0] initPregelMessagesWorker') query startPregel(20) query clearPregelMessages() /* 4.3.3 Phase 3: Confirm Color */ query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Color, NodePartition: PartFun(.Color), Message: "confirmColor", Single: .Id, Value: .Color] initPregelMessagesWorker') query startPregel(-1) /* 4.3.4 Phase 4: Create Edges in Top Level Network */ query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition, Message: "getSuccColor", Single: 0, Value: 0] initPregelMessagesWorker') query startPregel(1) query remotePregelCommand('{delete Nodes3 | let Nodes3 = Nodes mfeed consume}') {delete Nodes3 | let Nodes3 = createSDArray("Nodes3", Workers) dsummarize consume} query remotePregelCommand('{delete Edges | let Edges = Forward mg3feed consume}') {delete Edges | let Edges = createSDArray("Edges", Workers) dsummarize consume} query Edges feed {e} Nodes3 feed filter[.Color = .Id] {n1} itHashJoin[Source_e, Id_n1] Nodes3 feed filter[.Color = .Id] {n2} itHashJoin[Target_e, Id_n2] project[Source_e, Pos_n1, Target_e, Pos_n2] extend[E: create_sline(.Pos_n1, .Pos_n2)] consume query remotePregelCommand( 'if isDBObject("NActive") then update NActive := Nodes mfeed filter[.Active] count else let NActive = Nodes mfeed filter[.Active] count endif') if isDBObject("N") then update N := createSDArray("NActive", Workers) getValue tie[. + ..] else let N = createSDArray("NActive", Workers) getValue tie[. + ..] endif while N > 0 do { query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition] extend[Message: "checkSingle", Value: 0] initPregelMessagesWorker') | query startPregel(-1) | query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition] extend[Message: "resetColor", Value: 0] initPregelMessagesWorker') | query startPregel(-1) | query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition] extend[Message: "maxBack", Value: 0] initPregelMessagesWorker') | query startPregel(-1) | query remotePregelCommand( 'update NActive := Nodes mfeed filter[.Active] count') | update N := createSDArray("NActive", Workers) getValue tie[. + ..] | query writeFile(nl + "Number of active nodes =" + num2string(N), "NActive.txt", TRUE) } endwhile /* 4.4 Overview of the Phases ---- # Phase 1: Remove Singleton Components query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition] extend[Message: "checkSingle", Value: 0] initPregelMessagesWorker') query startPregel(-1) # Phase 2: Spread Maximum Color query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition] extend[Message: "resetColor", Value: 0] initPregelMessagesWorker') query startPregel(-1) # Phase 3: Spread Back to Predecessors of Maximum query remotePregelCommand('query Nodes mfeed filter[.Active] projectextend[; NodeId: .Id, NodePartition: .Partition] extend[Message: "maxBack", Value: 0] initPregelMessagesWorker') query startPregel(-1); ---- */ /* 4.5 Checking the Results */ query remotePregelCommand( 'if isDBObject("NodesQ") then update NodesQ := Nodes mfeed consume else let NodesQ = Nodes mfeed consume endif') query createSDArray("NodesQ", Workers) dsummarize head[20] consume; let NodesQ = createSDArray("NodesQ", Workers) dsummarize consume; if isDBObject("Colors") then update Colors := createSDArray("NodesQ", Workers) dsummarize sortby[Color] groupby[Color; Anzahl: group count] consume else let Colors = createSDArray("NodesQ", Workers) dsummarize sortby[Color] groupby[Color; Anzahl: group count] consume endif