Files
secondo/bin/Scripts/PregelConnectedComponents2Part1.psec

317 lines
9.8 KiB
Plaintext
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
/*
//paragraph [10] Title: [{\Large \bf ] [}]
//[@] [\verb+@+]
//[%] [\%]
//[&] [\&]
//[ue] [\"{u}]
[10] Computing Strongly Connected Components in Pregel
Part 1
Ralf Hartmut G[ue]ting, March 12, 2019
Run this script with ~SecondoTTYBDB~ and prefix [@][%] or [@][&]. Part 1 builds the graph and defines the computing function. Part 2 computes the strongly connected components and can be called repeatedly.
Data needed are
1 a relation with edges of the graph called ~EdgesA~ as it is created by script ~OrderedRelationGraphFromFullOSMImport.SEC~,
2 a relation ~Workers~ defining workers in the Pregel format. An example tuple is
----
Host : 132.176.69.75
Port : 1783
Config : SecondoConfig.ini
MessageServerPort : 1883
----
1 Create a Graph
*/
# Prerequisite: a database must exist and be open before this script is run.
# Adapt the next line: it loads the relation Workers (Pregel worker
# description, see the example tuple in the header) from a file.
restore Workers from Workers12Pregel;
# WorkerNum is the number of tuples in Workers.
let WorkerNum = Workers count;
# PartFun maps a node id to its partition by applying hashvalue to the id
# and WorkerNum (presumably yielding a value in 0 .. WorkerNum - 1; confirm).
let PartFun = fun (id: int) (hashvalue(id, WorkerNum));
# Adapt one of the next lines to choose the edge data set that is used.
restore EdgesA from EdgesHombruch;
# let EdgesA = "EdgesDortmund.bin" ffeed5 consume;
# let EdgesA = "EdgesArnsberg.bin" ffeed5 consume;
# Replace EdgesA by its own contents sorted by Source.
update EdgesA := EdgesA feed sortby[Source] consume;
# Add undefined curves (for applications in general): every edge whose
# Curve attribute is undefined gets a curve built by create_sline from its
# SourcePos and TargetPos. The result of the query is the count of the
# tuples that passed through the update.
# The command is terminated explicitly with ';' so that it cannot run into
# the following let command when no empty line separates them.
query EdgesA feed filter[not(isdefined(.Curve))]
  EdgesA updatedirect[Curve: create_sline(.SourcePos, .TargetPos)]
  count;
# NodesPersistent: the node relation of the graph, derived from EdgesA.
#  - the source and the target end of every edge are collected as
#    (PrelId, Pos) pairs and duplicates are removed (sort rdup);
#  - the nodes are brought into random order via the temporary attribute R;
#  - addcounter assigns the new node number Id, starting at 1;
#  - Partition is computed from Id by PartFun;
#  - Color starts at 0 and Active at TRUE.
# The command is terminated explicitly with ';' so that it does not depend
# on a following empty line.
let NodesPersistent =
  EdgesA feed projectextend[; PrelId: .Source, Pos: .SourcePos]
  EdgesA feed projectextend[; PrelId: .Target, Pos: .TargetPos]
  concat
  sort rdup
  extend[R: randint(999997)] sortby[R] remove[R]
  addcounter[Id, 1]
  extend[Partition: PartFun(.Id)]
  extend[Color: 0, Active: TRUE]
  consume;
# Schema: (PrelId int) (Pos point) (Id int) (Partition int) (Color int)
# (Active bool)
# Size is the largest node Id plus one; it is the last argument of the
# createmgraph3 calls that build the main memory graphs.
# Terminated explicitly with ';' so that it cannot merge with the next let.
let Size = (NodesPersistent feed max[Id]) + 1;
# EdgesForward: the edges of EdgesA expressed in the new node numbering.
# Cost is computed as size(gk(.Curve)). The two hash joins with
# NodesPersistent replace the original Source and Target identifiers
# (PrelId) by the new Id values and add the partition of each end node.
# Terminated explicitly with ';' so that it does not depend on a following
# empty line.
let EdgesForward = EdgesA feed extend[Cost: size(gk(.Curve))]
  project[Source, Target, Cost]
  NodesPersistent feed itHashJoin[Source, PrelId]
  NodesPersistent feed {n} itHashJoin[Target, PrelId_n]
  projectextend[; Source: .Id, PartitionSource: .Partition, Target: .Id_n,
    PartitionTarget: .Partition_n, Cost: .Cost]
  consume;
