/* ---- This file is part of SECONDO. Copyright (C) 2019, Faculty of Mathematics and Computer Science, Database Systems for New Applications. SECONDO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. SECONDO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SECONDO; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ---- //[<] [\ensuremath{<}] //[>] [\ensuremath{>}] \setcounter{tocdepth}{3} \tableofcontents 1 Implementation of the multithread HybridJoin Operator These Operators are: * mthreadedHybridJoin */ #include "opHybridHashJoin.h" #include "Attribute.h" // implementation of attribute types #include "NestedList.h" // required at many places #include "QueryProcessor.h" // needed for implementing value mappings #include "Operator.h" // for operator creation #include "StandardTypes.h" // provides int, real, string, bool type #include "Symbols.h" // predefined strings #include "ListUtils.h" // useful functions for nested lists #include "LogMsg.h" // send error messages #include #include using namespace mthreaded; using namespace std; extern NestedList* nl; extern QueryProcessor* qp; namespace hashJoinGlobal { size_t threadsDone; condition_variable workers; mutex workersDone_; bool workersDone; } using namespace hashJoinGlobal; HashTablePersist::HashTablePersist(const size_t _bucketsNo, const size_t _coreNoWorker, const size_t _maxMem, TupleType* _ttR, TupleType* _ttS, const pair _joinAttr) : bucketsNo(_bucketsNo), coreNoWorker(_coreNoWorker), maxMem(_maxMem), ttR(_ttR), ttS(_ttS), joinAttr(_joinAttr) { hashBucketsR.reserve(bucketsNo - 1); hashBucketsS.reserve(bucketsNo - 1); for (size_t i = 0; i < bucketsNo - 1; ++i) { hashBucketsR.push_back(make_shared(ttR)); hashBucketsS.push_back(make_shared(ttS)); sizeR.push_back(0); sizeS.push_back(0); hashBucketsOverflowS.push_back(make_shared(ttS)); } freeMem = maxMem; setSPersist = false; lastMemBufferR = bucketsNo - 2; lastMemBufferS = bucketsNo - 2; } HashTablePersist::~HashTablePersist() { hashBucketsR.clear(); hashBucketsS.clear(); hashBucketsOverflowS.clear(); } void HashTablePersist::PushR(Tuple* tuple, const size_t bucket) { size_t size = tuple->GetMemSize() + sizeof(void*); if (bucket <= lastMemBufferR) { freeMem -= size; } sizeR[bucket] += size; hashBucketsR[bucket]->appendTuple(tuple); // no memory if (freeMem > maxMem && lastMemBufferR <= bucket && lastMemBufferR < bucketsNo) { if (!setSPersist) { for (size_t i = 0; i < bucketsNo - 1; ++i) { hashBucketsS[i] = make_shared(ttS); } setSPersist = true; } shared_ptr tempFileBuffer = make_shared(ttR); Tuple* tupleNext = hashBucketsR[lastMemBufferR]->readTuple(); while (tupleNext != nullptr) { tempFileBuffer->appendTuple(tupleNext); tupleNext = hashBucketsR[lastMemBufferR]->readTuple(); } hashBucketsR[lastMemBufferR] = move(tempFileBuffer); freeMem += sizeR[lastMemBufferR]; --lastMemBufferR; } } void HashTablePersist::PushS(Tuple* tuple, const size_t bucket) { size_t size = tuple->GetMemSize() + sizeof(void*); if (!setSPersist && bucket <= lastMemBufferS) { freeMem -= size; } size_t partNo = tuple->HashValue(joinAttr.second) / coreNoWorker / bucketsNo % hashMod; if (partNo >= overflowBucketNo[bucket]) { // save in overflow hashBucketsOverflowS[bucket]->appendTuple(tuple); } else { sizeS[bucket] += size; hashBucketsS[bucket]->appendTuple(tuple); if (!setSPersist && freeMem > maxMem && lastMemBufferS < bucketsNo) { shared_ptr tempFileBuffer = make_shared(ttS); Tuple* tupleNext = hashBucketsS[lastMemBufferS]->readTuple(); while (tupleNext != nullptr) { tempFileBuffer->appendTuple(tupleNext); tupleNext = hashBucketsS[lastMemBufferS]->readTuple(); } hashBucketsS[lastMemBufferS] = move(tempFileBuffer); freeMem += sizeS[lastMemBufferS]; --lastMemBufferS; } } } Tuple* HashTablePersist::PullR(const size_t bucket) const { Tuple* tuple = hashBucketsR[bucket]->readTuple(); return tuple; } Tuple* HashTablePersist::PullS(const size_t bucket) const { Tuple* tuple = hashBucketsS[bucket]->readTuple(); return tuple; } void HashTablePersist::UseMemHashTable(size_t usedMem) { freeMem -= usedMem; } void HashTablePersist::SetHashMod(size_t hashMod) { this->hashMod = hashMod; } void HashTablePersist::CalcOverflow() { for (size_t i = 0; i < bucketsNo - 1; ++i) { if (sizeR[i] != 0) { overflowBucketNo.emplace_back( (size_t) floor(maxMem * 0.8) / floor(sizeR[i] / hashMod)); } else { overflowBucketNo.emplace_back(0); } } } shared_ptr HashTablePersist::GetOverflowS(const size_t bucket) const { return hashBucketsOverflowS[bucket]; } size_t HashTablePersist::GetOverflowBucketNo(const size_t bucket) const { return overflowBucketNo[bucket]; } void HashTablePersist::CloseWrite() { for (size_t i = 0; i < bucketsNo - 1; ++i) { hashBucketsR[i]->closeWrite(); hashBucketsS[i]->closeWrite(); hashBucketsOverflowS[i]->closeWrite(); } } size_t HashTablePersist::OpenRead(size_t bucket) { hashBucketsR[bucket]->openRead(); hashBucketsS[bucket]->openRead(); return overflowBucketNo[bucket]; } // Function object for threads HashJoinWorker::HashJoinWorker(size_t _maxMem, size_t _coreNoWorker, size_t _streamInNo, shared_ptr _tupleBuffer, shared_ptr> _partBufferR, shared_ptr> _partBufferS, pair _joinAttr, TupleType* _resultTupleType, TupleType* _ttS) : maxMem(_maxMem), coreNoWorker(_coreNoWorker), streamInNo(_streamInNo), tupleBuffer(_tupleBuffer), partBufferR(_partBufferR), partBufferS(_partBufferS), joinAttr(_joinAttr), resultTupleType(_resultTupleType), ttS(_ttS) { } HashJoinWorker::~HashJoinWorker() { } void HashJoinWorker::operator()() { // phase 1: size_t countOverflow; size_t hashMod = phase1(countOverflow); if (hashMod != 0) { // recursive overflow bucket 0 if (!overflowBufferR->empty()) { recursiveOverflow(overflowBufferR, overflowBufferS, countOverflow, 1); } // phase 2: for (size_t pos = 0; pos < bucketNo - 1; ++pos) { // non-overflow countOverflow = phase2(hashMod, pos); // recursive overflow buckets >0 if (!overflowBufferR->empty()) { recursiveOverflow(overflowBufferR, hashTablePersist->GetOverflowS(pos), countOverflow, 1); } } ttR->DeleteIfAllowed(); } --threadsDone; if (threadsDone == 0) { tupleBuffer->enqueue(nullptr); lock_guard lock(workersDone_); workersDone = true; workers.notify_all(); } else { std::unique_lock lock(workersDone_); workers.wait(lock, [&] { return workersDone; }); } } size_t HashJoinWorker::phase1(size_t &countOverflow) { vector> bucketInMemR(bucketsInMem1st); size_t memR = maxMem; Tuple* tupleNextR; tupleNextR = partBufferR->dequeue(); if (tupleNextR == nullptr) { return 0; } ttR = tupleNextR->GetTupleType(); ttR->IncReference(); hashTablePersist = make_shared(bucketNo, coreNoWorker, maxMem, ttR, ttS, joinAttr); size_t count = 0; size_t inMemBucketNo = bucketsInMem1st; overflowBufferR = make_shared(ttR); overflowBufferS = make_shared(ttS); size_t memROverflow = 0; countOverflow = 0; while (tupleNextR != nullptr) { size_t partNo = (tupleNextR->HashValue(joinAttr.first) / coreNoWorker) % bucketNo; if (partNo == 0) { size_t memRTupleNextR = tupleNextR->GetMemSize() + sizeof(void*); partNo = (tupleNextR->HashValue(joinAttr.first) / coreNoWorker / bucketNo) % bucketsInMem1st; if (partNo < inMemBucketNo) { memR -= memRTupleNextR; hashTablePersist->UseMemHashTable(memRTupleNextR); bucketInMemR[partNo].push_back(tupleNextR); } else { memROverflow += memRTupleNextR; ++countOverflow; overflowBufferR->appendTuple(tupleNextR); } // overflow if (memR > maxMem) { size_t memTransfered = 0; for (size_t i = inMemBucketNo / 2; i < inMemBucketNo; ++i) { for (Tuple* tupleTransfer : bucketInMemR[i]) { memTransfered += tupleTransfer->GetMemSize() + sizeof(void*); ++countOverflow; overflowBufferR->appendTuple(tupleTransfer); } } inMemBucketNo /= 2; if (inMemBucketNo == 0) { cout << "No bucket of the hash table fits in memory." << endl; cout << "Stop calculating this hash table." << endl; cout << "!!outcoming tuple stream incomplete!!" << endl; break; } memR -= memTransfered; memROverflow += memTransfered; hashTablePersist->UseMemHashTable(-memTransfered); } } else { hashTablePersist->PushR(tupleNextR, partNo - 1); } tupleNextR = partBufferR->dequeue(); ++count; } partBufferR.reset(); size_t hashMod = count / bucketNo; if (hashMod == 0) hashMod = 1; hashTablePersist->SetHashMod(hashMod); hashTablePersist->CalcOverflow(); Tuple* tupleNextS; tupleNextS = partBufferS->dequeue(); if (tupleNextS == nullptr) { return 0; } count = 0; while (tupleNextS != nullptr) { size_t partNo = (tupleNextS->HashValue(joinAttr.second) / coreNoWorker) % bucketNo; if (partNo == 0) { // join? partNo = (tupleNextS->HashValue(joinAttr.second) / coreNoWorker / bucketNo) % bucketsInMem1st; if (partNo < inMemBucketNo) { if (!bucketInMemR[partNo].empty()) { for (auto tupleR : bucketInMemR[partNo]) { if (tupleEqual(tupleR, tupleNextS)) { auto* result = new Tuple(resultTupleType); { Concat(tupleR, tupleNextS, result); } tupleBuffer->enqueue(result); } } } tupleNextS->DeleteIfAllowed(); } else { overflowBufferS->appendTuple(tupleNextS); } } else { hashTablePersist->PushS(tupleNextS, partNo - 1); } ++count; tupleNextS = partBufferS->dequeue(); } partBufferS.reset(); for (size_t i = 0; i < inMemBucketNo; ++i) { for (auto* tupleR : bucketInMemR[i]) { tupleR->DeleteIfAllowed(); } } bucketInMemR.clear(); hashTablePersist->CloseWrite(); overflowBufferR->closeWrite(); overflowBufferS->closeWrite(); return hashMod; } size_t HashJoinWorker::phase2(size_t hashMod, size_t pos) { vector> bucketInMemR(hashMod); size_t memR = maxMem; size_t overflow = hashTablePersist->OpenRead(pos); Tuple* tupleR = hashTablePersist->PullR(pos); size_t countOverflow = 0; while (tupleR != nullptr) { size_t partNo = (tupleR->HashValue(joinAttr.first) / coreNoWorker / bucketNo) % hashMod; if (partNo < overflow) { size_t memRTupleNextR = tupleR->GetMemSize() + sizeof(void*); memR -= memRTupleNextR; bucketInMemR[partNo].emplace_back(tupleR); } else { overflowBufferR->appendTuple(tupleR); ++countOverflow; } while (memR > maxMem) { for (Tuple* tupleTransfer : bucketInMemR[overflow - 1]) { memR += tupleTransfer->GetMemSize() + sizeof(void*); overflowBufferR->appendTuple(tupleTransfer); } --overflow; } tupleR = hashTablePersist->PullR(pos); } Tuple* tupleS = hashTablePersist->PullS(pos); while (tupleS != nullptr) { size_t partNo = (tupleS->HashValue(joinAttr.second) / coreNoWorker / bucketNo) % hashMod; if (!bucketInMemR[partNo].empty()) { for (auto tupleR : bucketInMemR[partNo]) { if (tupleEqual(tupleR, tupleS)) { auto* result = new Tuple(resultTupleType); { Concat(tupleR, tupleS, result); } tupleBuffer->enqueue(result); } } } if (tupleS->GetNumOfRefs() == 2) { tupleS->DeleteIfAllowed(); } tupleS->DeleteIfAllowed(); tupleS = hashTablePersist->PullS(pos); } for (size_t i = 0; i < hashMod; ++i) { for (auto* tuple : bucketInMemR[i]) { if (tuple->GetNumOfRefs() == 2) { tuple->DeleteIfAllowed(); } tuple->DeleteIfAllowed(); } } bucketInMemR.clear(); return countOverflow; } void HashJoinWorker::recursiveOverflow(shared_ptr overflowR, shared_ptr overflowS, size_t sizeR, size_t xMod) { // phase 1 size_t inMemBucketNo = sizeR / bucketNo; if (inMemBucketNo == 0) { inMemBucketNo = 1; } size_t const hashMod = inMemBucketNo; vector> bucketInMemR(inMemBucketNo); size_t memR = maxMem; ++xMod; overflowR->openRead(); Tuple* tupleNextR = overflowR->readTuple(); overflowS->openRead(); Tuple* tupleNextS = overflowS->readTuple(); shared_ptr recursiveHashTablePersist = make_shared( bucketNo, coreNoWorker, maxMem, ttR, ttS, joinAttr); shared_ptr recursiveOverflowBufferR = make_shared( ttR); shared_ptr recursiveOverflowBufferS = make_shared( ttS); size_t memROverflow = 0; size_t countOverflow = 0; while (tupleNextR != nullptr) { size_t partNo = tupleNextR->HashValue(joinAttr.first) / coreNoWorker; for (size_t i = 0; i < xMod; ++i) { partNo /= bucketNo; } size_t partNoBucket = partNo % bucketNo; if (partNoBucket == 0) { size_t memRTupleNextR = tupleNextR->GetMemSize() + sizeof(void*); partNo = partNo / bucketNo % hashMod; if (partNo < inMemBucketNo) { memR -= memRTupleNextR; recursiveHashTablePersist->UseMemHashTable(memRTupleNextR); bucketInMemR[partNo].push_back(tupleNextR); } else { memROverflow += memRTupleNextR; ++countOverflow; recursiveOverflowBufferR->appendTuple(tupleNextR); } // overflow if (memR > maxMem) { size_t memTransfered = 0; for (size_t i = inMemBucketNo / 2; i < inMemBucketNo; ++i) { for (Tuple* tupleTransfer : bucketInMemR[i]) { ++countOverflow; memTransfered += tupleTransfer->GetMemSize() + sizeof(void*); recursiveOverflowBufferR->appendTuple(tupleTransfer); } } inMemBucketNo /= 2; if (inMemBucketNo == 0) { cout << "Phase1" << endl; cout << "No bucket of the hash table fits in memory." << endl; cout << "Stop calculating this hash table." << endl; cout << "!!outcoming tuple stream incomplete!!" << endl; break; } memR -= memTransfered; memROverflow += memTransfered; recursiveHashTablePersist->UseMemHashTable(-memTransfered); } } else { recursiveHashTablePersist->PushR(tupleNextR, partNoBucket - 1); } tupleNextR = overflowR->readTuple(); } recursiveHashTablePersist->SetHashMod(hashMod); recursiveHashTablePersist->CalcOverflow(); while (tupleNextS != nullptr) { size_t partNo = tupleNextS->HashValue(joinAttr.second) / coreNoWorker; for (size_t i = 0; i < xMod; ++i) { partNo /= bucketNo; } size_t partNoBucket = partNo % bucketNo; if (partNoBucket == 0) { // join? partNo = partNo / bucketNo % hashMod; if (partNo < inMemBucketNo) { if (!bucketInMemR[partNo].empty()) { for (auto tupleR : bucketInMemR[partNo]) { if (tupleEqual(tupleR, tupleNextS)) { auto* result = new Tuple(resultTupleType); { Concat(tupleR, tupleNextS, result); } tupleBuffer->enqueue(result); } } } tupleNextS->DeleteIfAllowed(); } else { recursiveOverflowBufferS->appendTuple(tupleNextS); } } else { recursiveHashTablePersist->PushS(tupleNextS, partNoBucket - 1); } tupleNextS = overflowS->readTuple(); } for (size_t i = 0; i < inMemBucketNo; ++i) { for (auto* tuple : bucketInMemR[i]) { tuple->DeleteIfAllowed(); } } bucketInMemR.clear(); recursiveHashTablePersist->CloseWrite(); recursiveOverflowBufferR->closeWrite(); recursiveOverflowBufferS->closeWrite(); // R0 recursive if (!recursiveOverflowBufferR->empty()) { recursiveOverflow(recursiveOverflowBufferR, recursiveOverflowBufferS, countOverflow, xMod); } // phase 2 for (size_t pos = 0; pos < bucketNo - 1; ++pos) { recursiveOverflowBufferR = make_shared( ttR); memR = maxMem; vector> bucketInMemR(inMemBucketNo); size_t overflow = recursiveHashTablePersist->OpenRead(pos); Tuple* tupleR = recursiveHashTablePersist->PullR(pos); countOverflow = 0; while (tupleR != nullptr) { size_t partNo = tupleR->HashValue(joinAttr.first) / coreNoWorker; for (size_t i = 0; i < xMod; ++i) { partNo /= bucketNo; } partNo %= inMemBucketNo; if (partNo < overflow) { size_t memRTupleNextR = tupleR->GetMemSize() + sizeof(void*); memR -= memRTupleNextR; bucketInMemR[partNo].emplace_back(tupleR); } else { recursiveOverflowBufferR->appendTuple(tupleR); ++countOverflow; } while (memR > maxMem) { for (Tuple* tupleTransfer : bucketInMemR[overflow - 1]) { memR += tupleTransfer->GetMemSize() + sizeof(void*); recursiveOverflowBufferR->appendTuple(tupleTransfer); } --overflow; } tupleR = recursiveHashTablePersist->PullR(pos); } Tuple* tupleS = recursiveHashTablePersist->PullS(pos); while (tupleS != nullptr) { size_t partNo = tupleS->HashValue(joinAttr.second) / coreNoWorker; for (size_t i = 0; i < xMod; ++i) { partNo /= bucketNo; } partNo %= inMemBucketNo; if (!bucketInMemR[partNo].empty()) { for (auto tupleR : bucketInMemR[partNo]) { if (tupleEqual(tupleR, tupleS)) { auto* result = new Tuple(resultTupleType); { Concat(tupleR, tupleS, result); } tupleBuffer->enqueue(result); } } } tupleS->DeleteIfAllowed(); tupleS = recursiveHashTablePersist->PullS(pos); } for (size_t i = 0; i < inMemBucketNo; ++i) { for (auto* tuple : bucketInMemR[i]) { tuple->DeleteIfAllowed(); } } bucketInMemR.clear(); if (!recursiveOverflowBufferR->empty()) { recursiveOverflow(recursiveOverflowBufferR, recursiveHashTablePersist->GetOverflowS(pos), countOverflow, xMod); } } } bool HashJoinWorker::tupleEqual(Tuple* a, Tuple* b) const { const auto* aAttr = (const Attribute*) a->GetAttribute(joinAttr.first); const auto* bAttr = (const Attribute*) b->GetAttribute(joinAttr.second); return aAttr->Compare(bAttr) == 0; } //methods of the Local Info Class hybridHashJoinLI::hybridHashJoinLI( Word _streamR, Word _streamS, const pair _joinAttr, const size_t _maxMem, ListExpr resultType) : streamR(_streamR), streamS(_streamS), joinAttr(_joinAttr), maxMem(_maxMem), coreNo(MThreadedSingleton::getCoresToUse()), coreNoWorker(coreNo - 1) { resultTupleType = new TupleType(nl->Second(resultType)); resultTupleType->IncReference(); streamR.open(); streamS.open(); Scheduler(); } hybridHashJoinLI::~hybridHashJoinLI() { partBufferR.clear(); partBufferS.clear(); joinThreads.clear(); resultTupleType->DeleteIfAllowed(); streamR.close(); streamS.close(); } Tuple* hybridHashJoinLI::getNext() { Tuple* res; res = tupleBuffer->dequeue(); if (res != nullptr) { if (res->GetNumOfRefs() != 1) { res->DeleteIfAllowed(); } return res; } return 0; } void hybridHashJoinLI::Scheduler() { tupleBuffer = make_shared(maxMem / coreNoWorker, resultTupleType); Tuple* tupleR = streamR.request(); Tuple* tupleS = streamS.request(); if (tupleS != nullptr && tupleR != nullptr) { TupleType* ttS = tupleS->GetTupleType(); ttS->IncReference(); // start threads joinThreads.reserve(coreNoWorker); partBufferR.reserve(coreNoWorker); partBufferS.reserve(coreNoWorker); threadsDone = coreNoWorker; for (size_t i = 0; i < coreNoWorker; ++i) { partBufferR.push_back( make_shared>(i)); partBufferS.push_back( make_shared>(i)); resultTupleType->IncReference(); joinThreads.emplace_back( HashJoinWorker(maxMem / coreNoWorker, coreNoWorker, i, tupleBuffer, partBufferR.back(), partBufferS.back(), joinAttr, resultTupleType, ttS)); } // Stream R do { size_t partNo = tupleR->HashValue(joinAttr.first) % coreNoWorker; partBufferR[partNo]->enqueue(tupleR); } while ((tupleR = streamR.request())); for (size_t i = 0; i < coreNoWorker; ++i) { partBufferR[i]->enqueue(nullptr); } do { size_t partNo = tupleS->HashValue(joinAttr.second) % coreNoWorker; partBufferS[partNo]->enqueue(tupleS); } while ((tupleS = streamS.request())); for (size_t i = 0; i < coreNoWorker; ++i) { partBufferS[i]->enqueue(nullptr); } for (size_t i = 0; i < coreNoWorker; ++i) { joinThreads[i].join(); } ttS->DeleteIfAllowed(); } else { tupleBuffer->enqueue(nullptr); } } /* 1.1 The MThreaded mthreadedMergeSort Operator */ ListExpr op_hybridHashJoin::hybridHashJoinTM(ListExpr args) { // mthreadedHybridJoin has 4 arguments // 1: StreamR of Tuple // 2: StreamS of Tuple // 3: EquiAttributeR // 4: EquiAttributeR if (MThreadedSingleton::getCoresToUse() < 3) { return listutils::typeError(" only works with >= 3 threads "); } string err = "stream(tuple) x stream(tuple) x attr1 x " "attr2 expected"; if (!nl->HasLength(args, 4) ) { return listutils::typeError(err); } ListExpr stream1 = nl->First(args); ListExpr stream2 = nl->Second(args); ListExpr attr1 = nl->Third(args); ListExpr attr2 = nl->Fourth(args); if (!Stream::checkType(stream1)) { return listutils::typeError(err + " (first arg is not a tuple stream)"); } if (!Stream::checkType(stream2)) { return listutils::typeError(err + " (second arg is not a tuple stream)"); } if (!listutils::isSymbol(attr1)) { return listutils::typeError(err + "(first attrname is not valid)"); } if (!listutils::isSymbol(attr2)) { return listutils::typeError(err + "(second attrname is not valid)"); } ListExpr attrList1 = nl->Second(nl->Second(stream1)); ListExpr attrList2 = nl->Second(nl->Second(stream2)); string attrname1 = nl->SymbolValue(attr1); string attrname2 = nl->SymbolValue(attr2); ListExpr attrType1; ListExpr attrType2; int index1 = listutils::findAttribute(attrList1, attrname1, attrType1); if (index1 == 0) { return listutils::typeError(attrname1 + " is not an attribute of the first stream"); } int index2 = listutils::findAttribute(attrList2, attrname2, attrType2); if (index2 == 0) { return listutils::typeError(attrname2 + " is not an attribute of the second stream"); } if (!nl->Equal(attrType1, attrType2)) { return listutils::typeError("types of the selected attributes differ"); } ListExpr resAttrList = listutils::concat(attrList1, attrList2); if (!listutils::isAttrList(resAttrList)) { return listutils::typeError("Name conflicts in attributes found"); } ListExpr indexList = nl->TwoElemList( nl->IntAtom(index1 - 1), nl->IntAtom(index2 - 1)); return nl->ThreeElemList(nl->SymbolAtom(Symbols::APPEND()), indexList, nl->TwoElemList( nl->SymbolAtom(Stream::BasicType()), nl->TwoElemList( nl->SymbolAtom(Tuple::BasicType()), resAttrList))); } // Operator sort2 (firstArg = 1, param = false) // args[0] : streamR // args[1] : streamS // args[2] : index of join attribute R // args[3] : index of join attribute S int op_hybridHashJoin::hybridHashJoinVM(Word* args, Word &result, int message, Word &local, Supplier s) { //read append structure //(attribute number, sort direction) std::pair attr; CcInt* attrR = static_cast(args[4].addr); CcInt* attrS = static_cast(args[5].addr); attr = make_pair(attrR->GetIntval(), attrS->GetIntval()); // create result type ListExpr resultType = qp->GetNumType(s); hybridHashJoinLI* li = (hybridHashJoinLI*) local.addr; switch (message) { case OPEN : if (li) { delete li; } local.addr = new hybridHashJoinLI(args[0], args[1], attr, qp->GetMemorySize(s) * 1024 * 1024, resultType); return 0; case REQUEST: result.addr = li ? li->getNext() : 0; return result.addr ? YIELD : CANCEL; case CLOSE: if (li) { delete li; local.addr = 0; } usleep(100); return 0; } return 0; } std::string op_hybridHashJoin::getOperatorSpec() { return OperatorSpec( " stream x stream x attr x attr) -> stream", " streamR streamS mThreadedHybridJoin(attrR attrS)", " hybrid hash join using >2 cores", " R feed {o} S feed {p} mThreadedHybridJoin[X_o, Y_p]" ).getStr(); } std::shared_ptr op_hybridHashJoin::getOperator() { return std::make_shared("mThreadedHybridJoin", getOperatorSpec(), &op_hybridHashJoin::hybridHashJoinVM, Operator::SimpleSelect, &op_hybridHashJoin::hybridHashJoinTM); }