/* 1 Updates 1.1 Data Import */ let RawDataByIdUpdate = [const rel(tuple([Id: int, DateTime: string, Point: string])) value ()] csvimport['cabsrome/taxidata01', 0, "", ";", FALSE, FALSE] projectextend[Id ; Inst: str2instant(substr(.DateTime, 1, 10) + "-" + substr(.DateTime, 12, 23)), Pos: makepoint( str2real( substr(.Point, findFirst(.Point, " ") + 1, findFirst(.Point, ")") - 1)), str2real( substr(.Point, findFirst(.Point, "(") + 1, findFirst(.Point, " ") - 1)) )] ddistribute4["RawDataByIdUpdate", hashvalue(.Id, 999997) , NSlots, Workers] /* 1.2 Creating Moving Objects */ let CabsTripsUpdate = RawDataByIdUpdate dmap["CabsTripsUpdate", . feed sortby[Id, Inst] groupby[Id; TotalTrip: group feed approximate[Inst, Pos, TRUE, create_duration(0, 60000)] removeNoise[35.0, 2000.0, create_geoid("WGS1984")] ] projectextendstream[Id; Trip: .TotalTrip sim_trips[create_duration(0, 180000), 100.0, create_geoid("WGS1984")] ] filter[no_components(.Trip) > 1] consume ] /* 1.3 Multiplying the Dataset */ let CabsIdUpdate = CabsTripsUpdate dmap["", . feed loopsel[fun(t: TUPLE) intstream(0, 4) namedtransformstream[Nx] intstream(0, 1) namedtransformstream[Ny] product projectextend[ ; Id: attr(t, Id) + ((5 * .Ny) + .Nx) * 1000, Trip: attr(t, Trip) translate[[const duration value (0 0)], .Nx * deltax, .Ny * deltay]] ] ] partition["", hashvalue(.Id, 999997), 0] collect2["", myPort] dmap["CabsIdUpdate", . feed consume] /* 1.4 Updating CabsId */ query CabsIdUpdate CabsId CabsId_Id_btree dmap3["", $1 feed $2 insert $3 insertbtree[Id] count, myPort] getValue let UpdateStart = [const rel(tuple([Id: int, DateTime: string, Point: string])) value ()] csvimport['cabsrome/taxidata01', 0, "", ";", FALSE, FALSE] projectextend[Id ; Inst: str2instant(substr(.DateTime, 1, 10) + "-" + substr(.DateTime, 12, 23)), Pos: makepoint( str2real( substr(.Point, findFirst(.Point, " ") + 1, findFirst(.Point, ")") - 1)), str2real( substr(.Point, findFirst(.Point, "(") + 1, findFirst(.Point, " ") - 1)) )] extract[Inst] query share("UpdateStart", TRUE, Workers) /* Find update pairs: */ let UpdatePairsId = CabsId CabsId dmap2["UpdatePairsId", . feed addid filter[UpdateStart > inst(final(.Trip))] filter[(UpdateStart - inst(final(.Trip))) < create_duration(0, 60000)] {c1} .. feed addid filter[UpdateStart < inst(initial(.Trip))] filter[(inst(initial(.Trip)) - UpdateStart) < create_duration(0, 60000)] {c2} itHashJoin[Id_c1, Id_c2] filter[ (inst(initial(.Trip_c2)) - inst(final(.Trip_c1))) < create_duration(0, 60000)] consume, myPort] /* Insert concatenated trips: */ query UpdatePairsId CabsId CabsId_Id_btree dmap3["", . feed projectextend[ ; Id: .Id_c1, Trip: units(.Trip_c1) the_unit(val(final(.Trip_c1)), val(initial(.Trip_c2)), inst(final(.Trip_c1)), inst(initial(.Trip_c2)), TRUE, FALSE) feed concat units(.Trip_c2) concat transformstream makemvalue[Elem] ] $2 insert $3 insertbtree[Id] count, myPort] getValue /* Delete trips belonging to update pairs: */ query UpdatePairsId CabsId CabsId_Id_btree dmap3["", . feed projectextend[; TID: .TID_c1] . feed projectextend[; TID: .TID_c2] concat $2 deletebyid4[TID] $3 deletebtree[Id] count, myPort] getValue /* 1.4 Updating CabsST Redistribute: */ let CabsSTUpdate = CabsIdUpdate partitionF["", . feed extendstream[Cell: cellnumber(bbox(.Trip), grid)] extend[Box: bbox(.Trip)], ..Cell, NSlots] collect2["", myPort] dmap["CabsSTUpdate", . feed consume] /* Insert: */ query CabsSTUpdate CabsST CabsST_Trip_rtree dmap3["", . feed $2 insert $3 insertrtree[Box] count, myPort] getValue /* Find update pairs: */ let UpdatePairsST = CabsST CabsST dmap2["UpdatePairsST", . feed addid filter[UpdateStart > inst(final(.Trip))] filter[(UpdateStart - inst(final(.Trip))) < create_duration(0, 60000)] {c1} .. feed addid filter[UpdateStart < inst(initial(.Trip))] filter[(inst(initial(.Trip)) - UpdateStart) < create_duration(0, 60000)] {c2} itHashJoin[Id_c1, Id_c2] filter[.Cell_c1 = .Cell_c2] filter[ (inst(initial(.Trip_c2)) - inst(final(.Trip_c1))) < create_duration(0, 60000)] consume, myPort] /* Insert concatenated trips: */ query UpdatePairsST CabsST CabsST_Trip_rtree dmap3["", . feed projectextend[ ; Id: .Id_c1, Trip: units(.Trip_c1) the_unit(val(final(.Trip_c1)), val(initial(.Trip_c2)), inst(final(.Trip_c1)), inst(initial(.Trip_c2)), TRUE, FALSE) feed concat units(.Trip_c2) concat transformstream makemvalue[Elem], Cell: .Cell_c1 ] extend[Box: bbox(.Trip)] $2 insert $3 insertrtree[Box] count, myPort] getValue /* Delete trips contained in upate pairs: */ query UpdatePairsST CabsST CabsST_Trip_rtree dmap3["", . feed projectextend[; TID: .TID_c1] . feed projectextend[; TID: .TID_c2] concat $2 deletebyid4[TID] $3 deletertree[Box] count, myPort] getValue