/* ---- This file is part of SECONDO. Copyright (C) 2009, University in Hagen, Faculty of Mathematics and Computer Science, Database Systems for New Applications. SECONDO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. SECONDO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SECONDO; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ---- 1 Implementation File HybridHashJoin.cpp June 2009, Sven Jungnickel. Initial version 2 Includes and defines */ #include #include #include "stdlib.h" #include "LogMsg.h" #include "QueryProcessor.h" #include "StandardTypes.h" #include "RTuple.h" #include "GraceHashJoin.h" /* 3 External linking */ extern QueryProcessor* qp; using namespace std; /* 4 Auxiliary functions */ namespace extrel2 { /* 5 Implementation of class ~GraceHashJoinProgressLocalInfo~ */ GraceHashJoinProgressLocalInfo::GraceHashJoinProgressLocalInfo( Supplier s ) : ProgressLocalInfo() , maxOperatorMemory((qp->GetMemorySize(s) * 1024 * 1024)) , tuplesProcessedSinceLastResult(0) , traceMode(false) { // Load constants from ProgressConstants uHashJoin = ProgressConstants::getValue("ExtRelation2Algebra", "gracehashjoin", "uHashJoin"); vHashJoin = ProgressConstants::getValue("ExtRelation2Algebra", "gracehashjoin", "vHashJoin"); wHashJoin = ProgressConstants::getValue("ExtRelation2Algebra", "gracehashjoin", "wHashJoin"); t_read = ProgressConstants::getValue("ExtRelation2Algebra", "gracehashjoin", "tread"); t_write = ProgressConstants::getValue("ExtRelation2Algebra", "gracehashjoin", "twrite"); t_probe = ProgressConstants::getValue("ExtRelation2Algebra", "gracehashjoin", "tprobe"); t_hash = ProgressConstants::getValue("ExtRelation2Algebra", "gracehashjoin", "thash"); t_result = ProgressConstants::getValue("ExtRelation2Algebra", "gracehashjoin", "tresult"); } int GraceHashJoinProgressLocalInfo::CalcProgress( ProgressInfo& p1, ProgressInfo& p2, ProgressInfo* pRes, Supplier s ) { // calculate tuple size of join this->SetJoinSizes(p1, p2); // copy sizes to result pRes->CopySizes(this); if ( this->state == 1 ) { if ( traceMode ) { cmsg.info() << "-----------------------------------------------" << endl; cmsg.info() << "calcProgressGrace()" << endl; cmsg.send(); } calcProgressGrace(p1, p2, pRes, s); } else if ( state == 0 ) { if ( traceMode ) { cmsg.info() << "-----------------------------------------------" << endl; cmsg.info() << "calcProgressStd()" << endl; cmsg.send(); } calcProgressStd(p1, p2, pRes, s); } else { return CANCEL; } return YIELD; } void GraceHashJoinProgressLocalInfo::calcProgressStd( ProgressInfo& p1, ProgressInfo& p2, ProgressInfo* pRes, Supplier s ) { double sel; double m = (double)this->returned; double k1 = (double)this->readFirst; double k2 = (double)this->readSecond; // Calculate estimated selectivity if ( m > enoughSuccessesJoin ) { // warm state sel = m / ( k1 * k2 ); } else { // cold state sel = qp->GetSelectivity(s); } // calculate result cardinality pRes->Card = p1.Card * p2.Card * sel; // calculate total time pRes->Time = p1.Time + p2.Time + p2.Card * vHashJoin // reading stream B into hash table + p1.Card * uHashJoin // probing stream A against hash table + pRes->Card // output of result tuples * (p1.noAttrs + p2.noAttrs) * wHashJoin; // calculate total progress pRes->Progress = ( p1.Progress * p1.Time + p2.Progress * p2.Time + this->readSecond * vHashJoin + this->readFirst * uHashJoin + this->returned * wHashJoin * (p1.noAttrs + p2.noAttrs) ) / pRes->Time; // calculate time until first result tuple pRes->BTime = p1.BTime + p2.BTime + p2.Card * vHashJoin; // reading stream B into hash table // calculate blocking progress pRes->BProgress = ( p1.BProgress * p1.BTime + p2.BProgress * p2.BTime + this->readSecond * vHashJoin ) / pRes->BTime; if ( traceMode ) { cmsg.info() << "p1" << endl; PrintProgressInfo(cmsg.info(), p1); cmsg.info() << "p2" << endl; PrintProgressInfo(cmsg.info(), p2); cmsg.info() << "pRes" << endl; PrintProgressInfo(cmsg.info(), *pRes); cmsg.send(); } } void GraceHashJoinProgressLocalInfo::calcProgressGrace( ProgressInfo& p1, ProgressInfo& p2, ProgressInfo* pRes, Supplier s ) { double sel; double m = (double)this->returned; double k1 = (double)this->readFirst; double k2 = (double)this->readSecond; // ------------------------------------------- // Result cardinality // ------------------------------------------- // calculate estimated selectivity if ( m > enoughSuccessesJoin ) { // warm state sel = m / ( k1 * streamB.GetTotalProcessedTuples() ); if ( traceMode ) { cmsg.info() << "WARM state => " << ", m:" << m << ", k1:" << k1 << ", streamB.GetTotalProcessedTuples():" << streamB.GetTotalProcessedTuples() << ", sel: "<< sel << endl; cmsg.send(); } } else { // cold state sel = qp->GetSelectivity(s); if ( traceMode ) { cmsg.info() << "COLD state => " << ", m:" << m << ", k1:" << k1 << ", k2:" << k2 << ", sel: "<< sel << endl; cmsg.send(); } } // calculate result cardinality pRes->Card = p1.Card * p2.Card * sel; // ------------------------------------------- // Total time // ------------------------------------------- // calculate time needed for successors double t1 = p1.Time + p2.Time; if ( traceMode ) { cmsg.info() << "1: t1 => " << t1 << endl; cmsg.send(); } // calculate time for partitioning and processing of stream A double t2 = p1.Card * ( t_probe + t_hash + t_read + t_write ); if ( traceMode ) { cmsg.info() << "2: t2 => " << t2 << endl; cmsg.send(); } double t3 = 0; if ( streamA.IsValid() ) { for (size_t i = 0; i < streamA.partitionProgressInfo.size(); i++) { // cardinality of partition from A size_t cardA = streamA.partitionProgressInfo[i].tuples; // number of passes of the corresponding partition B size_t passesB = streamB.partitionProgressInfo[i].noOfPasses; if ( passesB > 1 ) { t3 += cardA * ( passesB - 1 ) * ( t_probe + t_read ); } } if ( traceMode ) { cmsg.info() << "3: t3 => " << t3 << endl; cmsg.send(); } } // calculate time for partitioning and processing of stream B double t4 = p2.Card * ( t_hash + t_read + t_write ); if ( traceMode ) { cmsg.info() << "4: t4 => " << t4 << endl; cmsg.send(); } // calculate time for sub-partitioning of stream B double t5 = streamB.subTotalTuples * ( t_read + t_write ); if ( traceMode ) { cmsg.info() << "5: t5 => " << t5 << endl; cmsg.send(); } // calculate time to create result tuples double t6 = pRes->Card * t_result; if ( traceMode ) { cmsg.info() << "6: t6 => " << t6 << endl; cmsg.send(); } pRes->Time = t1 + t2 + t3 + t4 + t5 + t6; if ( traceMode ) { cmsg.info() << "7: pRes->Time => " << pRes->Time << endl; cmsg.send(); } // ------------------------------------------- // Total progress // ------------------------------------------- // calculate current progress of successors double prog1 = p1.Progress * p1.Time + p2.Progress * p2.Time; if ( traceMode ) { cmsg.info() << "1: prog1 => " << prog1 << endl; cmsg.send(); } // calculate current progress of stream A double prog2 = k1 * ( t_probe + t_hash + t_read + t_write ); if ( traceMode ) { cmsg.info() << "2: prog2 => " << prog2 << endl; cmsg.send(); } double prog3 = 0; if ( streamA.IsValid() ) { for (size_t i = 0; i < streamA.partitionProgressInfo.size(); i++) { // cardinality of partition from A size_t cardA = streamA.partitionProgressInfo[i].tuples; // number of passes of the corresponding partition of B size_t passesB = streamB.partitionProgressInfo[i].noOfPasses; // number of processed tuples of partition from A size_t procA = streamA.partitionProgressInfo[i].tuplesProc; if ( passesB > 1 && procA > cardA ) { prog3 += ( procA - cardA ) * ( t_probe + t_read ); } } if ( traceMode ) { cmsg.info() << "3: prog3 => " << prog3 << endl; cmsg.send(); } } // calculate current progress of stream B double prog4 = streamB.GetTotalProcessedTuples() * ( t_hash + t_read + t_write ); if ( traceMode ) { cmsg.info() << "4: prog4 => " << prog4 << endl; cmsg.send(); } // calculate current progress for sub-partitioning of stream B double prog5 = streamB.subTuples * ( t_read + t_write ); if ( traceMode ) { cmsg.info() << "5: prog5 => " << prog5 << endl; cmsg.send(); } // calculate time to create result tuples double prog6 = m * t_result; if ( traceMode ) { cmsg.info() << "6: prog6 => " << prog6 << endl; cmsg.send(); } // calculate total progress pRes->Progress = ( prog1 + prog2 + prog3 + prog4 + prog5 + prog6 ) / pRes->Time; if ( traceMode ) { cmsg.info() << "7: pRes->Progress => " << pRes->Progress << endl << "8: streamB.subTotalTuples => " << streamB.subTotalTuples << endl << "9: streamB.subTuples => " << streamB.subTuples << endl; cmsg.send(); } // ------------------------------------------- // Blocking time // ------------------------------------------- // calculate blocking time for successors pRes->BTime = p1.BTime + p2.BTime; // calculate time until stream B is partitioned pRes->BTime += p2.Card * ( t_hash + t_read + t_write ) + streamB.subTotalTuples * ( t_read + t_write ); // ------------------------------------------- // Blocking Progress // ------------------------------------------- // calculate blocking progress for successors pRes->BProgress = p1.BProgress * p1.BTime + p2.BProgress * p2.BTime; // calculate progress of partitioning of stream B pRes->BProgress += k2 * ( t_hash + t_read + t_write ) + streamB.subTuples * ( t_read + t_write ); // calculate blocking progress pRes->BProgress /= pRes->BTime; if ( traceMode ) { cmsg.info() << "p1" << endl; PrintProgressInfo(cmsg.info(), p1); cmsg.info() << "p2" << endl; PrintProgressInfo(cmsg.info(), p2); cmsg.info() << "pRes" << endl; PrintProgressInfo(cmsg.info(), *pRes); cmsg.send(); } return; } ostream& GraceHashJoinProgressLocalInfo::Print(ostream& os) { os << "---------- Progress Information -----------" << endl << "k1: " << this->readFirst << ", k2: " << this->readSecond << ", m: " << this->returned << endl; if ( state == 1) { if ( streamA.IsValid() ) { os << "ProgressInformation - Stream A" << endl; streamA.Print(os); } if ( streamB.IsValid() ) { os << "ProgressInformation - Stream B" << endl; streamB.Print(os); } } return os; } /* 12 Implementation of class ~GraceHashJoinAlgorithm~ */ GraceHashJoinAlgorithm::GraceHashJoinAlgorithm( Word streamA, int indexAttrA, Word streamB, int indexAttrB, size_t buckets, Supplier s, GraceHashJoinProgressLocalInfo* p, size_t partitions, size_t maxMemSize, size_t ioBufferSize ) : streamA(streamA) , streamB(streamB) , attrIndexA(indexAttrA-1) , attrIndexB(indexAttrB-1) , MAX_MEMORY(0) , usedMemory(0) , resultTupleType(0) , tupleTypeA(0) , tupleTypeB(0) , nBuckets(0) , nPartitions(0) , pmA(0) , pmB(0) , tupleA(0) , fitsInMemory(false) , partitioning(false) , curPartition(0) , finishedPartitionB(false) , iterA(0) , hashTable(0) , progress(p) , traceMode(RTFlag::isActive("ERA:TraceGraceHashJoin")) , subpartition(!RTFlag::isActive("ERA:GraceHashJoinNoSubpartitioning")) { Word wTuple(Address(0)); // currently we are in internal mode progress->state = 0; // Set operator's main memory setMemory(maxMemSize, s); // Set I/O buffer size for tuple buffers setIoBuffer(ioBufferSize); // Check number of buckets (must be divisible by two) setBuckets(MAX_MEMORY, buckets); // Check number of partitions setPartitions(partitions); if ( traceMode ) { cmsg.info() << "-------------------- GRACE Hash-Join ------------------" << endl << "Buckets: \t\t\t" << nBuckets << endl << "Partitions: \t\t\t" << nPartitions << endl << "Memory: \t\t\t" << MAX_MEMORY / 1024 << " KByte" << endl << "I/O Buffer: \t\t\t" << PartitionManager::GetIOBufferSize() << " Byte" << endl << "Join attribute index A: \t" << attrIndexA << " (0 based)" << endl << "Join attribute index B: \t" << attrIndexB << " (0 based)" << endl << endl; cmsg.send(); timer.start(); } // create hash function instances HashFunction* hashFuncA = new HashFunction(this->nBuckets, this->attrIndexA); HashFunction* hashFuncB = new HashFunction(this->nBuckets, this->attrIndexB); // create tuple comparison function instance JoinTupleCompareFunction cmp( this->attrIndexA, this->attrIndexB); // create result type ListExpr resultType = qp->GetNumType(s); resultTupleType = new TupleType( nl->Second( resultType ) ); // create hash table hashTable = new HashTable( this->nBuckets, *hashFuncB, *hashFuncA, cmp); // Read tuples from stream B until memory is full or stream B is finished progress->readSecond += hashTable->ReadFromStream(streamB, MAX_MEMORY, fitsInMemory); if ( !fitsInMemory ) { if ( traceMode ) { cmsg.info() << "Switching to external hash-join algorithm!" << endl; cmsg.send(); } // create partitions for stream B pmB = new PartitionManager( hashFuncB, MAX_MEMORY, nBuckets, nPartitions, 0, &progress->streamB); // now we are in external mode progress->state = 1; // load current hash table content into partitions of stream B pmB->InitPartitions(hashTable); // partition the rest of stream B partitionB(); // sub-partition stream B with maximum recursion level 3 if ( subpartition ) { pmB->Subpartition(); } // create partitions for stream A according to partitioning of stream B pmA = new PartitionManager(hashFuncA, *pmB, &progress->streamA); // set current state partitioning = true; if ( traceMode ) { cmsg.info() << "Partitioning of stream B.." << " (timer: " << timer.diffSecondsReal() << ")" << endl << *pmB; cmsg.send(); } } else { delete hashFuncA; delete hashFuncB; } // read first tuple from stream A tupleA.setTuple( nextTupleA() ); } /* It may happen, that the localinfo object will be destroyed before all internal buffered tuples are delivered stream upwards, e.g. queries which use a ~head~ operator. In this case we need to delete also all tuples stored in memory. */ GraceHashJoinAlgorithm::~GraceHashJoinAlgorithm() { if ( traceMode ) { float sel = (float)progress->returned / ( (float)progress->readFirst * (float)progress->readSecond ); cmsg.info() << "C1: " << progress->readFirst << endl << "C2: " << progress->readSecond << endl << "m: " << progress->returned << endl << "Selectivity: " << sel << endl; cmsg.send(); } if ( hashTable ) { delete hashTable; hashTable = 0; } if ( iterA ) { delete iterA; iterA = 0; } if ( resultTupleType ) { resultTupleType->DeleteIfAllowed(); resultTupleType = 0; } if ( tupleTypeA ) { tupleTypeA->DeleteIfAllowed(); tupleTypeA = 0; } if ( tupleTypeB ) { tupleTypeB->DeleteIfAllowed(); tupleTypeB = 0; } if ( pmA ) { delete pmA; pmA = 0; } if ( pmB ) { delete pmB; pmB = 0; } } void GraceHashJoinAlgorithm::setIoBuffer(size_t bytes) { if ( bytes == UINT_MAX ) { // set buffer to system's page size PartitionManager::SetIOBufferSize( WinUnix::getPageSize() ); } else { // set buffer size PartitionManager::SetIOBufferSize(bytes); } } void GraceHashJoinAlgorithm::setMemory(size_t maxMemory, Supplier s) { if ( maxMemory == UINT_MAX ) { MAX_MEMORY = (qp->GetMemorySize(s) * 1024 * 1024); } else if ( maxMemory < MIN_USER_DEF_MEMORY ) { MAX_MEMORY = MIN_USER_DEF_MEMORY; } else { MAX_MEMORY = maxMemory; } progress->maxOperatorMemory = MAX_MEMORY; } void GraceHashJoinAlgorithm::setBuckets(size_t maxMemory, size_t n) { // calculate maximum number of buckets size_t maxBuckets = maxMemory / 1024; // make bucket number divisible by two n = n + n % 2; // check upper limit n = ( n > maxBuckets ) ? maxBuckets : n; // check lower limit n = ( n < MIN_BUCKETS ) ? MIN_BUCKETS : n; nBuckets = n; } void GraceHashJoinAlgorithm::setPartitions(size_t n) { assert(nBuckets >= MIN_BUCKETS); // calculate maximum number of partitions -> nBuckets/2 size_t maxPartitions = nBuckets / 2; if(maxPartitions > MAX_PARTITIONS){ maxPartitions = MAX_PARTITIONS; } // check if we should use default number of partitions if ( n == UINT_MAX ) { // default value is the number of inner nodes // in a binary tree with nBucket leafs and height h // on level h/2 n = 1 << (int)( log2(nBuckets) / 2.0 ); } // check lower limit n = n < MIN_PARTITIONS ? MIN_PARTITIONS : n; // check upper limit n = n > maxPartitions ? maxPartitions : n; nPartitions = n; } Tuple* GraceHashJoinAlgorithm::nextTupleA() { Word wTuple(Address(0)); qp->Request(streamA.addr, wTuple); if ( qp->Received(streamA.addr) ) { progress->readFirst++; Tuple* t = static_cast( wTuple.addr ); if ( tupleTypeA == 0 ) { tupleTypeA = t->GetTupleType(); tupleTypeA->IncReference(); } return t; } return NULL; } Tuple* GraceHashJoinAlgorithm::nextTupleB() { Word wTuple(Address(0)); qp->Request(streamB.addr, wTuple); if ( qp->Received(streamB.addr) ) { progress->readSecond++; Tuple* t = static_cast( wTuple.addr ); if ( tupleTypeB == 0 ) { tupleTypeB = t->GetTupleType(); tupleTypeB->IncReference(); } return t; } return NULL; } void GraceHashJoinAlgorithm::partitionB() { Tuple* t; while ( ( t = nextTupleB() ) ) { pmB->Insert(t); t->DeleteIfAllowed(); } return; } Tuple* GraceHashJoinAlgorithm::partitionA() { while ( tupleA.tuple ) { pmA->Insert(tupleA.tuple); tupleA.setTuple( nextTupleA() ); } // change state partitioning = false; // load first partition from B into memory finishedPartitionB = pmB->LoadPartition(curPartition, hashTable, MAX_MEMORY); if ( traceMode ) { //cmsg.info() << "Hash table content" << *hashTable << endl; cmsg.info() << "Partitioning of stream A.." << " (timer: " << timer.diffSecondsReal() << ")" << endl << *pmA << "finishedPartitionB" << finishedPartitionB << endl; cmsg.send(); } // start scan of corresponding partition A if(iterA){ delete iterA; } iterA = pmA->GetPartition(curPartition)->MakeScan(); tupleA.setTuple( iterA->GetNextTuple() ); return processPartitions(); } Tuple* GraceHashJoinAlgorithm::processPartitions() { while ( (int)curPartition < pmB->GetNoPartitions() ) { while ( tupleA.tuple ) { Tuple* tupleB = hashTable->Probe(tupleA.tuple); if ( tupleB ) { Tuple *result = new Tuple( resultTupleType ); Concat(tupleA.tuple, tupleB, result); progress->returned++; progress->tuplesProcessedSinceLastResult = 0; return result; } PartitionProgressInfo& pinfo = progress->streamA.partitionProgressInfo[curPartition]; pinfo.tuplesProc++; pinfo.curPassNo = (int)ceil( (double)pinfo.tuplesProc / (double) pinfo.tuples); progress->CheckProgressSinceLastResult(); tupleA.setTuple( iterA->GetNextTuple() ); } // Proceed to next partition if B(i) is finished if ( finishedPartitionB ) { curPartition++; } if ( (int)curPartition < pmB->GetNoPartitions() ) { // Load next part of partition B(i) into memory finishedPartitionB = pmB->LoadPartition(curPartition, hashTable, MAX_MEMORY); // Start scan of corresponding partition A(i) if(iterA){ delete iterA; } iterA = pmA->GetPartition(curPartition)->MakeScan(); // Read first tuple from A(i) tupleA.setTuple( iterA->GetNextTuple() ); } } return 0; } Tuple* GraceHashJoinAlgorithm::NextResultTuple() { if ( fitsInMemory ) { // Standard in-memory hash-join while ( tupleA.tuple ) { Tuple* tupleB = hashTable->Probe(tupleA.tuple); if ( tupleB ) { Tuple *result = new Tuple( resultTupleType ); Concat(tupleA.tuple, tupleB, result); progress->returned++; progress->tuplesProcessedSinceLastResult = 0; return result; } else { tupleA.setTuple( nextTupleA() ); } } return 0; } else if ( partitioning ) { return partitionA(); } else { return processPartitions(); } return 0; } /* 13 Implementation of value mapping function of operator ~gracehashjoin~ */ template int GraceHashJoinValueMap( Word* args, Word& result, int message, Word& local, Supplier s) { // if ( param = false ) // args[0] : stream A // args[1] : stream B // args[2] : attribute name of join attribute for stream A // args[3] : attribute name join attribute for stream B // args[4] : number of buckets // args[5] : attribute index of join attribute for stream A // args[6] : attribute index of join attribute for stream B // if ( param = true ) // args[0] : stream A // args[1] : stream B // args[2] : attribute name of join attribute for stream A // args[3] : attribute name join attribute for stream B // args[4] : number of buckets // args[5] : number of partitions (only if param is true) // args[6] : usable main memory in bytes (only if param is true) // args[7] : I/O buffer size in bytes (only if param is true) // args[8] : attribute index of join attribute for stream A // args[9] : attribute index of join attribute for stream B GraceHashJoinLocalInfo* li; li = static_cast( local.addr ); switch(message) { case OPEN: { if (li) { delete li; } li = new GraceHashJoinLocalInfo( s ); local.addr = li; // at this point the local value is well defined // afterwards progress request calls are // allowed. li->ptr = 0; qp->Open(args[0].addr); qp->Open(args[1].addr); return 0; } case REQUEST: { if ( li->ptr == 0 ) { int nBuckets = StdTypes::GetInt( args[4] ); if ( param ) { int nPartitions = StdTypes::GetInt( args[5] ); int maxMem = StdTypes::GetInt( args[6] ); int ioBufferSize = StdTypes::GetInt( args[7] ); int indexAttrA = StdTypes::GetInt( args[8] ); int indexAttrB = StdTypes::GetInt( args[9] ); li->ptr = new GraceHashJoinAlgorithm( args[0], indexAttrA, args[1], indexAttrB, nBuckets, s, li, nPartitions, maxMem, ioBufferSize ); } else { int indexAttrA = StdTypes::GetInt( args[5] ); int indexAttrB = StdTypes::GetInt( args[6] ); li->ptr = new GraceHashJoinAlgorithm( args[0], indexAttrA, args[1], indexAttrB, nBuckets, s, li ); } } GraceHashJoinAlgorithm* algo = li->ptr; result.setAddr( algo->NextResultTuple() ); return result.addr != 0 ? YIELD : CANCEL; } case CLOSE: { qp->Close(args[0].addr); qp->Close(args[1].addr); if (li) { delete li; local.addr = 0; } return 0; } } return 0; } /* 14 Instantiation of Template Functions For some reasons the compiler cannot expand these template functions in the file ~ExtRelation2Algebra.cpp~, thus the value mapping functions are instantiated here. */ template int GraceHashJoinValueMap( Word* args, Word& result, int message, Word& local, Supplier s ); template int GraceHashJoinValueMap( Word* args, Word& result, int message, Word& local, Supplier s); } // end of namespace extrel2