/*
//paragraph [10] Title: [{\Large \bf ] [}]
//[@] [\verb+@+]
//[%] [\%]
//[&] [\&]
//[ue] [\"{u}]


[10] Computing Strongly Connected Components in Pregel

Part 1

Ralf Hartmut G[ue]ting, March 12, 2019

Run this script with ~SecondoTTYBDB~ and prefix [@][%] or [@][&]. Part 1
builds the graph and defines the computing function. Part 2 computes strongly
connected components and can be called repeatedly.

Data needed are

  1 a relation with edges of the graph called ~EdgesA~ as it is created by
script ~OrderedRelationGraphFromFullOSMImport.SEC~,

  2 a relation ~Workers~ defining workers in the Pregel format. An example
tuple is

----    Host : 132.176.69.75
        Port : 1783
        Config : SecondoConfig.ini
        MessageServerPort : 1883
----

1 Create a Graph

Commands in this script are terminated either by a semicolon or by an empty
line; line comments starting with a hash sign extend to the end of the line
and are therefore kept on lines of their own.

*/

# Database must exist and be open

# adapt next line
restore Workers from Workers12Pregel;

let WorkerNum = Workers count;

let PartFun = fun (id: int) (hashvalue(id, WorkerNum));

# adapt one of next lines
# restore EdgesA from EdgesHombruch;
# let EdgesA = "EdgesDortmund.bin" ffeed5 consume;
let EdgesA = "EdgesArnsberg.bin" ffeed5 consume;

update EdgesA := EdgesA feed sortby[Source] consume;

# add undefined curves (for applications in general)
query EdgesA feed filter[not(isdefined(.Curve))] EdgesA
  updatedirect[Curve: create_sline(.SourcePos, .TargetPos)] count

# extend[R: randint(999997)] sortby[R] remove[R]

let NodesPersistent =
  EdgesA feed projectextend[; PrelId: .Source, Pos: .SourcePos]
  EdgesA feed projectextend[; PrelId: .Target, Pos: .TargetPos]
  concat
  sort rdup
  extend[R: randint(999997)] sortby[R] remove[R]
  addcounter[Id, 1]
  extend[Partition: PartFun(.Id)]
  extend[Color: 0, Active: TRUE]
  consume

# (PrelId int) (Pos point) (Id int) (Partition int) (Color int)
# (Active bool)

let Size = (NodesPersistent feed max[Id]) + 1

let EdgesForward = EdgesA feed
  extend[Cost: size(gk(.Curve))]
  project[Source, Target, Cost]
  NodesPersistent feed itHashJoin[Source, PrelId]
  NodesPersistent feed {n} itHashJoin[Target, PrelId_n]
  projectextend[; Source: .Id, PartitionSource: .Partition,
    Target: .Id_n, PartitionTarget: .Partition_n, Cost: .Cost]
  consume

# (Source, PartitionSource, Target, PartitionTarget, Cost)

let EdgesBackward = EdgesForward feed
  projectextend[; Source: .Target, PartitionSource: .PartitionTarget,
    Target: .Source, PartitionTarget: .PartitionSource, Cost: .Cost]
  sortby[Source]
  consume

# (Source, PartitionSource, Target, PartitionTarget, Cost)

/*
2 Computing Strongly Connected Components

Nodes and edges have schemas:

----    Node(PrelId: int, Pos: point, Id: int, Partition: int, Color: int,
          Active: bool)
        Edge(Source: int, PartitionSource: int, Target: int,
          PartitionTarget: int, Cost: real)
----

Initially nodes are numbered 1, ..., ~n~ in field ~Id~, ~Color~ = 0,
~Active~ = TRUE. Only active nodes receive messages.

2.1 Phase 1: Remove Singleton Components

----    To all nodes: checkSingle()

        on checkSingle():
          if count(successors) = 0 or count(predecessors) = 0 then
            Color := Id;
            Active := false;
            send(predecessors, checkSingle());
            send(successors, checkSingle());
            disconnect;
          endif
----

2.2 Phase 2: Spread the Maximum Color

----    To all nodes: resetColor()

        on resetColor():
          Color := 0;
          send(successors, newColor(Id));

        on newColor(X):
          if X > Color then
            Color := X;
            send(successors, newColor(X))
          endif
----

2.3 Phase 3: Spread Back to Predecessors of Maximum

----    To all nodes: maxBack()

        on maxBack():
          if Color = Id then
            send(predecessors, maxColor(Color));
            Active := FALSE
          endif

        on maxColor(X):
          if Color = X then
            send(predecessors, maxColor(X));
            Active := FALSE
          endif
----

2.4 Control

----    while count(ActiveNodes) > 0 do
          Phase 1;
          Phase 2;
          Phase 3;
        endwhile
----

3 The Compute Function

The node programs are implemented in the following ~Compute~ function.

Notes on the implementation:

  * Incoming messages are sorted by ~NodeId~ and ~Message~ and then grouped
by the same two attributes, so that all messages of one kind for one node
form a single group; only the maximum ~Value~ of each group is kept (attribute
~Max~). The result is still ordered by ~NodeId~, as needed by the subsequent
~mergejoin~ with the active nodes.

  * On ~resetColor~, the implementation sends ~newColor(Id)~ to the node
itself rather than directly to its successors; the node then handles this
message in the ~newColor~ branch, which sets its color and forwards it to the
successors.

  * On ~checkSingle~, the last argument of the final ~concat~ never returns a
tuple (~filter[FALSE]~). It is there only to evaluate ~mg3disconnect~ on both
graphs for the node, which implements the ~disconnect~ step of Phase 1.

  * ~InitialMessages~ is used by ~NoMessages~ only to obtain an empty stream
with the message schema (~head[0]~).

*/

