Files
secondo/bin/Scripts/Pregel10.sec

192 lines
3.6 KiB
Plaintext
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
# Experiments 08.08.2018
# Database arnsberg
#
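# Assumes a relation Edges is present that has (at least) the attributes
# Source, Target, SourcePos, TargetPos and Curve, which are used below.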
# Build the node relation Nodes3 from the end points of all edges.
# Every edge contributes its source and its target node (each with its
# position); both streams are concatenated, duplicates are removed by
# sort + rdup, and an attribute Value is appended.
# NOTE(review): `inf` is not defined in this script; presumably it is a
# database object or constant representing an "infinite" real - confirm.
# A previous version of Nodes3 is removed first.
delete Nodes3
# Disabled restriction of the nodes to the Hombruch area:
# filter[bbox(.NodePos) inside hombruch]
let Nodes3 =
Edges feed projectextend[; Node: .Source, NodePos: .SourcePos]
Edges feed projectextend[; Node: .Target, NodePos: .TargetPos]
concat
sort rdup
extend[Value: inf]
consume
# Number of nodes; results observed by the author are noted below.
query Nodes3 count
# 517140 nodes
# 1785 in Hombruch
# Reset the main memory and load the nodes as a main-memory relation.
# NOTE(review): the argument of meminit is presumably the memory limit
# in MB - confirm against the operator description.
query memclear()
query meminit(10000)
# NodesM: memory copy of the nodes. addid appends the tuple id (TID); the
# original node number is kept as NodeId, the tuple id becomes the new
# attribute Node, and Value is set to 0.0. The Value attribute of Nodes3 is
# not carried over, because projectextend keeps none of the input attributes.
let NodesM = Nodes3 feed addid
projectextend[; NodeId: .Node, Node: .TID, NodePos: .NodePos, Value: 0.0] mconsume
# Edges3a: the edges reduced to Source, Target and SourcePos, extended by a
# Cost attribute computed as size(gk(.Curve)). Edges whose Cost is undefined
# are dropped.
# NOTE(review): presumably gk projects the curve to Gauss-Krueger
# coordinates so that size yields a metric length - confirm.
# A previous version of Edges3a is removed first.
delete Edges3a
# Disabled restriction of the edges to the Hombruch area:
# filter[bbox(.SourcePos) inside hombruch]
let Edges3a = Edges feed
projectextend[Source, Target, SourcePos; Cost: size(gk(.Curve))]
filter[isdefined(.Cost)]
consume
# 1304503 edges
# 4566 in Hombruch
# Edges3: the edges of Edges3a with their end points renumbered.
# The edge stream (attributes renamed with suffix _e) is joined with NodesM
# twice: once on the edge's Source node number and once on its Target node
# number, each against NodeId of NodesM. In the result, Source and Target
# hold the integer form (tid2int) of the Node attribute of the matching
# NodesM tuple; Cost is taken over from the edge.
# The TID appended by addid on the NodesM side is not used by the projection.
# Requires the main-memory relation NodesM to exist.
# A previous version of Edges3 is removed first.
delete Edges3
let Edges3 =
Edges3a feed {e}
NodesM mfeed addid {source} itHashJoin[Source_e, NodeId_source]
NodesM mfeed addid {target} itHashJoin[Target_e, NodeId_target]
projectextend[; Source: tid2int(.Node_source), Target: tid2int(.Node_target),
Cost: .Cost_e]
consume
# Number of edges after renumbering.
query Edges3 count
##########################
### (re)initialize
# Resets the main memory and rebuilds all main-memory structures used by the
# iteration: the nodes, the graph and the initial set of messages.
# NOTE(review): NodesM is also created by an earlier `let` in this script;
# presumably the existing object has to be deleted before this section can be
# re-run - confirm.
query memclear()
query meminit(10000)
# Nodes with Value reset to 0.0 (NodeId = original node number,
# Node = tuple id).
let NodesM = Nodes3 feed addid
projectextend[; NodeId: .Node, Node: .TID, NodePos: .NodePos, Value: 0.0] mconsume
# Main-memory graph built from Edges3 using the attributes Source, Target and
# Cost; the last argument passes the number of tuples of Edges3.
# NOTE(review): check in the createmgraph3 operator description what this
# size argument denotes.
let EdgesMGraph = Edges3 feed createmgraph3[Source, Target, Cost, Edges3 count]
# Initial messages: one message per node, addressed to the node itself
# (MessageNode = Node) and carrying the node's tuple id number as a real.
let Messages = NodesM mfeed projectextend[; MessageNode: .Node, MessageValue: int2real(tid2int(.Node))] mconsume
###
# Main loop. For the iteration numbers 1..100000 one message-processing step
# is evaluated per tuple, and the number of messages it produced is stored in
# NoMessages. Each tuple is printed (printstream); cancel ends the stream at
# the first iteration whose step produced no messages, and the remaining
# tuples are collected into the relation Iterations.
#
# One step, in the order written inside the NoMessages expression:
#   1. feed all tuples of Messages and delete them from Messages
#      (mdeletedirect), dropping the TID attribute;
#   2. for each message fetch the addressed node tuple from NodesM
#      (gettuples on the tuple id in MessageNode);
#   3. keep only the messages accepted by relevantMessage;
#   4. update Value of the addressed node in NodesM with newNodeValue
#      (mupdatedirect2) and project to Node and Value;
#   5. look up the successors of each such node in EdgesMGraph and create one
#      message per outgoing edge (MessageNode = int2tid(.Target),
#      MessageValue = newMessageValue);
#   6. combine the messages per MessageNode (groupby2 with combinedMessage,
#      starting from combinedInitialValue) and insert them into Messages;
#   7. count the inserted messages.
# NOTE(review): mblock presumably materializes the stream so that Messages is
# not read and written at the same time - confirm.
#
# Uncomment to remove the result of a previous run:
# delete Iterations
let Iterations =
intstream(1, 100000)
namedtransformstream[Iteration]
extend[NoMessages:
Messages mfeed Messages mdeletedirect remove[TID]
loopjoin[fun(t: TUPLE) attr(t, MessageNode) feed NodesM gettuples]
filter[relevantMessage(.)]
NodesM mupdatedirect2[MessageNode; Value: newNodeValue(., ..)]
project[Node, Value]
loopjoin[EdgesMGraph mg3successors[tid2int(.Node)]]
projectextend[; MessageNode: int2tid(.Target), MessageValue: newMessageValue(.)]
mblock
groupby2[MessageNode; MessageValue: fun(t2: TUPLE, r: real)
combinedMessage(t2, r) :: combinedInitialValue]
meminsert[Messages]
count]
printstream
cancel[.NoMessages = 0]
consume
# Measurements noted by the author for this run:
# 1363 Iterations
# 96:11 minutes
# NOTE(review): the following lines are not complete commands - each starts
# with an operator that needs an input stream. They appear to be alternative
# formulations of the message-combining step used inside Iterations, kept
# for reference - confirm before running this script as a whole.
# Variant: groupby2 with the implicit arguments (. and ..) passed to
# combinedMessage.
groupby2[MessageNode; MessageValue:
combinedMessage(., ..) :: combinedInitialValue]
# Variant: groupby2 with the combining function written inline (same
# expression as in the definition of combinedMessage).
groupby2[MessageNode; MessageValue:
fun(t5: m, r: real) ifthenelse(attr(t5, MessageValue) > r, attr(t5, MessageValue), r) :: combinedInitialValue]
# Variant: sort by MessageNode and aggregate each group with aggMessage.
sortby[MessageNode]
groupby[MessageNode; MessageValue: aggMessage(group)]
# Functions
#
# This section collects the definitions that the iteration refers to by
# name: the initial node and message relations, the tuple types, and the
# function objects.
#
# initialization of Nodes
# Same definition as used in the (re)initialize section: NodeId is the
# original node number, Node the tuple id, and Value starts at 0.0.
let NodesM = Nodes3 feed addid
projectextend[; NodeId: .Node, Node: .TID, NodePos: .NodePos, Value: 0.0] mconsume
# initial messages
# One message per node, addressed to the node itself and carrying the node's
# tuple id number as a real value.
let Messages = NodesM mfeed projectextend[; MessageNode: .Node, MessageValue: int2real(tid2int(.Node))] mconsume
# handling of messages
# Tuple types used as parameter types of the function objects defined in
# this script.
# mn: a message (MessageNode, MessageValue) followed by the attributes of a
# node tuple (NodeId, Node, NodePos, Value).
type mn = tuple([MessageNode: tid, MessageValue: real, NodeId: int, Node: tid, NodePos: point, Value: real])
# n: a node tuple with the attributes of NodesM.
type n = tuple([NodeId: int, Node: tid, NodePos: point, Value: real])
# ne: a node reduced to Node and Value, followed by the attributes of an
# edge (Source, Target, Cost).
type ne = tuple([Node: tid, Value: real, Source: int, Target: int, Cost: real])
# m: a message on its own.
type m = tuple([MessageNode: tid, MessageValue: real])
# relevantMessage: true if the message value is greater than the current
# Value of the node the message is joined with.
let relevantMessage = fun (t: mn) attr(t, MessageValue) > attr(t, Value)
# newNodeValue: the new node value is the value of the message; the second
# argument (the node tuple) is not used.
let newNodeValue = fun(t2: mn, t3: n) attr(t2, MessageValue)
# newMessageValue: the value sent along an edge is the Value of the node;
# the edge attributes are not used.
let newMessageValue = fun(t4: ne) attr(t4, Value)
# aggMessage: maximum MessageValue of a relation of messages.
let aggMessage = fun (r: rel(tuple([MessageNode: tid, MessageValue: real]))) r feed max[MessageValue]
# combinedMessage: the larger of the message's value and the value r
# accumulated so far, i.e. an incremental maximum.
let combinedMessage = fun(t5: m, r: real) ifthenelse(attr(t5, MessageValue) > r, attr(t5, MessageValue), r)
# combinedInitialValue: start value of the accumulation with combinedMessage.
let combinedInitialValue = 0.0