Files
secondo/Algebras/CRel/Operators/ItHashJoin.cpp
2026-01-23 17:03:45 +08:00

640 lines
15 KiB
C++

/*
----
This file is part of SECONDO.
Copyright (C) 2004, University in Hagen, Department of Computer Science,
Database Systems for New Applications.
SECONDO is free software; you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation; either version 2 of the License, or
(at your option) any later version.
SECONDO is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with SECONDO; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
----
*/
#include "ItHashJoin.h"
#include <algorithm>
#include <iostream>
#include "ListExprUtils.h"
#include "LogMsg.h"
#include "OperatorUtils.h"
#include "Project.h"
#include "QueryProcessor.h"
#include <set>
#include "StandardTypes.h"
#include "StreamValueMapping.h"
#include <string>
#include "Symbols.h"
#include "TypeUtils.h"
using namespace CRelAlgebra;
using namespace CRelAlgebra::Operators;
using std::copy;
using std::cout;
using std::set;
using std::string;
using std::vector;
extern NestedList *nl;
extern QueryProcessor *qp;
//ItHashJoin--------------------------------------------------------------------
/*
Registers the ~itHashJoin~ operator: its description, the value mappings of
the non-projecting and projecting variants, the selector choosing between them
and the type mapping.
*/
ItHashJoin::ItHashJoin()
  : Operator(info, valueMappings, SelectValueMapping, TypeMapping)
{
  // The type mapping reads argument values (projection list, block size),
  // not only their types.
  SetUsesArgsInTypeMapping();

  // Request a memory assignment; CreateState falls back to it when no
  // positive memory limit argument is given.
  SetUsesMemory();
}
// Value mappings, indexed by the result of SelectValueMapping:
//   0 -> result contains all columns of both inputs (no projection list)
//   1 -> result is projected onto the given column list
// The table is terminated by nullptr.
ValueMapping ItHashJoin::valueMappings[] = {
  StreamValueMapping<State<false>, CreateState<false>>,
  StreamValueMapping<State<true>, CreateState<true>>,
  nullptr
};
// Operator description shown to users: name, signature, syntax and meaning.
// Adjacent string literals are concatenated by the compiler, so every literal
// that continues a sentence must carry its own separating space.
const OperatorInfo ItHashJoin::info = OperatorInfo(
  "itHashJoin",
  "stream(tblock(ma, ((na0, ca0) ... (nai, cai)))) x "
  "stream(tblock(mb, ((nb0, cb0) ... (nbi, cbi)))) x "
  "jna x jnb x (pn0 ... pni) x bc x ml x bs "
  "-> stream(tblock(bs, ((pn0 c0) ... (pni ci)))) with:\n\n"
  "jna / jnb: symbol. the column names from block type a / b to join on.\n\n"
  "(pn0 ... pni): symbol(s). optional. column names to project the result on. "
  "omitting -> (na0 ... nai, nb0 ... nbi).\n\n"
  "bc: int. optional. bucket count of the used hash map. omitting -> 1048576."
  "\n\n"
  "ml: int. optional. main memory limit for this operator in MiB. omitting -> "
  "provided by Secondo.\n\n"
  "bs: int. optional. block size. omitting -> ma.\n\n"
  "non-positive values for bc, ml or bs are treated like omitted ones.",
  "_ _ itHashJoin[_, _, [list], _, _, _]",
  "Executes an iterative hash join algorithm over two streams of tuple blocks. "
  "The second stream argument will be used for index creation and should "
  "contain fewer entries than the first one. Optionally the resulting tuple "
  "blocks can be projected on a selected subset of columns. This avoids "
  "copying workload and is therefore recommended.", "");
