/* ---- This file is part of SECONDO. Copyright (C) 2019, Faculty of Mathematics and Computer Science, Database Systems for New Applications. SECONDO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. SECONDO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SECONDO; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ---- //[<] [\ensuremath{<}] //[>] [\ensuremath{>}] \setcounter{tocdepth}{3} \tableofcontents 1 Implementation of the multithread sptial join operator */ #include #include "opSpatialJoin.h" #include "NestedList.h" // required at many places #include "QueryProcessor.h" // needed for implementing value mappings #include "Operator.h" // for operator creation #include "StandardTypes.h" // provides int, real, string, bool type #include "Symbols.h" // predefined strings #include "ListUtils.h" // useful functions for nested lists #include "LogMsg.h" // send error messages #include #include #include #include "Algebras/Rectangle/RectangleAlgebra.h" #include using namespace mthreaded; using namespace std; extern NestedList* nl; extern QueryProcessor* qp; namespace spatialJoinGlobal { static constexpr double MEMFACTOR = 1.2; constexpr unsigned int DIMENSIONS = 2; condition_variable workers; mutex workersDone_; bool workersDone; size_t threadsDone; } using namespace spatialJoinGlobal; CandidateWorker::CandidateWorker( size_t _globalMem, size_t _coreNoWorker, size_t _streamInNo, shared_ptr _tupleBuffer, shared_ptr _partBufferR, shared_ptr _partBufferS, pair _joinAttr, const double _resize, TupleType* _resultTupleType, const Rect* _gridcell) : maxMem(_globalMem), workersMem(_globalMem), coreNoWorker(_coreNoWorker), streamInNo(_streamInNo), tupleBuffer(_tupleBuffer), partBufferR(_partBufferR), partBufferS(_partBufferS), joinAttr(_joinAttr), resize(_resize), resultTupleType(_resultTupleType), gridcell(_gridcell) { countInMem = 0; if (resize == 0.0) { calcBbox = make_shared>( [](Tuple* t, size_t attr) { Rect rect = ((StandardSpatialAttribute<2>*) t->GetAttribute(attr))->BoundingBox(); return rect; }); } else { calcBbox = make_shared>( [=](Tuple* t, size_t attr) { Rect rect = ((StandardSpatialAttribute<2>*) t->GetAttribute(attr))->BoundingBox(); return rect.Extend(resize); }); } rtreeR = make_shared>(MINRTREE, MAXRTREE); } CandidateWorker::~CandidateWorker() { rtreeR.reset(); } // Thread void CandidateWorker::operator()() { workersDone = false; Tuple* tupleR = partBufferR->dequeue(); ttR = tupleR->GetTupleType(); ttR->IncReference(); size_t tupleSize = tupleR->GetMemSize(); TupleId id = 0; bool overflowR = false; shared_ptr overflowBufferR = make_shared(ttR); while (tupleR != nullptr) { calcRtree(tupleR, id, overflowBufferR, overflowR); ++id; tupleR = partBufferR->dequeue(); } overflowBufferR->closeWrite(); Tuple* tupleS = partBufferS->dequeue(); if (tupleS != nullptr) { TupleType* ttS = tupleS->GetTupleType(); ttS->IncReference(); shared_ptr overflowBufferS; if (overflowR) { overflowBufferS = make_shared(ttS); } while (tupleS != nullptr) { calcResult(tupleS); if (overflowR) { overflowBufferS->appendTuple(tupleS); } else { std::lock_guard lockT(mutexTupleCounter_); tupleS->DeleteIfAllowed(); } //cout << tupleS->GetNumOfRefs() << "*"; tupleS = partBufferS->dequeue(); } freeRTree(); rtreeR = make_shared>(MINRTREE, MAXRTREE); if (overflowR) { const size_t countOverflow = (size_t) id - countInMem - 1; size_t iterationsR = calcIterations(countOverflow, tupleSize); cout << "iterations on R: " << iterationsR << endl; TupleId oneRun = countOverflow / iterationsR; overflowBufferR->openRead(); for (size_t i = 0; i < iterationsR; ++i) { for (TupleId id = 0; id < ((i == 0) ? (oneRun + (TupleId) (countOverflow % iterationsR)) : oneRun); ++id) { Tuple* tupleR = overflowBufferR->readTuple(); bufferRMem.push_back(tupleR); rtreeR->insert(((StandardSpatialAttribute*) tupleR->GetAttribute(joinAttr.first)) ->BoundingBox(), id); } overflowBufferS->openRead(); Tuple* tupleS = overflowBufferS->readTuple(); while (tupleS != nullptr) { calcResult(tupleS); { std::lock_guard lockT(mutexTupleCounter_); tupleS->DeleteIfAllowed(); } tupleS = overflowBufferS->readTuple(); } overflowBufferS->closeWrite(); freeRTree(); rtreeR = make_shared>(MINRTREE, MAXRTREE); } overflowBufferR->closeWrite(); } ttS->DeleteIfAllowed(); } ttR->DeleteIfAllowed(); delete gridcell; --threadsDone; if (threadsDone == 0) { tupleBuffer->enqueue(nullptr); lock_guard lock(workersDone_); workersDone = true; workers.notify_all(); } else { std::unique_lock lock(workersDone_); workers.wait(lock, [&] { return workersDone; }); } } void CandidateWorker::calcRtree(Tuple* tuple, TupleId id, shared_ptr overflowBufferR, bool &overflowR) { if (!overflowR) { calcMem(tuple); bufferRMem.push_back(tuple); rtreeR->insert((*calcBbox)(tuple, joinAttr.first), id); workersMem = workersMem - rtreeR->usedMem(); if (workersMem > maxMem) { overflowR = true; countInMem = (size_t) id; } } else { overflowBufferR->appendTuple(tuple); } } void CandidateWorker::calcResult(Tuple* tuple) { Rect bboxS = ((StandardSpatialAttribute*) tuple->GetAttribute(joinAttr.second))->BoundingBox(); mmrtree::RtreeT::iterator* it = rtreeR->find(bboxS); const TupleId* id; while ((id = it->next())) { Tuple* tupleR = bufferRMem[(size_t) *id]; Rect bboxR = (*calcBbox)(tupleR, joinAttr.first); if (reportTopright(topright(&bboxR), topright(&bboxS))) { auto* result = new Tuple(resultTupleType); Concat(tupleR, tuple, result); tupleBuffer->enqueue(result); } } delete it; } size_t CandidateWorker::topright(Rect* r1) const { size_t value = 0; if (r1->MaxD(0) >= gridcell->MaxD(0)) { value++; } if (r1->MaxD(1) >= gridcell->MaxD(1)) { value += 2; } return value; } inline bool CandidateWorker::reportTopright(size_t r1, size_t r2) const { return ((r1 & r2) == 0) || (r1 + r2 == 3); } inline void CandidateWorker::calcMem(Tuple* tuple) { { workersMem = workersMem - tuple->GetMemSize() - sizeof(void*) + rtreeR->usedMem(); } } void CandidateWorker::freeRTree() { size_t tupleMemSize = 0; for (Tuple* tupleR : bufferRMem) { tupleMemSize += tupleR->GetMemSize() + sizeof(void*); std::lock_guard lockT(mutexTupleCounter_); tupleR->DeleteIfAllowed(); } size_t usedMemRTree = rtreeR->usedMem(); workersMem = workersMem - tupleMemSize - usedMemRTree; bufferRMem.clear(); } size_t CandidateWorker::calcIterations(const size_t countOverflow, const size_t tupleSize) const { size_t memOverflow = ULONG_MAX; size_t iterations = 0; while (memOverflow > maxMem) { ++iterations; memOverflow = rtreeR->guessSize(countOverflow / iterations, true); memOverflow += (countOverflow / iterations) * (sizeof(void*) + tupleSize); } return iterations; } //Constructor spatialJoinLI::spatialJoinLI(Word _streamR, Word _streamS, pair _joinAttr, double _resize, size_t _maxMem, ListExpr resultType) : streamR(_streamR), streamS(_streamS), joinAttr(_joinAttr), resize(_resize), maxMem(_maxMem) { resultTupleType = new TupleType(nl->Second(resultType)); resultTupleType->IncReference(); vector bufferR = {}; coreNo = MThreadedSingleton::getCoresToUse(); coreNoWorker = pow(coreNo - 1, 2); bboxsample = BBOXSAMPLESTEPS * coreNoWorker; tupleBuffer = make_shared(maxMem / 10, resultTupleType); if (resize == 0.0) { calcBbox = make_shared>( [](Tuple* t, size_t attr) { Rect rect = ((StandardSpatialAttribute<2>*) t->GetAttribute(attr))->BoundingBox(); return rect; }); } else { calcBbox = make_shared>( [=](Tuple* t, size_t attr) { Rect rect = ((StandardSpatialAttribute<2>*) t->GetAttribute(attr))->BoundingBox(); return rect.Extend(resize); }); } streamR.open(); streamS.open(); irrGrid2d = nullptr; Scheduler(); } //Destructor spatialJoinLI::~spatialJoinLI() { for (CellInfo* info : cellInfoVec) { delete info; } cellInfoVec.clear(); delete irrGrid2d; resultTupleType->DeleteIfAllowed(); } //Output Tuple* spatialJoinLI::getNext() { Tuple* res; res = tupleBuffer->dequeue(); if (res != nullptr) { return res; } // neccecary to secure following operators working well this_thread::sleep_for(std::chrono::nanoseconds(100)); return 0; } void spatialJoinLI::Scheduler() { // build bbox // Stream R globalMem = 9 * maxMem / 10; workersDone = false; Tuple* tupleR = streamR.request(); Tuple* tupleS = streamS.request(); if (tupleR != nullptr && tupleS != nullptr) { ttR = tupleR->GetTupleType(); ttR->IncReference(); ttS = tupleS->GetTupleType(); ttS->IncReference(); MultiBuffer bufferR = MultiBuffer(ttR, maxMem); vector> bboxRSample; Rect bboxSpan = (*calcBbox)(tupleR, joinAttr.first); mt19937 rng(chrono::steady_clock::now().time_since_epoch().count()); size_t count = 0; do { size_t replaceNo = uniform_int_distribution(0, count)(rng); if (bboxRSample.size() < bboxsample) { Rect bbox = (*calcBbox)(tupleR, joinAttr.first); bboxRSample.push_back(bbox); double minB[] = {min(bboxSpan.MinD(0), bbox.MinD(0)), min(bboxSpan.MinD(1), bbox.MinD(1))}; double maxB[] = {max(bboxSpan.MaxD(0), bbox.MaxD(0)), max(bboxSpan.MaxD(1), bbox.MaxD(1))}; bboxSpan.Set(true, minB, maxB); } else if (replaceNo < bboxsample) { Rect bbox = (*calcBbox)(tupleR, joinAttr.first); bboxRSample[replaceNo] = bbox; double minB[] = {min(bboxSpan.MinD(0), bbox.MinD(0)), min(bboxSpan.MinD(1), bbox.MinD(1))}; double maxB[] = {max(bboxSpan.MaxD(0), bbox.MaxD(0)), max(bboxSpan.MaxD(1), bbox.MaxD(1))}; bboxSpan.Set(true, minB, maxB); } bufferR.appendTuple(tupleR); count++; if (count % CHANGEBOXSAMPLESTEP == 0) { bboxsample += BBOXSAMPLESTEPS; } } while ((tupleR = streamR.request())); streamR.close(); bufferR.closeWrite(); // not enough MBBs for cores if (bboxRSample.size() < coreNoWorker) { coreNoWorker = bboxRSample.size(); irrGrid2d = new IrregularGrid2D(bboxSpan, 1, coreNoWorker); irrGrid2d->SetVector(&bboxRSample, bboxSpan, 1, coreNoWorker); cellInfoVec = IrregularGrid2D::getCellInfoVector(irrGrid2d); } else { irrGrid2d = new IrregularGrid2D(bboxSpan, sqrt(coreNoWorker), sqrt(coreNoWorker)); irrGrid2d->SetVector(&bboxRSample, bboxSpan, sqrt(coreNoWorker), sqrt(coreNoWorker)); cellInfoVec = IrregularGrid2D::getCellInfoVector(irrGrid2d); } //resize cells at margin of grid to min/max const size_t gridSize = cellInfoVec.size(); const size_t edgeSize = sqrt(coreNoWorker); for (CellInfo* cellInfo : cellInfoVec) { double minB[] = {(cellInfo->cell)->MinD(0), (cellInfo->cell)->MinD(1)}; double maxB[] = {(cellInfo->cell)->MaxD(0), (cellInfo->cell)->MaxD(1)}; if ((size_t) cellInfo->cellId <= edgeSize) { minB[1] = -DBL_MAX; } if ((size_t) cellInfo->cellId > gridSize - edgeSize) { maxB[1] = DBL_MAX; } if ((size_t) cellInfo->cellId % edgeSize == 1) { minB[0] = -DBL_MAX; } if ((size_t) cellInfo->cellId % edgeSize == 0) { maxB[0] = DBL_MAX; } (cellInfo->cell)->Set(true, minB, maxB); } // start threads threadsDone = coreNoWorker; for (size_t i = 0; i < coreNoWorker; ++i) { partBufferR.emplace_back(make_shared( 2 * (globalMem / coreNoWorker) / 16, ttR)); partBufferS.emplace_back(make_shared( (globalMem / coreNoWorker) / 16, ttS)); joinThreads.emplace_back( CandidateWorker(13 * (globalMem / coreNoWorker) / 16, coreNoWorker, i, tupleBuffer, partBufferR.back(), partBufferS.back(), joinAttr, resize, resultTupleType, cellInfoVec[i]->cell)); } // Stream R bufferR.openRead(); Tuple* tuple = bufferR.readTuple(); while (tuple != nullptr) { Rect bbox = (*calcBbox)(tuple, joinAttr.first); for (CellInfo* cellInfo : cellInfoVec) { if ((cellInfo->cell)->Intersects(bbox)) { { std::lock_guard lockT(mutexTupleCounter_); tuple->IncReference(); } partBufferR[(cellInfo->cellId - 1)]->enqueue(tuple); } } { std::lock_guard lockT(mutexTupleCounter_); tuple->DeleteIfAllowed(); } tuple = bufferR.readTuple(); } for (size_t i = 0; i < coreNoWorker; ++i) { partBufferR[i]->enqueue(nullptr); } // grid partitioning S while (tupleS != nullptr) { Rect bbox = ((StandardSpatialAttribute<2>*) tupleS->GetAttribute(joinAttr.second))->BoundingBox(); for (CellInfo* cellInfo : cellInfoVec) { if ((cellInfo->cell)->Intersects(bbox)) { tupleS->IncReference(); partBufferS[(cellInfo->cellId - 1)]->enqueue(tupleS); } } { std::lock_guard lockT(mutexTupleCounter_); tupleS->DeleteIfAllowed(); } tupleS = streamS.request(); } streamS.close(); for (size_t i = 0; i < coreNoWorker; ++i) { partBufferS[i]->enqueue(nullptr); } for (size_t i = 0; i < coreNoWorker; ++i) { joinThreads[i].join(); } } else { tupleBuffer->enqueue(nullptr); } } ListExpr op_spatialJoin::spatialJoinTM(ListExpr args) { // mthreadedSpatialJoin has 4 arguments // 1: StreamR of Tuple with spatial Attr // 2: StreamS of Tuple with spatial Attr // 3: Attr R // 4: Attr S // 5: resize bbox by real if (MThreadedSingleton::getCoresToUse() < 3) { return listutils::typeError(" only works with >= 3 threads "); } string err = "stream(tuple) x stream(tuple) x attr1 x " "attr2 x real expected"; if (!nl->HasLength(args, 5)) { return listutils::typeError(err); } ListExpr stream1 = nl->First(args); ListExpr stream2 = nl->Second(args); ListExpr attr1 = nl->Third(args); ListExpr attr2 = nl->Fourth(args); ListExpr resize = nl->Fifth(args); if (!Stream::checkType(stream1)) { return listutils::typeError (err + " (first arg is not a tuple stream)"); } if (!Stream::checkType(stream2)) { return listutils::typeError (err + " (second arg is not a tuple stream)"); } if (!listutils::isSymbol(attr1)) { return listutils::typeError (err + " (first attrname is not valid)"); } if (!listutils::isSymbol(attr2)) { return listutils::typeError (err + " (second attrname is not valid)"); } if (!CcReal::checkType(resize)) { return listutils::typeError (err + " (last argument has to be real)"); } ListExpr attrList1 = nl->Second(nl->Second(stream1)); ListExpr attrList2 = nl->Second(nl->Second(stream2)); string attrname1 = nl->SymbolValue(attr1); string attrname2 = nl->SymbolValue(attr2); ListExpr attrType1; ListExpr attrType2; int index1 = listutils::findAttribute(attrList1, attrname1, attrType1); if (index1 == 0) { return listutils::typeError(attrname1 + " is not an attribute of the first stream"); } int index2 = listutils::findAttribute(attrList2, attrname2, attrType2); if (index2 == 0) { return listutils::typeError(attrname1 + " is not an attribute of the second stream"); } if (!listutils::isSpatialType(attrType1)) { return listutils::typeError(" first attribute not spatial "); } if (!listutils::isSpatialType(attrType2)) { return listutils::typeError(" second attribute not spatial "); } ListExpr resAttrList = listutils::concat(attrList1, attrList2); if (!listutils::isAttrList(resAttrList)) { return listutils::typeError ("Name conflicts in attributes found"); } ListExpr indexList = nl->TwoElemList( nl->IntAtom(index1 - 1), nl->IntAtom(index2 - 1)); //cout << nl->ToString(indexList) << endl; return nl->ThreeElemList(nl->SymbolAtom(Symbols::APPEND()), indexList, nl->TwoElemList( nl->SymbolAtom(Stream::BasicType()), nl->TwoElemList( nl->SymbolAtom(Tuple::BasicType()), resAttrList))); } // Operator SpatialJoin // args[0] : streamR // args[1] : streamS // args[5] : index of join attribute R // args[6] : index of join attribute S // args[4] : resize bbox int op_spatialJoin::spatialJoinVM(Word* args, Word &result, int message, Word &local, Supplier s) { std::pair attr = make_pair( (static_cast(args[5].addr))->GetIntval(), (static_cast(args[6].addr))->GetIntval()); double resize = (static_cast(args[4].addr))->GetRealval(); // create result type ListExpr resultType = qp->GetNumType(s); spatialJoinLI* li = (spatialJoinLI*) local.addr; switch (message) { case OPEN : if (li) { delete li; } local.addr = new spatialJoinLI(args[0], args[1], attr, resize, qp->GetMemorySize(s) * 1024 * 1024, resultType); return 0; case REQUEST: result.addr = li ? li->getNext() : 0; return result.addr ? YIELD : CANCEL; case CLOSE: if (li) { delete li; local.addr = 0; } this_thread::sleep_for(std::chrono::microseconds(100)); return 0; } return 0; } std::string op_spatialJoin::getOperatorSpec() { return OperatorSpec( " stream x stream x attr x attr x real -> stream", " streamR streamS mThreadedSpatialJoin(attr1, attr2, real)", " spatial join using >2 cores", " R feed {o} S feed {p} mThreadedSpatialJoin[X_o, Y_p, 0.0]" ).getStr(); } shared_ptr op_spatialJoin::getOperator() { return std::make_shared("mThreadedSpatialJoin", getOperatorSpec(), &op_spatialJoin::spatialJoinVM, Operator::SimpleSelect, &op_spatialJoin::spatialJoinTM); }