# Schema: (Source, PartitionSource, Target, PartitionTarget, Cost)
# EdgesBackward: every edge of EdgesForward with source and target (and
# their partitions) exchanged, sorted by the new Source. The successors of
# a node in this relation are its predecessors in EdgesForward.
# Terminated explicitly with ';' so that it does not depend on a following
# empty line.
let EdgesBackward = EdgesForward feed projectextend[; Source: .Target,
  PartitionSource: .PartitionTarget, Target: .Source,
  PartitionTarget: .PartitionSource, Cost: .Cost] sortby[Source] consume;
# Schema: (Source, PartitionSource, Target, PartitionTarget, Cost)
/*
2 Computing Strongly Connected Components
Nodes and edges have schemas:
----
Node(PrelId: int, Pos: point, Id: int, Partition: int, Color: int,
Active: bool)
Edge(Source: int, PartitionSource: int, Target: int, PartitionTarget: int,
Cost: real)
----
Initially nodes are numbered 1, ..., ~n~ in field ~Id~, ~Color~ = 0, ~Active~ = TRUE. Only active nodes receive messages.
2.1 Phase 1: Remove Singleton Components
----
To all nodes: checkSingle()
on checkSingle():
if count(successors) = 0 or count(predecessors) = 0 then
Color := Id;
Active := false;
send(predecessors, checkSingle());
send(successors, checkSingle());
disconnect;
endif
----
2.2 Phase 2: Spread the Maximum Color to Neighborhood
We first propagate the maximum color to a neighborhood with radius 30 by letting Pregel run for 30 rounds.
----
To all nodes: resetColor()
For 30 rounds:
on resetColor():
Color := 0;
send(successors, newColor(Id));
on newColor(X):
if X > Color then
Color := X;
send(successors, newColor(X))
endif
----
2.3 Phase 3: Confirm Color
Now each node has received the largest color from some predecessor ~v~ within radius 30. It may be that ~v~ itself has received another color from an even stronger predecessor within radius 30 from ~v~.
We now let each node ~w~ ask the respective node ~v~ whether it has changed its color. If so, the node ~v~ sends the new color to ~w~ which updates its color and sends a new confirmation request to the node providing the new color.
In this way, after some iterations, each node has received the color of a ~chief~ which is a node that has successfully spread its color but not changed its color itself. Hence for a chief holds ~Id~ = ~Color~. This implies that now we have only as many colors as there are chiefs.
The nodes that have received color ~c~ from chief ~c~ we call ~realm~(~c~).
----
For all nodes: send(Color, confirmColor(Id, Color));
on confirmColor(Sender, X):
if X # Color then
send(Sender, colorIs(Color))
endif
on colorIs(X):
if X # Color then
Color := X;
send(Color, confirmColor(Id, Color))
endif
----
2.4 Phase 4: Create Edges in Top Level Network
We now consider edges between two nodes ~a~ and ~b~ that have different colors, say, from ~a~ to ~b~. This means we have a directed edge from ~realm~(~a~) to ~realm~(~b~).
For each such edge, we create an additional edge of the same direction connecting the two chiefs, within the partition of the start chief.
Note that for messages with two parameters (~Single~ and ~Value~) messages are grouped by the first (~Single~) parameter whereas the values of the second parameter (~Value~) are aggregated, currently into the maximum value. Hence, for example, in
----
send(successors, checkColor(Color, _))
----
a successor receives exactly one message for each incoming color.
----
To all nodes: getSuccColor()
on getSuccColor():
send(successors, checkColor(Color, _))
on checkColor(X, _):
if Color # X then
send(X, createEdge(Color, _))
endif
on createEdge(X, _):
graph.insertEdge(Id, X, 1.0)
----
3 The Compute Function
The node programs are implemented in the following ~Compute~ function.
*/
# Build the main memory objects that the Compute function works on:
#   Nodes    - the node relation NodesPersistent as a main memory relation
#   Forward  - main memory graph built from EdgesForward
#   Backward - main memory graph built from EdgesBackward
# memclear() is called first (presumably it removes all existing main
# memory objects; confirm). It is terminated explicitly with ';' so that it
# cannot merge with the following let command.
query memclear();
let Nodes = NodesPersistent feed mconsume;
let Forward = EdgesForward feed createmgraph3[Source, Target, Cost, Size];
let Backward = EdgesBackward feed
  createmgraph3[Source, Target, Cost, Size];
