/* 1 Creating a Database */ create database taxis; open database taxis; /* Adapt the following: */ restore Workers from WorkersNewton; let NSlots = 40; let NSlots2 = 120; let myPort = 1238 /* 2 Data Import 2.1 Sequential Import and Distribution from the Master */ let Start = createintdarray("Start", Workers); delete Start; let RawDataById = [const rel(tuple([Id: int, DateTime: string, Point: string])) value ()] csvimport['cabsrome/taxidata00', 0, "", ";", FALSE, FALSE] projectextend[Id ; Inst: str2instant(substr(.DateTime, 1, 10) + "-" + substr(.DateTime, 12, 23)), Pos: makepoint( str2real( substr(.Point, findFirst(.Point, " ") + 1, findFirst(.Point, ")") - 1)), str2real( substr(.Point, findFirst(.Point, "(") + 1, findFirst(.Point, " ") - 1)) )] ddistribute4["RawDataById", hashvalue(.Id, 999997) , NSlots, Workers] /* 3 Creating Moving Objects */ let CabsTrips = RawDataById dmap["CabsTrips", . feed sortby[Id, Inst] groupby[Id; TotalTrip: group feed approximate[Inst, Pos, TRUE, create_duration(0, 60000)] removeNoise[35.0, 2000.0, create_geoid("WGS1984")] ] projectextendstream[Id; Trip: .TotalTrip sim_trips[create_duration(0, 180000), 100.0, create_geoid("WGS1984")] ] filter[no_components(.Trip) > 1] consume ] /* 4 Multiplying the Dataset */ let bboxR = CabsTrips dmap["", . feed projectextend[; Box: bbox(.Trip)] transformstream collect_box[FALSE]] dsummarize collect_box[FALSE] let deltax = maxD(bboxR, 1) - minD(bboxR, 1); let deltay = maxD(bboxR, 2) - minD(bboxR, 2) query share("deltax", TRUE, Workers); query share("deltay", TRUE, Workers); let CabsId = CabsTrips dmap["", . feed loopsel[fun(t: TUPLE) intstream(0, 4) namedtransformstream[Nx] intstream(0, 1) namedtransformstream[Ny] product projectextend[ ; Id: attr(t, Id) + ((5 * .Ny) + .Nx) * 1000, Trip: attr(t, Trip) translate[[const duration value (0 0)], .Nx * deltax, .Ny * deltay]] ] ] partition["", hashvalue(.Id, 999997), 0] collect2["", myPort] dmap["CabsId", . feed consume] /* 5 Partitioning 5.1 Partitioning by Object identifier 5.2 Spatio-temporal Partitioning */ let bboxR10 = CabsId dmap["", . feed projectextend[; Box: bbox(.Trip)] transformstream collect_box[TRUE]] dsummarize collect_box[TRUE] let grid = createCellGrid3D(11.0, 41.0, 5140.0, 2.0, 1.5, 5.0, 4, 2) query share("grid", TRUE, Workers) let CabsST = CabsId partitionF["", . feed extendstream[Cell: cellnumber(bbox(.Trip), grid)] extend[Box: bbox(.Trip)], ..Cell, 0] collect2["", myPort] dmap["CabsST", . feed consume] /* 6 Indexing 6.1 By Identifier */ let CabsId_Id_btree = CabsId dmap["CabsId_Id_btree", . createbtree[Id]] /* 6.2 By Spatio-temporal Information */ let CabsST_Trip_rtree = CabsST dmap["CabsST_Trip_rtree", . feed addid extend[ScaledBox: scalerect(.Box, 1000000.0, 1000000.0, 1000.0)] sortby[ScaledBox] remove[ScaledBox] bulkloadrtree[Box] ] /* 7 Querying 7.1 By Identifier */ query CabsId_Id_btree CabsId dmap2["", . .. exactmatch[11], myPort] dsummarize consume /* 7.2 By Spatio-temporal Dimension */ let qObject = rectangle3(12.464446, 12.5013, 41.879938, 41.911759, instant2real(theInstant(2014, 02, 01, 19)), instant2real(theInstant(2014, 02, 01, 20))); query share("qObject", TRUE, Workers) query CabsST_Trip_rtree CabsST dmap2["", . .. windowintersects[qObject] filter[gridintersects(grid, qObject, bbox(.Trip), .Cell)] filter[.Trip passes rectproject(qObject, 1, 2) rect2region] filter[.Trip present rect2periods(qObject)], myPort] dsummarize consume query hashvalue(11, 999997); query cellnumber(qObject, grid) consume query hashvalue(11, 999997) feed CabsId_Id_btree CabsId pdmap2["", . .. exactmatch[11], myPort] dsummarize consume query cellnumber(qObject, grid) CabsST_Trip_rtree CabsST pdmap2["", . .. windowintersects[qObject] filter[gridintersects(grid, qObject, bbox(.Trip), .Cell)] filter[.Trip passes rectproject(qObject, 1, 2) rect2region] filter[.Trip present rect2periods(qObject)], myPort] dsummarize consume