Files
secondo/Algebras/Pregel/Examples/ShortestPath/singleSourceShortestPath.sec
2026-01-23 17:03:45 +08:00

76 lines
2.3 KiB
Plaintext

# Start from a clean database: drop any previous "shortestpath" database,
# create it again and open it for all following commands.
# NOTE(review): on the very first run there is no database to delete, so the
# first command presumably reports an error that can be ignored - confirm.
delete database shortestpath;
create database shortestpath;
open database shortestpath;
# Initial Value assigned to every node below. Compute only overwrites a node's
# Value with a smaller one, so inf plays the role of "not reached yet".
let inf = 10000000;
# The graph is taken from the PageRank example data set. The statements below
# rely on the attributes Source and Target of this relation.
restore Edges from '../Algebras/Pregel/Examples/PageRank/data/LinksStanford';
# Store the edges sorted by Source. Compute merge-joins against Edges on
# Source, which is presumably why the sorted order is needed - confirm.
update Edges := Edges feed sortby[Source] consume;
#restore Nodes from Nodes;
# One tuple (Id, Value) per distinct node id occurring as Source or Target of
# an edge; every Value starts at inf. The result is sorted by the sort/rdup step.
let NodesPersistent = Edges feed projectextend[;Id: .Source] Edges feed projectextend[;Id: .Target] concat sort rdup extend[Value: inf] consume;
# Copy of the nodes built with mconsume; Compute reads it with mfeed and
# updates it with mupdatedirect2.
let Nodes = NodesPersistent feed mconsume;
# Initial message set: a single message giving node 1 the Value 0, i.e. node 1
# is the source of the shortest-path computation.
let Messages = [const rel(tuple([Id: int, Value: int])) value
(
(1 0)
)];
# Worker description used for distribution and for Pregel.
# NOTE(review): the trailing comment presumably names an alternative worker
# relation (FourTwoWorkers) - confirm.
restore Workers from Workers12Pregel; #FourTwoWorkers;
# Number of tuples in Workers; used as the number of partitions.
let WorkerNum = Workers count;
# Maps a node id to a partition via hashvalue(id, WorkerNum).
let PartitionFunction = fun (id: int) (hashvalue(id, WorkerNum));
# Compute: one step of the single-source shortest-path computation.
# Input:  stream of messages (Id: target node id, Value: proposed distance).
# Output: stream of new messages (Id, Value) for the successors of all nodes
#         whose Value was improved in this step.
# Steps of the pipeline:
#   1. Sort the messages by Id and keep, per node, the minimum proposed
#      Value as MessageValue (attributes carry the suffix _m after rename).
#   2. Merge-join with the nodes (Nodes mfeed addid) on the node id and keep
#      only nodes whose stored Value is larger than MessageValue.
#   3. Write MessageValue into Nodes as the new Value, addressing the tuple
#      by its TID.
#   4. Merge-join with Edges on Id = Source and emit, for every outgoing
#      edge, a message to Target with Value + 1 (every edge has cost 1).
# NOTE(review): whether .Value in step 4 is the freshly written value or the
# previous one depends on what mupdatedirect2 passes downstream - confirm.
# NOTE(review): the merge-joins presumably require Nodes to be ordered by Id
# and Edges by Source - confirm against the operator documentation.
let Compute = fun
(messages: stream(tuple([Id: int, Value: int])))
( messages sortby[Id] rename[m] groupby[Id_m; MessageValue: group feed min[Value_m]]
Nodes mfeed addid
mergejoin[Id_m, Id] filter[.MessageValue < .Value]
Nodes
mupdatedirect2[TID; Value: .MessageValue]
Edges feed
mergejoin[Id, Source] projectextend[; Id: .Target, Value: .Value + 1]);
# ComputeRouted: variant of Compute for messages that carry an Address.
# Input:  stream of messages (Id, Value, Address).
# Output: the messages produced by Compute, each extended by
#         Address = PartitionFunction(Id) of the receiving node.
# The incoming Address is dropped (project[Id, Value]) before calling Compute
# and recomputed for every outgoing message.
# The definition ends with an explicit ';' like every other command in this
# script, so it does not depend on a following empty line to be terminated.
let ComputeRouted = fun
( messages: stream(tuple([Id: int, Value: int, Address: int])))
( Compute(messages project[Id, Value]) extend[Address: PartitionFunction(.Id)]);
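# Disabled: loop that re-applies Compute to Messages (without using the
# workers) until no messages are left.
#while Messages count > 0 do update Messages := Compute(Messages feed) consume endwhile;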
#
# Distribute
#
# Distribute the edges over the workers, partitioned by the Source node, so
# that the outgoing edges of a node share one partition.
let EdgesD = Edges feed ddistribute4["EdgesD", PartitionFunction(.Source), WorkerNum , Workers];
# NOTE(review): makeSimple presumably exposes each worker's part under the
# plain object name "Edges", the name Compute refers to - confirm.
let EdgesSD = EdgesD makeSimple[FALSE, "Edges"];
# Distribute the nodes with the same partition function, applied to Id.
let NodesD = NodesPersistent feed ddistribute4["NodesD", PartitionFunction(.Id), WorkerNum , Workers];
# "NodesPersistent" is the object name the remote command in the Pregel setup
# reads when it builds Nodes on the workers.
let NodesSD = NodesD makeSimple[FALSE, "NodesPersistent"];
# Make the objects that Compute/ComputeRouted refer to available on the workers.
# NOTE(review): the TRUE argument presumably allows overwriting existing
# objects of the same name - confirm.
query share("WorkerNum", TRUE, Workers);
query share("PartitionFunction", TRUE, Workers);
query share("Compute", TRUE, Workers);
query share("ComputeRouted", TRUE, Workers);
# NOTE(review): presumably closes the worker connections opened by the
# commands above before Pregel sets up its own - confirm.
query closeWorkers();
# Set up Pregel with the workers listed in Workers.
query setupPregel(Workers);
# Build Nodes on the workers from their NodesPersistent object.
query remotePregelCommand('let Nodes = NodesPersistent feed mconsume;');
# Register ComputeRouted as the Pregel function; Address is the attribute
# that carries the routing information of a message.
query setPregelFunction(ComputeRouted, Address);
# query Messages feed projectextend[; Id: 1000, Address: PartitionFunction(.Id)] initPregelMessages;
# Inject the initial messages, each tagged with the partition of its target node.
query Messages feed extend[Address: PartitionFunction(.Id)] initPregelMessages;
query pregelStatus();
# NOTE(review): startPregel(1) presumably runs a single superstep, and the
# disabled variant with -1 presumably runs until no messages remain - confirm.
query startPregel(1);
# query startPregel(-1);