/* ---- This file is part of SECONDO. Copyright (C) 2019, Faculty of Mathematics and Computer Science, Database Systems for New Applications. SECONDO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. SECONDO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SECONDO; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ---- //[<] [\ensuremath{<}] //[>] [\ensuremath{>}] \setcounter{tocdepth}{3} \tableofcontents 1 Basic Operators Header File */ #pragma once #include #include #include "Operator.h" #include "vector" #include "thread" #include "condition_variable" #include #include namespace mthreaded { constexpr size_t BUFFERSIZE = 10; /* 1.1 The MThreaded MThreadedMergeSort Operator stream x attr x const (desc, incr) -> stream */ // used for showing which stream in pipelines is empty enum TupleEmpty { both, first, second }; class CompareByVector { private: const std::vector> sortAttr; public: // Constructor: Set compare Vector explicit CompareByVector(const std::vector> _sortAttr) : sortAttr(_sortAttr) {} ~CompareByVector() {} // Compare 2 tuples and return true of 1>=2 bool compTuple(const Tuple* a, const Tuple* b) const; // Debug: first attr index int firstAttr() const; }; class TournamentTree { private: struct node { Tuple* tuple; size_t leave_small; size_t leave_large; bool active; node(Tuple* _tuple, size_t _leave_small, size_t _leave_large, bool _active) : tuple(_tuple), leave_small(_leave_small), leave_large(_leave_large), active(_active) {} }; bool treeComplete; size_t countInactive; size_t maxMem; std::vector tree; std::shared_ptr compareClass; static constexpr size_t MEMINTREE = 2 * sizeof(void*) + 4 * sizeof(int) + 2 * sizeof(bool); // insert tuple in completed tree recursively void exchange(Tuple* tuple, const size_t pos, const bool active); public: // constructor TournamentTree( std::shared_ptr _compareClass, size_t _maxMem); ~TournamentTree() { tree.clear(); } // fill leaves void fillLeaves(Tuple* tuple); // build tree from leaves void buildTree(); // insert new tuple in tree and pull Tuple* replace(Tuple* tuple); // make all nodes active void makeActive(); // test if active bool isActive() const; bool isEmpty() const; // test if next tuple fits in memory bool testMemSizeFill(const Tuple* tuple) ; // test if next tuple fits in memory bool testMemSizeExchange(Tuple* tuple); // DEBUG void showTree() const; }; class MergeFeeder { private: std::shared_ptr compare; std::shared_ptr buf1; std::shared_ptr buf2; std::shared_ptr> mergeBuffer; TupleEmpty runEmpty; public: // Constructor: 2 incoming Buffer MergeFeeder(std::shared_ptr _buf1, std::shared_ptr _buf2, std::shared_ptr _compare, std::shared_ptr> _mergeBuffer); ~MergeFeeder() {} // Thread void operator()(); }; class NoMergeFeeder { private: std::shared_ptr buf; std::shared_ptr> mergeBuffer; public: // Constructor: 2 incoming Buffer NoMergeFeeder(std::shared_ptr _buf, std::shared_ptr> _mergeBuffer); ~NoMergeFeeder() {} // Thread void operator()(); }; class MergePipeline { private: std::shared_ptr compare; std::shared_ptr> mergeBuffer_f1; std::shared_ptr> mergeBuffer_f2; std::shared_ptr> mergeBuffer; public: MergePipeline( std::shared_ptr _compare, std::shared_ptr> _mergeBuffer_f1, std::shared_ptr> _mergeBuffer_f2, std::shared_ptr> _mergeBuffer); ~MergePipeline() {} // Thread void operator()(); }; class Suboptimal { private: size_t maxMem; std::shared_ptr> partBuffer; TupleType* tt; std::shared_ptr compare; std::vector> runs1; std::vector> runs2; size_t threadNumber; std::shared_ptr>> bufferTransfer; public: explicit Suboptimal( size_t _maxMem, std::shared_ptr> _partBuffer, std::shared_ptr _compare, TupleType* _tt, size_t _threadNumber, std::shared_ptr>> _bufferTransfer); ~Suboptimal(); // Thread void operator()(); // Replacement Selection Sort void replacementSelectionSort(std::shared_ptr sortTree); // merge runs std::shared_ptr merge( std::shared_ptr run1, std::shared_ptr run2); }; class mergeSortLI { private: Stream stream; const std::vector> sortAttr; std::shared_ptr>> mergeFn; size_t lastWorker; TupleType* tt; std::vector sortThreads; std::shared_ptr compareLI; Tuple* tupleNext1; Tuple* tupleNext2; TupleEmpty tupleEmpty; std::vector>> partBuffer; std::shared_ptr> tupleBufferIn1; std::shared_ptr> tupleBufferIn2; std::vector>> mergeBuffer; const size_t maxMem; size_t coreNo; size_t coreNoWorker; const size_t cores = MThreadedSingleton::getCoresToUse(); bool streamEmpty; public: //Constructor mergeSortLI( Word _stream, const std::vector> _sortAttr, const size_t _maxMem); //Destructor ~mergeSortLI(); //Output Tuple* getNext(); private: //Scheduler void DistributorCollector(); }; class op_mergeSort { static ListExpr mergeSortTM(ListExpr args); static int mergeSortVM(Word* args, Word &result, int message, Word &local, Supplier s); std::string getOperatorSpec(); public: explicit op_mergeSort() = default; ~op_mergeSort() = default; std::shared_ptr getOperator(); }; } // end of namespace mthreaded