# Experiments 08.08.2018
#
# Database arnsberg
#
# assumes a relation Edges is present
#
# Overview (added in review). This script
#   1. derives a node relation (Nodes3) and a cost-annotated edge relation
#      (Edges3) from Edges,
#   2. builds the m-prefixed working objects NodesM, EdgesMGraph and Messages,
#   3. runs an iterative message-passing query (Iterations) that stops at the
#      first iteration whose NoMessages count is 0 (or after 100000 rounds).
#
# NOTE(review): the tuple types and helper functions used by the Iterations
# query (relevantMessage, newNodeValue, newMessageValue, combinedMessage,
# combinedInitialValue, ...) are defined in the "Functions" section at the END
# of this file. Presumably they have to exist before the Iterations query is
# executed - confirm the intended execution order.

# Drop a previous version of Nodes3 so that the let below can recreate it.
delete Nodes3

# (disabled) optional restriction of the nodes to the region "hombruch":
# filter[bbox(.NodePos) inside hombruch]

# Node relation: every Source and every Target of Edges becomes a
# (Node, NodePos) tuple; the two streams are concatenated, sorted and
# de-duplicated (sort rdup), and each node gets an attribute Value set to inf.
# NOTE(review): inf is not defined in this file - presumably a database
# object or constant; confirm.
let Nodes3 =
  Edges feed projectextend[; Node: .Source, NodePos: .SourcePos]
  Edges feed projectextend[; Node: .Target, NodePos: .TargetPos]
  concat
  sort rdup
  extend[Value: inf]
  consume

query Nodes3 count

# 517140 nodes
# 1785 in Hombruch

# Reset the main-memory state before building NodesM.
# NOTE(review): the unit of the meminit argument (10000) is not visible
# here - confirm.
query memclear()

query meminit(10000)

# NodesM: the nodes with
#   NodeId  = the original node identifier (.Node of Nodes3),
#   Node    = the tuple identifier (.TID) attached by addid,
#   NodePos = the node position,
#   Value   = 0.0 (initial value used by the message-passing loop).
let NodesM = Nodes3 feed addid
  projectextend[; NodeId: .Node, Node: .TID, NodePos: .NodePos, Value: 0.0]
  mconsume

# Drop a previous version of Edges3a so that the let below can recreate it.
delete Edges3a

# (disabled) optional restriction of the edges to the region "hombruch":
# filter[bbox(.SourcePos) inside hombruch]

# Edges3a: Source, Target and SourcePos of every edge plus
# Cost = size(gk(.Curve)); edges whose Cost is undefined are dropped.
# NOTE(review): gk presumably projects the curve into a metric coordinate
# system so that size yields a length - confirm.
let Edges3a = Edges feed
  projectextend[Source, Target, SourcePos; Cost: size(gk(.Curve))]
  filter[isdefined(.Cost)]
  consume

# 1304503 edges
# 4566 in Hombruch

# Drop a previous version of Edges3 so that the let below can recreate it.
delete Edges3

# Edges3: the edges of Edges3a with Source and Target replaced by the integer
# form (tid2int) of the Node attribute of the matching NodesM tuple. The match
# is made by two hash joins on the original node identifiers
# (Source_e = NodeId_source, Target_e = NodeId_target). Cost is carried over.
let Edges3 = Edges3a feed {e}
  NodesM mfeed addid {source} itHashJoin[Source_e, NodeId_source]
  NodesM mfeed addid {target} itHashJoin[Target_e, NodeId_target]
  projectextend[; Source: tid2int(.Node_source),
    Target: tid2int(.Node_target), Cost: .Cost_e]
  consume

query Edges3 count

##########################

### (re)initialize
#
# Rebuilds the working state from the persistent relations Nodes3 and Edges3.
# NOTE(review): NodesM was already created with let above and is created
# again here without a preceding delete - check whether the existing NodesM
# object has to be deleted first when this section is re-run.

query memclear()

query meminit(10000)

let NodesM = Nodes3 feed addid
  projectextend[; NodeId: .Node, Node: .TID, NodePos: .NodePos, Value: 0.0]
  mconsume

# Graph built from Edges3 using Source, Target and Cost; the fourth argument
# is the number of tuples in Edges3.
let EdgesMGraph = Edges3 feed createmgraph3[Source, Target, Cost, Edges3 count]

# Initial messages: one message per node, addressed to the node itself
# (MessageNode = .Node) and carrying the node's own tuple id converted to a
# real number as MessageValue.
let Messages = NodesM mfeed
  projectextend[; MessageNode: .Node, MessageValue: int2real(tid2int(.Node))]
  mconsume

###

# delete Iterations