/*
Type mapping of ~itHashJoin~.

Because ~SetUsesArgsInTypeMapping~ is active, every element of ~args~ is a
(type value) pair. Accepted argument lists (4 to 8 arguments):

  stream a, stream b, join column a, join column b
  [, projection list] [, bucket count [, memory limit [, block size]]]

The fifth argument is treated as projection list if it is a valid one and
otherwise as bucket count. With all eight arguments it must be a projection
list.

Result on success:
  (APPEND ([projection indices] join index a  join index b) result type)
The projection indices are only appended if a projection list was given.
*/
ListExpr ItHashJoin::TypeMapping(ListExpr args)
{
  const uint64_t argCount = nl->ListLength(args);
  if (argCount < 4 || argCount > 8)
  {
    return GetTypeError("Expected four to eight arguments.");
  }

  //Check 'stream a' argument
  string error;
  TBlockTI blockAInfo = TBlockTI(false);
  if (!IsBlockStream(nl->First(nl->First(args)), blockAInfo, error))
  {
    return GetTypeError(0, "stream a", error);
  }

  //Check 'stream b' argument
  TBlockTI blockBInfo = TBlockTI(false);
  if (!IsBlockStream(nl->First(nl->Second(args)), blockBInfo, error))
  {
    return GetTypeError(1, "stream b", error);
  }

  //Check 'column-name a'
  string nameA;
  if (!TryGetSymbolValue(nl->First(nl->Third(args)), nameA, error))
  {
    return GetTypeError(2, "column-name a", error);
  }

  uint64_t nameAIndex;
  if (!GetIndexOfColumn(blockAInfo, nameA, nameAIndex))
  {
    return GetTypeError(2, "column-name a",
                        "Column named '" + nameA + "' not found.");
  }

  //Check 'column-name b'
  string nameB;
  if (!TryGetSymbolValue(nl->First(nl->Fourth(args)), nameB, error))
  {
    return GetTypeError(3, "column-name b", error);
  }

  uint64_t nameBIndex;
  if (!GetIndexOfColumn(blockBInfo, nameB, nameBIndex))
  {
    return GetTypeError(3, "column-name b",
                        "Column named '" + nameB + "' not found.");
  }

  if (!nl->Equal(blockAInfo.columnInfos[nameAIndex].type,
                 blockBInfo.columnInfos[nameBIndex].type))
  {
    return GetTypeError("The columns to join on have different types.");
  }

  //Initialize the result type from both block-types
  //Check for duplicate column names
  TBlockTI resultBlockInfo = TBlockTI(false);
  set<string> columnNames;

  for (const TBlockTI::ColumnInfo &columnInfo : blockAInfo.columnInfos)
  {
    columnNames.insert(columnInfo.name);
    resultBlockInfo.columnInfos.push_back(columnInfo);
  }

  for (const TBlockTI::ColumnInfo &columnInfo : blockBInfo.columnInfos)
  {
    if (!columnNames.insert(columnInfo.name).second)
    {
      return GetTypeError(1, "stream b", "Column name " + columnInfo.name +
                          " already exists in stream a.");
    }
    resultBlockInfo.columnInfos.push_back(columnInfo);
  }

  //Check optional arguments
  uint64_t argNo = 5;
  ListExpr appendArgs = nl->TwoElemList(nl->IntAtom(nameAIndex),
                                        nl->IntAtom(nameBIndex));
  bool hasProjection = false;

  //Check 'projection' argument
  if (argCount >= argNo)
  {
    // Named 'projectionInfo' to avoid shadowing the static member 'info'.
    Project::Info projectionInfo(resultBlockInfo,
                                 nl->Second(nl->Nth(argNo, args)));
    if (!projectionInfo.HasError())
    {
      resultBlockInfo = projectionInfo.GetBlockTypeInfo();
      appendArgs = nl->ThreeElemList(ToIntListExpr(projectionInfo.GetIndices()),
                                     nl->IntAtom(nameAIndex),
                                     nl->IntAtom(nameBIndex));
      hasProjection = true;
      ++argNo;
    }
    else
    {
      // Not a valid projection list. It may still be the bucket count, unless
      // all eight arguments are present.
      error = projectionInfo.GetError();
      if (argCount == 8)
      {
        return GetTypeError(argNo - 1, "projection", error);
      }
    }
  }

  //Check 'bucket count' argument
  if (argCount >= argNo)
  {
    if (!CcInt::checkType(nl->First(nl->Nth(argNo, args))))
    {
      if (hasProjection)
      {
        return GetTypeError(argNo - 1, "bucket count", "Not a int.");
      }
      // The fifth argument is neither a projection list nor an int: report
      // both failures, separated like the argument label.
      return GetTypeError(argNo - 1, "projection | bucket count",
                          error + " | Not a int.");
    }
    ++argNo;
  }

  //Check 'memory limit' argument
  if (argCount >= argNo)
  {
    if (!CcInt::checkType(nl->First(nl->Nth(argNo, args))))
    {
      return GetTypeError(argNo - 1, "memory limit", "Not a int.");
    }
    ++argNo;
  }

  //Check 'block size' argument; non-positive values keep stream a's size
  resultBlockInfo.SetDesiredBlockSize(blockAInfo.GetDesiredBlockSize());
  if (argCount >= argNo)
  {
    const ListExpr arg = nl->Nth(argNo, args);
    if (!CcInt::checkType(nl->First(arg)))
    {
      return GetTypeError(argNo - 1, "block size", "Not a int.");
    }

    const long blockSize = nl->IntValue(nl->Second(arg));
    if (blockSize > 0)
    {
      resultBlockInfo.SetDesiredBlockSize(blockSize);
    }
  }

  return nl->ThreeElemList(nl->SymbolAtom(Symbol::APPEND()), appendArgs,
                           resultBlockInfo.GetTypeExpr(true));
}
/*
Selects the value mapping. Index 1 (projecting variant) is returned when a
fifth argument exists and is not an int, i.e. it is the projection list.
Index 0 (non-projecting variant) is returned otherwise.
*/
int ItHashJoin::SelectValueMapping(ListExpr args)
{
  const bool fifthIsProjection =
    nl->HasMinLength(args, 5) && !CcInt::checkType(nl->Fifth(args));

  return fifthIsProjection ? 1 : 0;
}
/*
Creates the state of one ~itHashJoin~ evaluation.

~args~ holds the operator arguments followed by the values appended by the
type mapping: ([projection indices] join index a, join index b). The positions
of the optional bucket count and memory limit (MiB) depend on whether a
projection list is present.

Non-positive or omitted bucket counts fall back to 1048576 buckets. Non-positive
or omitted memory limits fall back to the memory assigned by the query
processor. The memory limit handed to the state is in bytes.
*/
template<bool project>
ItHashJoin::State<project> *ItHashJoin::CreateState(ArgVector args, Supplier s)
{
  static const uint64_t defaultBucketCount = 1024 * 1024;

  // MiB -> bytes as a 64 bit value, so the conversions below are done in
  // 64 bit arithmetic. 'int * 1024 * 1024' overflows (undefined behaviour)
  // for memory limits of 2048 MiB and more.
  static const int64_t bytesPerMiB = 1024 * 1024;

  const uint64_t argCount = qp->GetNoSons(s);

  Supplier streamA = args[0].addr,
           streamB = args[1].addr;

  // The type mapping appended the join column indices as the last two values.
  const uint64_t joinIndexA =
                   static_cast<CcInt*>(args[argCount - 2].addr)->GetValue(),
                 joinIndexB =
                   static_cast<CcInt*>(args[argCount - 1].addr)->GetValue();

  const TBlockTI blockTypeInfo = TBlockTI(qp->GetType(s), false);

  uint64_t columnCountA =
             TBlockTI(qp->GetType(streamA), false).columnInfos.size(),
           columnCountB =
             TBlockTI(qp->GetType(streamB), false).columnInfos.size();

  IndexProjection *projectionsA,
                  *projectionsB;

  int64_t bucketCount,
          memLimit;

  if (project)
  {
    // Arguments: a, b, jna, jnb, projection [, bc [, ml [, bs]]]
    // followed by three appended values.
    bucketCount = argCount > 8
                  ? static_cast<CcInt*>(args[5].addr)->GetValue() : 0;
    memLimit = (argCount > 9
                ? static_cast<CcInt*>(args[6].addr)->GetValue() : 0) *
               bytesPerMiB;

    // Split the projected column indices by input. Indices below the column
    // count of stream a refer to a, the others to b. Each entry maps a source
    // column (index) to its position in the result tuple (projection).
    vector<IndexProjection> tmpProjectionsA,
                            tmpProjectionsB;

    uint64_t index = 0;
    for (const Word &subArg : GetSubArgvector(args[argCount - 3].addr))
    {
      const uint64_t projectedIndex =
        static_cast<CcInt*>(subArg.addr)->GetValue();

      if (projectedIndex < columnCountA)
      {
        tmpProjectionsA.push_back(IndexProjection(projectedIndex, index));
      }
      else
      {
        tmpProjectionsB.push_back(IndexProjection(projectedIndex - columnCountA,
                                                  index));
      }
      ++index;
    }

    // From here on the column counts are the numbers of projected columns.
    columnCountA = tmpProjectionsA.size();
    columnCountB = tmpProjectionsB.size();

    // Ownership of both arrays passes to the state (released in ~State).
    projectionsA = new IndexProjection[columnCountA];
    projectionsB = new IndexProjection[columnCountB];

    copy(tmpProjectionsA.begin(), tmpProjectionsA.end(), projectionsA);
    copy(tmpProjectionsB.begin(), tmpProjectionsB.end(), projectionsB);
  }
  else
  {
    // Arguments: a, b, jna, jnb [, bc [, ml [, bs]]]
    // followed by two appended values.
    bucketCount = argCount > 6
                  ? static_cast<CcInt*>(args[4].addr)->GetValue() : 0;
    memLimit = (argCount > 7
                ? static_cast<CcInt*>(args[5].addr)->GetValue() : 0) *
               bytesPerMiB;

    projectionsA = nullptr;
    projectionsB = nullptr;
  }

  if (bucketCount <= 0)
  {
    bucketCount = defaultBucketCount;
  }

  if (memLimit <= 0)
  {
    memLimit = static_cast<int64_t>(qp->GetMemorySize(s)) * bytesPerMiB;
  }

  return new State<project>(streamA, streamB, joinIndexA, joinIndexB,
                            columnCountA, columnCountB, projectionsA,
                            projectionsB, bucketCount, memLimit, blockTypeInfo);
}
//ItHashJoin::State-------------------------------------------------------------
/*
Constructs the join state and opens both input streams.

Ownership: the state takes over ~projectionsA~ / ~projectionsB~; the destructor
releases them with delete[]. CreateState passes nullptr for both in the
non-projecting variant.

~bucketCount~ sizes the hash map that ProceedStreamB fills from stream b.
~memLimit~ is in bytes (CreateState converts MiB to bytes) and bounds how much
of stream b ProceedStreamB loads per iteration. The number of collected result
tuples per block (m_blockSize) is the desired block size of ~blockTypeInfo~
scaled by TBlockTI::blockSizeFactor.

NOTE(review): the initializer list should follow the member declaration order
in ItHashJoin.h (not visible here); keep it in sync when editing either.
*/
template<bool project>
ItHashJoin::State<project>::State(Supplier streamA, Supplier streamB,
uint64_t joinIndexA, uint64_t joinIndexB,
uint64_t columnCountA, uint64_t columnCountB,
IndexProjection *projectionsA,
IndexProjection *projectionsB,
uint64_t bucketCount, uint64_t memLimit,
const TBlockTI &blockTypeInfo) :
m_joinIndexA(joinIndexA),
m_joinIndexB(joinIndexB),
m_columnCountA(columnCountA),
m_columnCountB(columnCountB),
m_memLimit(memLimit),
m_blockSize(blockTypeInfo.GetDesiredBlockSize() * TBlockTI::blockSizeFactor),
m_map(bucketCount),
m_blockA(nullptr),
m_isBExhausted(false),
m_streamA(streamA),
m_streamB(streamB),
m_blockInfo(blockTypeInfo.GetBlockInfo()),
// Buffer for one result tuple: one entry per result column.
m_tuple(new AttrArrayEntry[blockTypeInfo.columnInfos.size()]),
m_projectionsA(projectionsA),
m_projectionsB(projectionsB),
m_iterations(0)
{
m_streamA.open();
m_streamB.open();
}
/*
Releases everything the state holds: the current block of stream a, all
buffered blocks of stream b, the result tuple buffer and the projection
tables. It then closes both input streams and prints how many batches of
stream b were processed.
*/
template<bool project>
ItHashJoin::State<project>::~State()
{
  if (m_blockA != nullptr)
  {
    m_blockA->DecRef();
  }

  for (TBlock *bufferedBlock : m_blocksB)
  {
    bufferedBlock->DecRef();
  }

  // delete[] on a null pointer is a no-op, so the projection tables
  // (nullptr in the non-projecting variant) need no explicit check.
  delete[] m_tuple;
  delete[] m_projectionsA;
  delete[] m_projectionsB;

  m_streamA.close();
  m_streamB.close();

  cout << "iterative hash join finished with " << m_iterations
       << " iterations\n";
}
/*
Produces the next result block, or nullptr when the join is complete.

Algorithm: stream b is loaded in memory-limited batches (ProceedStreamB) into a
hash map keyed by the join column. For every batch, stream a is scanned
completely. Each (filtered) tuple of a is probed against the map, and one
result tuple is appended per matching tuple of b. When stream a ends, the next
batch of b is loaded and stream a is closed and reopened to be scanned again.

State kept between calls:
- m_blockA / m_blockAIterator hold the current position in stream a.
- m_mapResult holds the remaining matches for the current a tuple.
- The a-part of m_tuple holds that tuple's columns.
This lets a block be returned in the middle of a match list.

NOTE(review): if the remaining input yields no further match, this call
returns an empty block and only the following call returns nullptr.
*/
template<bool project>
TBlock *ItHashJoin::State<project>::Request()
{
// First call (or after the last batch was consumed): load a batch of b. No
// batch means nothing can match.
if (m_blocksB.empty() && !ProceedStreamB())
{
return nullptr;
}
const uint64_t columnCountA = m_columnCountA,
columnCountB = m_columnCountB;
// No current block of stream a: fetch one. If stream a is at its end, restart
// it together with the next batch of b.
if (m_blockA == nullptr)
{
if ((m_blockA = m_streamA.request()) == nullptr)
{
m_streamA.close();
m_streamA.open();
if ((m_blockA = m_streamA.request()) == nullptr || !ProceedStreamB())
{
return nullptr;
}
}
m_blockAIterator = m_blockA->GetFilteredIterator();
// Probe the first tuple of the new block; on a hit copy its columns into
// the a-part of m_tuple (directly or via the projection table).
if (m_blockAIterator.IsValid())
{
const TBlockEntry &tuple = m_blockAIterator.Get();
m_mapResult = m_map.Get(tuple[m_joinIndexA]);
if (m_mapResult.IsValid())
{
for (uint64_t i = 0; i < columnCountA; ++i)
{
if (project)
{
const IndexProjection &projection = m_projectionsA[i];
m_tuple[projection.projection] = tuple[projection.index];
}
else
{
m_tuple[i] = tuple[i];
}
}
}
}
}
// Fill a fresh result block until it reaches the desired size or the input
// is exhausted.
TBlock *block = new TBlock(m_blockInfo, 0, 0);
do
{
// Advance through stream a until the current tuple has matches in the map.
while (!m_mapResult.IsValid())
{
if (!m_blockAIterator.IsValid() || !m_blockAIterator.MoveToNext())
{
// Current a block consumed: release it and fetch the next block with
// at least one (filtered) tuple. At the end of stream a, load the next
// batch of b and restart stream a; stop if either is exhausted.
do
{
m_blockA->DecRef();
if ((m_blockA = m_streamA.request()) == nullptr)
{
if (!ProceedStreamB())
{
break;
}
m_streamA.close();
m_streamA.open();
if ((m_blockA = m_streamA.request()) == nullptr)
{
break;
}
}
m_blockAIterator = m_blockA->GetFilteredIterator();
}
while (!m_blockAIterator.IsValid());
}
// No more tuples of a or no more batches of b: nothing left to probe.
if (m_blockA == nullptr || m_blocksB.empty())
{
break;
}
const TBlockEntry &tuple = m_blockAIterator.Get();
m_mapResult = m_map.Get(tuple[m_joinIndexA]);
if (m_mapResult.IsValid())
{
for (uint64_t i = 0; i < columnCountA; ++i)
{
if (project)
{
const IndexProjection &projection = m_projectionsA[i];
m_tuple[projection.projection] = tuple[projection.index];
}
else
{
m_tuple[i] = tuple[i];
}
}
break;
}
}
// Input exhausted: return what has been collected so far.
if (!m_mapResult.IsValid())
{
break;
}
// Copy the current matching b tuple into the b-part of m_tuple, append the
// combined tuple and move on to the next match.
const TBlockEntry tupleB = m_mapResult.GetValue();
for (uint64_t i = 0; i < columnCountB; ++i)
{
if (project)
{
const IndexProjection &projection = m_projectionsB[i];
m_tuple[projection.projection] = tupleB[projection.index];
}
else
{
m_tuple[columnCountA + i] = tupleB[i];
}
}
block->Append(m_tuple);
m_mapResult.MoveToNext();
}
while (block->GetSize() < m_blockSize);
return block;
}
/*
Returns the hash value of a join attribute entry.
*/
template<bool project>
uint64_t ItHashJoin::State<project>::HashKey(const AttrArrayEntry &entry)
{
  const uint64_t hashValue = entry.GetHash();
  return hashValue;
}
/*
Returns whether two join attribute entries are equal.
*/
template<bool project>
bool ItHashJoin::State<project>::CompareKey(const AttrArrayEntry &a,
                                            const AttrArrayEntry &b)
{
  const bool keysEqual = (a == b);
  return keysEqual;
}
/*
Replaces the current batch of stream b with the next one.

The previous batch is dropped first: the hash map is cleared and the buffered
blocks are released. Unless stream b is already exhausted, blocks are then
requested from stream b. Each tuple of a block's filter is added to the hash
map under its join attribute. Loading stops when the estimated memory use
(including room for one more block of the last seen size) reaches m_memLimit,
or when stream b ends.

Returns true if at least one block was loaded, i.e. stream a has to be scanned
(again). Only such batches are counted in m_iterations, so the reported
iteration count matches the number of scans over stream a.
*/
template<bool project>
bool ItHashJoin::State<project>::ProceedStreamB()
{
  // Drop the previous batch. The map entries were built from tuples of these
  // blocks, so the map is cleared before the blocks are released.
  m_map.Clear();
  for (TBlock *block : m_blocksB)
  {
    block->DecRef();
  }
  m_blocksB.clear();

  if (!m_isBExhausted)
  {
    uint64_t size = sizeof(ItHashJoin::State<project>),
             lastBlockSize = 0;
    do
    {
      TBlock *block = m_streamB.request();
      if (block == nullptr)
      {
        m_isBExhausted = true;
        break;
      }

      m_blocksB.push_back(block);
      for (const TBlockEntry &tuple : block->GetFilter())
      {
        m_map.Add(tuple[m_joinIndexB], tuple);
      }

      lastBlockSize = block->GetSize();
      size += lastBlockSize;
    }
    while (size + m_map.GetSize() + lastBlockSize < m_memLimit);

    // A request that found stream b already at its end loads nothing and
    // triggers no further scan of stream a, so it is not an iteration.
    if (!m_blocksB.empty())
    {
      ++m_iterations;
    }
  }

  return !m_blocksB.empty();
}