query memclear()

let Nodes = NodesPersistent feed mconsume;

let Forward = EdgesForward feed createmgraph3[Source, Target, Cost, Size];

let Backward = EdgesBackward feed createmgraph3[Source, Target, Cost, Size];

let InitialMessages = NodesPersistent feed
  projectextend[; NodeId: .Id, NodePartition: .Partition]
  extend[Message: "newColor", Value: .NodeId]
  consume

let NoMessages = fun() InitialMessages feed head[0]

let Compute = fun (messages: stream(tuple([NodeId: int, NodePartition: int,
    Message: string, Value: int])))
  messages
    sortby[NodeId, Message]
    groupby[NodeId, Message; Max: group feed max[Value]]
  Nodes mfeed addid filter[.Active]
  mergejoin[NodeId, Id]
  loopsel[fun(t: TUPLE)
    attr(t, Message)
    switch[
      "checkSingle",
        t feed
          filter[(Forward mg3numsuccessors[.Id] = 0) or
            (Backward mg3numsuccessors[.Id] = 0)]
          Nodes mupdatedirect2[TID; Color: .Id, Active: FALSE]
          loopsel[fun(t2: TUPLE)
            Backward mg3successors[attr(t2, Id)]
              projectextend[; NodeId: .Target,
                NodePartition: PartFun(.Target),
                Message: "checkSingle", Value: 0]
            Forward mg3successors[attr(t2, Id)]
              projectextend[; NodeId: .Target,
                NodePartition: PartFun(.Target),
                Message: "checkSingle", Value: 0]
            concat
            t2 feed
              extend[DisconnectF: Forward mg3disconnect[.Id],
                DisconnectB: Backward mg3disconnect[.Id]]
              filter[FALSE]
              projectextend[; NodeId: .Id,
                NodePartition: PartFun(.Id),
                Message: "checkSingle", Value: 0]
            concat
          ]
      ; "resetColor",
        t feed
          Nodes mupdatedirect2[TID; Color: 0]
          projectextend[; NodeId: .Id,
            NodePartition: PartFun(.Id),
            Message: "newColor", Value: .Id]
      ; "newColor",
        t feed
          filter[.Max > .Color]
          Nodes mupdatedirect2[TID; Color: .Max]
          loopjoin[Forward mg3successors[.Id]]
          projectextend[; NodeId: .Target,
            NodePartition: PartFun(.Target),
            Message: "newColor", Value: .Color]
      ; "maxBack",
        t feed
          filter[.Id = .Color]
          Nodes mupdatedirect2[TID; Active: FALSE]
          loopjoin[Backward mg3successors[.Id]]
          projectextend[; NodeId: .Target,
            NodePartition: PartFun(.Target),
            Message: "maxColor", Value: .Color]
      ; "maxColor",
        t feed
          filter[.Color = .Max]
          Nodes mupdatedirect2[TID; Active: FALSE]
          loopjoin[Backward mg3successors[.Id]]
          projectextend[; NodeId: .Target,
            NodePartition: PartFun(.Target),
            Message: "maxColor", Value: .Color]
      ; NoMessages()]
  ]

query share("Compute", TRUE, Workers);

/*
4 Distribute Data to Workers

Nodes are distributed by ~PartFun(Id)~, forward and backward edges by
~PartFun(Source)~. Each distributed array is converted with ~makeSimple~
under the name used by the ~Compute~ setup (~NodesPersistent~,
~EdgesForward~, ~EdgesBackward~), and the intermediate object is deleted.
Finally the objects that ~Compute~ refers to are shared with the workers.

*/

let NodesD = NodesPersistent feed
  ddistribute4["NodesD", PartFun(.Id), WorkerNum, Workers];

let NodesSD = NodesD makeSimple[FALSE, "NodesPersistent"];

delete NodesD

let EdgesDf = EdgesForward feed
  ddistribute4["EdgesDf", PartFun(.Source), WorkerNum, Workers];

let EdgesSDf = EdgesDf makeSimple[FALSE, "EdgesForward"];

delete EdgesDf

let EdgesDb = EdgesBackward feed
  ddistribute4["EdgesDb", PartFun(.Source), WorkerNum, Workers];

let EdgesSDb = EdgesDb makeSimple[FALSE, "EdgesBackward"];

delete EdgesDb

query share("WorkerNum", TRUE, Workers);

query share("PartFun", TRUE, Workers);

query share("Size", TRUE, Workers);

query share("InitialMessages", TRUE, Workers);

query share("NoMessages", TRUE, Workers);

# NOTE(review): the name suggests that this text constant was meant to hold a
# line break rather than a blank - confirm against the original script.
let nl = ' '