# Iterative message passing. For each iteration number 1..100000 the
# attribute NoMessages is computed as the count of the following pipeline
# (operator behaviour described as read from the operator names -
# NOTE(review): confirm against the algebra documentation):
#   - read the current Messages and delete them from Messages
#     (mfeed ... mdeletedirect), dropping the TID attribute,
#   - fetch the addressed node tuple from NodesM for each message
#     (loopjoin ... gettuples on MessageNode),
#   - keep only messages for which relevantMessage holds,
#   - update Value of the addressed node in NodesM with newNodeValue
#     (mupdatedirect2),
#   - expand each updated node (Node, Value) to its successors in EdgesMGraph
#     (mg3successors),
#   - build one new message per successor edge, addressed to the edge Target,
#     with value newMessageValue,
#   - combine the messages per MessageNode with combinedMessage, starting
#     from combinedInitialValue (groupby2),
#   - insert the combined messages into Messages (meminsert) and count them.
# Each iteration tuple is printed (printstream); cancel ends the stream at the
# first iteration with NoMessages = 0; the resulting tuples are stored in
# Iterations.
let Iterations = intstream(1, 100000) namedtransformstream[Iteration]
  extend[NoMessages:
    Messages mfeed Messages mdeletedirect remove[TID]
    loopjoin[fun(t: TUPLE) attr(t, MessageNode) feed NodesM gettuples]
    filter[relevantMessage(.)]
    NodesM mupdatedirect2[MessageNode; Value: newNodeValue(., ..)]
    project[Node, Value]
    loopjoin[EdgesMGraph mg3successors[tid2int(.Node)]]
    projectextend[; MessageNode: int2tid(.Target), MessageValue: newMessageValue(.)]
    mblock
    groupby2[MessageNode; MessageValue: fun(t2: TUPLE, r: real) combinedMessage(t2, r) :: combinedInitialValue]
    meminsert[Messages]
    count]
  printstream
  cancel[.NoMessages = 0]
  consume

# 1363 Iterations
# 96:11 minutes

# NOTE(review): the next three snippets are operator fragments, not complete
# commands. They look like alternative formulations of the per-MessageNode
# aggregation step used inside Iterations (implicit-parameter form, inlined
# function form, and a sortby/groupby form using aggMessage) - confirm, and
# do not expect them to run as stand-alone commands.

groupby2[MessageNode; MessageValue: combinedMessage(., ..) :: combinedInitialValue]

groupby2[MessageNode; MessageValue: fun(t5: m, r: real) ifthenelse(attr(t5, MessageValue) > r, attr(t5, MessageValue), r) :: combinedInitialValue]

sortby[MessageNode]
groupby[MessageNode; MessageValue: aggMessage(group)]

# Functions
#
# NOTE(review): NodesM and Messages below repeat the definitions from the
# "(re)initialize" section verbatim.
#
# initialization of Nodes
let NodesM = Nodes3 feed addid
  projectextend[; NodeId: .Node, Node: .TID, NodePos: .NodePos, Value: 0.0]
  mconsume

# initial messages
let Messages = NodesM mfeed
  projectextend[; MessageNode: .Node, MessageValue: int2real(tid2int(.Node))]
  mconsume

# handling of messages
#
# Tuple types used as parameter types of the functions below:
#   mn - a message joined with the node tuple it is addressed to
#   n  - a node tuple of NodesM
#   ne - a (Node, Value) pair joined with one successor edge
#   m  - a message
type mn = tuple([MessageNode: tid, MessageValue: real, NodeId: int, Node: tid, NodePos: point, Value: real])

type n = tuple([NodeId: int, Node: tid, NodePos: point, Value: real])

type ne = tuple([Node: tid, Value: real, Source: int, Target: int, Cost: real])

type m = tuple([MessageNode: tid, MessageValue: real])

# A message is relevant when its value is greater than the current Value of
# the node it is addressed to.
let relevantMessage = fun (t: mn) attr(t, MessageValue) > attr(t, Value)

# The new node value is the value of the message; the second argument (the
# node tuple) is not used.
let newNodeValue = fun(t2: mn, t3: n) attr(t2, MessageValue)

# The value sent along an edge is the Value attribute of the (node, edge)
# tuple; the edge Cost is not used.
let newMessageValue = fun(t4: ne) attr(t4, Value)

# Aggregation over a group of messages: the maximum MessageValue.
let aggMessage = fun (r: rel(tuple([MessageNode: tid, MessageValue: real]))) r feed max[MessageValue]

# Incremental aggregation: the larger of the message value and the
# accumulated value r.
let combinedMessage = fun(t5: m, r: real) ifthenelse(attr(t5, MessageValue) > r, attr(t5, MessageValue), r)

# Start value of the incremental aggregation.
let combinedInitialValue = 0.0