# NoMessages: parameterless function returning a stream with the schema of
# a message (NodeId, NodePartition, Message, Single, Value). It contains
# one tuple built from the first node of NodesPersistent, with Message set
# to "ERROR". Compute uses it as the default branch of its switch, i.e. for
# message names it does not know.
# Terminated explicitly with ';' so that it cannot merge with the next let.
let NoMessages = fun() NodesPersistent feed head[1]
  projectextend[; NodeId: .Id, NodePartition: .Partition]
  extend[Message: "ERROR", Single: 0, Value: 0];
# Compute: the node program of the Pregel computation.
#
# Input:  a stream of messages
#           (NodeId, NodePartition, Message, Single, Value).
# Output: a stream of new messages with the same schema.
#
# Processing steps:
#  1. Messages are sorted by NodeId and Single and grouped by NodeId,
#     Single and Message; the Value fields of a group are aggregated into
#     their maximum, attribute Max.
#  2. The groups are merge-joined with the active nodes (filter[.Active])
#     of the main memory relation Nodes, so that each joined tuple t holds
#     the message fields and the node fields (including its TID).
#  3. For every tuple t the branch selected by the Message name is run:
#       "checkSingle"  - if the node has no successors in Forward or none
#                        in Backward: set Color to Id and Active to FALSE,
#                        send "checkSingle" to its neighbors in both
#                        graphs, and disconnect it in both graphs. The
#                        disconnect part is evaluated for its effect only;
#                        filter[FALSE] suppresses its tuples.
#       "resetColor"   - set Color to 0 and send "newColor" with the own Id
#                        as Value. NOTE(review): the message is addressed
#                        to the node itself (NodeId: .Id), whereas the
#                        pseudocode of section 2.2 sends it to the
#                        successors; the "newColor" branch then forwards
#                        the color to the successors.
#       "newColor"     - if Max is larger than Color: store Max as Color
#                        and send "newColor" to all successors in Forward.
#       "confirmColor" - if Max differs from Color: answer the sender
#                        (Single) with "colorIs" carrying the own Color.
#       "colorIs"      - if Max differs from Color: store Max as Color and
#                        send "confirmColor" to the node with that color.
#       "getSuccColor" - send "checkColor" with the own Color in Single to
#                        all successors in Forward.
#       "checkColor"   - if the own Color differs from Single: send
#                        "createEdge" with the own Color to node Single.
#       "createEdge"   - insert the edge (Id, Single) with cost 1.0 into
#                        Forward and the reversed edge into Backward; no
#                        message is produced (filter[FALSE]).
#       any other name - NoMessages().
#
# NOTE(review): the branches that send Value: .Color after
# mupdatedirect2 assume that this operator passes on the updated tuple;
# confirm against the operator description.
#
# The command is terminated explicitly with ';' so that it cannot merge
# with the following share command when no empty line separates them.
let Compute = fun (messages: stream(tuple([NodeId: int,
  NodePartition: int, Message: string, Single: int, Value: int])))
  messages
  sortby[NodeId, Single]
  groupby[NodeId, Single, Message
    ; Max: group feed max[Value]]
  Nodes mfeed addid filter[.Active] mergejoin[NodeId, Id]
  loopsel[fun(t: TUPLE)
    attr(t, Message)
    switch[
      "checkSingle",
        t feed filter[(Forward mg3numsuccessors[.Id] = 0) or
          (Backward mg3numsuccessors[.Id] = 0)]
        Nodes mupdatedirect2[TID; Color: .Id, Active: FALSE]
        loopsel[fun(t2: TUPLE)
          Backward mg3successors[attr(t2, Id)]
            projectextend[; NodeId: .Target,
              NodePartition: PartFun(.Target),
              Message: "checkSingle", Single: 0, Value: 0]
          Forward mg3successors[attr(t2, Id)]
            projectextend[; NodeId: .Target,
              NodePartition: PartFun(.Target),
              Message: "checkSingle", Single: 0, Value: 0]
          concat
          t2 feed extend[DisconnectF: Forward mg3disconnect[.Id],
            DisconnectB: Backward mg3disconnect[.Id]]
            filter[FALSE]
            projectextend[; NodeId: .Id,
              NodePartition: PartFun(.Id),
              Message: "checkSingle", Single: 0, Value: 0]
          concat
        ]
    ; "resetColor",
        t feed
        Nodes mupdatedirect2[TID; Color: 0]
        projectextend[; NodeId: .Id,
          NodePartition: PartFun(.Id),
          Message: "newColor", Single: 0, Value: .Id]
    ; "newColor",
        t feed filter[.Max > .Color]
        Nodes mupdatedirect2[TID; Color: .Max]
        loopjoin[Forward mg3successors[.Id]]
        projectextend[; NodeId: .Target,
          NodePartition: PartFun(.Target),
          Message: "newColor", Single: 0, Value: .Color]
    ; "confirmColor",
        t feed filter[.Max # .Color]
        projectextend[; NodeId: .Single,
          NodePartition: PartFun(.Single),
          Message: "colorIs", Single: 0, Value: .Color]
    ; "colorIs",
        t feed filter[.Max # .Color]
        Nodes mupdatedirect2[TID; Color: .Max]
        projectextend[; NodeId: .Color,
          NodePartition: PartFun(.Color),
          Message: "confirmColor", Single: .Id, Value: .Color]
    ; "getSuccColor",
        t feed
        loopjoin[Forward mg3successors[.Id]]
        projectextend[; NodeId: .Target,
          NodePartition: PartFun(.Target),
          Message: "checkColor", Single: .Color, Value: 0]
    ; "checkColor",
        t feed filter[.Color # .Single]
        projectextend[; NodeId: .Single,
          NodePartition: PartFun(.Single),
          Message: "createEdge", Single: .Color, Value: 0]
    ; "createEdge",
        t feed
        projectextend[; Source: .Id, PartitionSource: PartFun(.Id),
          Target: .Single, PartitionTarget: PartFun(.Single), Cost: 1.0]
        mg3insert[Forward]
        projectextend[; Source: .Target, PartitionSource: .PartitionTarget,
          Target: .Source, PartitionTarget: .PartitionSource, Cost: 1.0]
        mg3insert[Backward]
        filter[FALSE]
        projectextend[; NodeId: 0,
          NodePartition: PartFun(0),
          Message: "empty", Single: 0, Value: 0]
    ; NoMessages()]
  ];
# Share the object Compute with the workers listed in Workers
# (second argument TRUE: presumably permits overwriting an existing object
# of that name on a worker; confirm against the operator description).
query share("Compute", TRUE, Workers);
/*
4 Distribute Data to Workers
*/
# Distribute the nodes to the workers: the partition of a node is
# PartFun(.Id), with WorkerNum as the size argument of ddistribute4.
# makeSimple then derives NodesSD under the name "NodesPersistent"
# (presumably so that each worker holds its part as a relation called
# NodesPersistent; confirm the meaning of FALSE). The intermediate object
# NodesD is deleted afterwards.
# The delete command is terminated explicitly with ';' so that it cannot
# merge with the following let command.
let NodesD = NodesPersistent feed ddistribute4["NodesD", PartFun(.Id),
  WorkerNum, Workers];
let NodesSD = NodesD makeSimple[FALSE, "NodesPersistent"];
delete NodesD;
# Distribute the forward edges to the workers by the partition of their
# source node, PartFun(.Source). makeSimple then derives EdgesSDf under the
# name "EdgesForward" (presumably the worker-side relation name; confirm).
# The intermediate object EdgesDf is deleted afterwards.
# The delete command is terminated explicitly with ';' so that it cannot
# merge with the following let command.
let EdgesDf = EdgesForward feed ddistribute4["EdgesDf",
  PartFun(.Source), WorkerNum, Workers];
let EdgesSDf = EdgesDf makeSimple[FALSE, "EdgesForward"];
delete EdgesDf;
# Distribute the backward edges to the workers by the partition of their
# source node, PartFun(.Source). makeSimple then derives EdgesSDb under the
# name "EdgesBackward" (presumably the worker-side relation name; confirm).
# The intermediate object EdgesDb is deleted afterwards.
# The delete command is terminated explicitly with ';' so that it cannot
# merge with the following query command.
let EdgesDb = EdgesBackward feed ddistribute4["EdgesDb",
  PartFun(.Source), WorkerNum, Workers];
let EdgesSDb = EdgesDb makeSimple[FALSE, "EdgesBackward"];
delete EdgesDb;
# Share the remaining objects with the workers listed in Workers:
#   WorkerNum  - used by PartFun
#   PartFun    - used by Compute to address messages and new edges
#   Size       - last argument of the createmgraph3 calls
#   NoMessages - default branch of the switch in Compute
query share("WorkerNum", TRUE, Workers);
query share("PartFun", TRUE, Workers);
query share("Size", TRUE, Workers);
query share("NoMessages", TRUE, Workers);
# nl: a text value that consists of a single line break (the literal opens
# on one line and closes on the next). It is not used in this part of the
# script; presumably it serves for formatting output elsewhere.
# Terminated explicitly with ';' so that it does not depend on a following
# empty line.
let nl = '
';