/* 1 Class FlobManager The only class using the FlobManager class is Flob. For this reason, all functions are declared as private and Flob is declared to be a friend of that class. */ #include "SecondoSMI.h" #include #include #include "Flob.h" #include "assert.h" #include #include "NativeFlobCache.h" #include "PersistentFlobCache.h" #include "Stack.h" #include #include "ExternalFileCache.h" #include "WinUnix.h" #ifdef THREAD_SAFE #include #endif #undef __TRACE_ENTER__ #undef __TRACE_LEAVE__ #define __TRACE_ENTER__ #define __TRACE_LEAVE__ /* #define __TRACE_ENTER__ std::cerr << "Enter : " << \ __PRETTY_FUNCTION__ << std::endl; #define __TRACE_LEAVE__ std::cerr << "Leave : " << \ __PRETTY_FUNCTION__ << "@" << __LINE__ << std::endl; */ FlobManager* FlobManager::instance = 0; static NativeFlobCache* nativeFlobCache = 0; static PersistentFlobCache* persistentFlobCache = 0; static Stack* DestroyedFlobs = 0; static ExternalFileCache* externalFileCache = 0; // some value for cache sizes static size_t NATIVE_CACHE_MAXSIZE = 64 * 1024 * 1024; static size_t NATIVE_CACHE_SLOTSIZE = 16 * 1024 * 1024; static size_t NATIVE_CACHE_AVGSIZE = 512; static size_t PERSISTENT_CACHE_MAXSIZE = 64 * 1024 * 1024; static size_t PERSISTENT_CACHE_SLOTSIZE = 16 * 1024 * 1024; static size_t PERSISTENT_CACHE_AVGSIZE = 512; static size_t FILEID_CACHE_MAXSIZE = 64 * 1024 * 1024; /* The FlobManager is realized to be a singleton. So, not the contructor but this getInstance function is to call to get the only one FlobManager instance. */ #define FM_useStats #ifdef FM_useStats static size_t createdFlobs; static size_t reusedFlobs; #endif void FlobManager::printStatistics(std::ostream &out) const { #ifndef FM_useStats out << "no statistics available" << std::endl; #else out << "created Flobs" << createdFlobs << std::endl; out << "reusedFlobs " << reusedFlobs << std::endl; #endif } void FlobManager::resetStatistics() const { #ifdef FM_useStats createdFlobs = 0; reusedFlobs = 0; #endif } FlobManager &FlobManager::getInstance() { __TRACE_ENTER__ if (!instance) { instance = new FlobManager(); instance->resetStatistics(); } __TRACE_LEAVE__ return *instance; } bool FlobManager::destroyInstance() { __TRACE_ENTER__ if (!instance) { __TRACE_LEAVE__ return false; } else { delete instance; instance = 0; __TRACE_LEAVE__ return true; } } /* ~Destructor~ The destructor should never be called from outside. Instead the destroyInstance function should be called. By calling the destructor, all open files are closed and the nativFlobFile is deleted. */ FlobManager::~FlobManager() { __TRACE_ENTER__ if (nativeFlobCache) { delete nativeFlobCache; nativeFlobCache = 0; } if (!nativeFlobFile->Close()) { std::cerr << "Problem in closing nativeFlobFile" << std::endl; } if (!nativeFlobFile->Remove()) { std::cerr << "Problem in deleting nativeFlobFile" << std::endl; } delete nativeFlobFile; nativeFlobFile = 0; // close all files stored in Map std::map, SmiRecordFile*>::iterator iter; for (iter = openFiles.begin(); iter != openFiles.end(); iter++) { if (iter->first.first != nativeFlobs && iter->first.second) { iter->second->Close(); delete iter->second; } } nativeFlobs = 0; openFiles.clear(); // kill persistent Flob cache if (persistentFlobCache) { delete persistentFlobCache; persistentFlobCache = 0; } DestroyedFlobs->makeEmpty(); delete DestroyedFlobs; DestroyedFlobs = 0; if (externalFileCache) { delete externalFileCache; externalFileCache = 0; } __TRACE_LEAVE__ } SmiRecordFile* FlobManager::getFile(const SmiFileId &fileId, const char mode) { __TRACE_ENTER__ assert((mode == 0) || (mode == 1) || (mode == 2)); bool isTemp = (mode != 0); if (fileId == nativeFlobs && isTemp) { __TRACE_LEAVE__ return nativeFlobFile; } SmiRecordFile* file(0); #ifdef THREAD_SAFE boost::lock_guard guard(omtx); #endif std::pair finder(fileId, isTemp); std::map, SmiRecordFile*>::iterator it = openFiles.find(finder); if (it == openFiles.end()) { // file not found file = new SmiRecordFile(false, 0, isTemp); openFiles[finder] = file; bool openOk = file->Open(fileId); assert(openOk); } else { file = it->second; } __TRACE_LEAVE__ return file; } /* ~dropfile~ If Flob data is stored into a file, the flobmanager will create a new File from the file id and keep the open file. If the file should be deleted or manipulated by other code, the flobmanger has to give up the control over that file. This can be realized by calling the ~dropFile~ function. */ bool FlobManager::dropFile(const SmiFileId &id, const char mode) { __TRACE_ENTER__ if ((mode == 2) || (mode == 3)) { __TRACE_LEAVE__ return false; // never drop non berkeley db files } bool isTemp = mode == 1; if (isTemp && id == nativeFlobs) { //never give up the control of native flobs __TRACE_LEAVE__ return false; } #ifdef THREAD_SAFE boost::lock_guard guard(omtx); #endif std::pair finder(id, isTemp); std::map, SmiRecordFile*>::iterator it = openFiles.find(finder); if (it != openFiles.end()) { SmiRecordFile* file; file = it->second; file->Close(); delete file; openFiles.erase(it); __TRACE_LEAVE__ __TRACE_LEAVE__ return true; } else { return false; // file not handled by fm __TRACE_LEAVE__ } __TRACE_LEAVE__ } bool FlobManager::dropFiles() { __TRACE_ENTER__ #ifdef THREAD_SAFE boost::lock_guard guard(omtx); #endif std::map, SmiRecordFile*>::iterator it = openFiles.begin(); int count = 0; while (it != openFiles.end()) { if (it->first.first != nativeFlobs || !it->first.second) { it->second->Close(); delete it->second; count++; } it++; } openFiles.clear(); __TRACE_LEAVE__ return count > 0; } void FlobManager::clearCaches() { __TRACE_ENTER__ #ifdef THREAD_SAFE {boost::lock_guard guard(ncmtx); #endif if (nativeFlobCache) { nativeFlobCache->clear(); } #ifdef THREAD_SAFE } {boost::lock_guard guard(pcmtx); #endif if (persistentFlobCache) { persistentFlobCache->clear(); } #ifdef THREAD_SAFE } #endif __TRACE_LEAVE__ } bool FlobManager::makeControllable(Flob &flob) { __TRACE_ENTER__ if (flob.dataPointer) { assert(flob.id.isDestroyed()); // found an evil flob, create a nice one from it char* dp = flob.dataPointer; flob.dataPointer = 0; if (!create(flob.size, flob)) { assert(false); __TRACE_LEAVE__ return false; } if (!putData(flob, dp, 0, flob.size, false)) { assert(false); __TRACE_LEAVE__ return false; } free(dp); } __TRACE_LEAVE__ return true; } void FlobManager::createFromBlock(Flob &result, const char* buffer, const SmiSize &size, const bool autodestroy) { __TRACE_ENTER__ if (autodestroy) { result.id.destroy(); } assert(result.id.isDestroyed()); assert(result.dataPointer == 0); result.dataPointer = (char*) malloc(size); result.size = size; memcpy(result.dataPointer, buffer, size); __TRACE_LEAVE__ } /* ~resize~ Changes the size of flob. */ bool FlobManager::resize(Flob &flob, const SmiSize &newSize, const bool ignoreCache/*=false*/) { __TRACE_ENTER__ if (flob.dataPointer) { makeControllable(flob); } if (newSize == flob.size) { __TRACE_LEAVE__ return true; } assert(!flob.id.isDestroyed()); FlobId id = flob.id; assert((id.mode == 0) || (id.mode == 1)); // don't allow resizing of // non berkeley db flobs SmiFileId fileId = id.fileId; SmiRecordId recordId = id.recordId; SmiSize offset = id.offset; bool isTemp = id.mode == 1; if (!isTemp || (fileId != nativeFlobs)) { // the allocated memory for the slot may be too small now std::cerr << "Warning: Resize a persistent Flob" << std::endl; #ifdef THREAD_SAFE boost::lock_guard guard(pcmtx); #endif persistentFlobCache->killLastSlot(flob); } if (isTemp && (fileId == nativeFlobs) && !ignoreCache) { #ifdef THREAD_SAFE boost::lock_guard guard(ncmtx); #endif bool res = nativeFlobCache->resize(flob, newSize); __TRACE_LEAVE__ return res; } // resize the record containing the flob SmiRecord record; SmiRecordFile* file = getFile(fileId, id.mode); bool ok; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif ok = file->SelectRecord(recordId, record, SmiFile::Update); #ifdef THREAD_SAFE } #endif if (!ok) { std::cerr << __PRETTY_FUNCTION__ << "@" << __LINE__ << "Select Record failed:" << flob << std::endl; assert(false); __TRACE_LEAVE__ return false; } #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif if (record.Size() != newSize) { if (record.Resize(offset + newSize)) { record.Finish(); flob.size = newSize; __TRACE_LEAVE__ return true; } } else { __TRACE_LEAVE__ return true; } #ifdef THREAD_SAFE } #endif std::cerr << "Resize failed" << std::endl; __TRACE_LEAVE__ return false; } /* ~getData~ The getData function retrieves Data from a Flob. The __dest__ buffer must be provided by the caller. The requested content is copyied to that buffer. */ bool FlobManager::getData( const Flob &flob, // Flob containing the data char* dest, // destination buffer const SmiSize &offset, // offset within the Flob const SmiSize &size, const bool ignoreCache /* = false*/ ) { // requested data size __TRACE_ENTER__ assert(offset + size <= flob.size); if (size == 0) { __TRACE_LEAVE__ return true; } if (flob.dataPointer) { memcpy(dest, flob.dataPointer + offset, size); __TRACE_LEAVE__ return true; } assert(!flob.id.isDestroyed()); FlobId id = flob.id; assert((id.mode == 0) || (id.mode == 1) || (id.mode == 2)); if (id.mode < 2) { SmiFileId fileId = id.fileId; bool isTemp = id.mode == 1; if (!ignoreCache) { if (fileId != nativeFlobs || !isTemp) { #ifdef THREAD_SAFE boost::lock_guard guard(pcmtx); #endif bool res = persistentFlobCache->getData(flob, dest, offset, size); __TRACE_LEAVE__ return res; } else { #ifdef THREAD_SAFE boost::lock_guard guard(ncmtx); #endif bool res = nativeFlobCache->getData(flob, dest, offset, size); __TRACE_LEAVE__ return res; } } // retrieve data from disk SmiRecordId recordId = id.recordId; SmiSize floboffset = id.offset; SmiRecord record; SmiRecordFile* file = getFile(fileId, id.mode); SmiSize recOffset = floboffset + offset; SmiSize actRead; bool ok; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif ok = file->Read(recordId, dest, size, recOffset, actRead); #ifdef THREAD_SAFE } #endif if (!ok) { std::cerr << " error in getting data from flob " << flob << std::endl; std::cerr << " actSize = " << actRead << std::endl; std::cerr << "try to read = " << size << std::endl; std::string err; SmiEnvironment::GetLastErrorCode(err); std::cerr << " err " << err << std::endl; } assert(ok); if (actRead != size && file == nativeFlobFile) { __TRACE_LEAVE__ return true; } //assert(actRead == size); } else if (id.mode == 2) { // retrieve data from external disk file SmiRecordId recordId = id.recordId; SmiSize flobOffset = id.offset; SmiSize recOffset = flobOffset + offset; SmiSize actRead; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif std::ifstream* tupleFile = externalFileCache->getFile(recordId); /* Each time read part data within the Flob, decided by the recOffset and the size */ tupleFile->seekg(recOffset, std::ios::beg); tupleFile->read(dest, size); SmiSize curr = tupleFile->tellg(); actRead = curr - recOffset; #ifdef THREAD_SAFE } #endif if (actRead == size) { __TRACE_LEAVE__ return true; } else { std::cerr << " error in getting data from flob " << flob << std::endl; std::cerr << " mode = 2" << std::endl; std::cerr << " flobOffset = " << flobOffset << std::endl; std::cerr << " actSize = " << actRead << std::endl; std::cerr << " try to read = " << size << std::endl; } } else if (id.mode == 3) { // todo: something about the PSLocation } __TRACE_LEAVE__ return true; } /* ~destroy~ Frees all resources occupied by the Flob. After destroying a flob. No data can be accessed. */ bool FlobManager::destroy(Flob &victim) { __TRACE_ENTER__ if (victim.dataPointer) { assert(victim.id.isDestroyed()); // should not have a valid id free(victim.dataPointer); victim.dataPointer = 0; __TRACE_LEAVE__ return true; } FlobId id = victim.id; if ((id.mode == 2) || (id.mode == 3)) { std::cerr << "FlobManager::destroy(Flob& victim)" << std::endl; // do not destroy flob data within non berkeley db files. victim.id.destroy(); __TRACE_LEAVE__ return true; } assert(!victim.id.isDestroyed()); bool isTemp = id.mode == 1; if (victim.id.fileId == nativeFlobs && isTemp) { /* std::cout << "Destroy native Flob " << victim.id << std::endl; char* buffer = new char[2048]; WinUnix::stacktrace("SecondoBDB", buffer); std::cout << "Stacktrace : " << buffer << std::endl << std::endl; delete[] buffer; */ #ifdef THREAD_SAFE {boost::lock_guard guard(ncmtx); #endif nativeFlobCache->erase(victim); // delete from cache #ifdef THREAD_SAFE } {boost::lock_guard guard(omtx); #endif DestroyedFlobs->push(victim); #ifdef THREAD_SAFE } #endif // if(DestroyedFlobs->getSize() > 64000){ // cerr << "Stack of destroyed flobs reaches a critical size " << endl; // } victim.id.destroy(); __TRACE_LEAVE__ return true; } SmiSize size = victim.getSize(); SmiRecordId recordId = id.recordId; SmiSize offset = id.offset; // possible kill flob from persistent flob cache // or wait until cache is removed automatically = current state SmiRecordFile* file = getFile(id.fileId, id.mode); bool ok; SmiRecord record; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif ok = file->SelectRecord(recordId, record, SmiFile::Update); #ifdef THREAD_SAFE } #endif if (!ok) { // record not found in file std::cerr << __PRETTY_FUNCTION__ << "@" << __LINE__ << "Select Record failed:" << victim << std::endl; assert(false); __TRACE_LEAVE__ victim.id.destroy(); return false; } if (id.fileId == nativeFlobs && isTemp) { // each native flob is exlusive owner of an record #ifdef THREAD_SAFE {boost::lock_guard guard1(ncmtx); boost::lock_guard guard2(omtx); #endif nativeFlobCache->erase(victim); file->DeleteRecord(recordId); #ifdef THREAD_SAFE } #endif victim.id.destroy(); __TRACE_LEAVE__ return true; } // check whether the flob occupies the whole record SmiSize recordSize = record.Size(); if (offset != 0) { // flob doesn't start at the begin of the record if (offset + size != recordSize) { std::cout << "cannot destroy flob, because after the flob data are" " available" << std::endl; victim.id.destroy(); __TRACE_LEAVE__ return false; } else { // truncate record #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif record.Truncate(offset); record.Finish(); #ifdef THREAD_SAFE } #endif } } else { if ((recordSize != size) && (id.fileId != nativeFlobs)) { std::cout << "cannot destroy flob, because after the flob data are" " available" << std::endl; victim.id.destroy(); __TRACE_LEAVE__ return false; } else { // record stores only the flob #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif file->DeleteRecord(recordId); #ifdef THREAD_SAFE } #endif } } victim.id.destroy(); __TRACE_LEAVE__ return true; } bool FlobManager::destroyIfNonPersistent(Flob &victim) { __TRACE_ENTER__ if (victim.id.fileId == nativeFlobs && victim.id.mode == 1) { __TRACE_LEAVE__ return destroy(victim); } __TRACE_LEAVE__ return true; } /* ~saveTo~ Save the content of a flob into a file at a specific position (given as recordId and offset). This will result in another Flob which is returned as the result of this function. A record with the corresponding id must already exist in the file. Initial implementation, should be changed to support Flobs larger than the available main memory. */ bool FlobManager::saveTo(const Flob &src, // Flob to save const SmiFileId fileId, // target file id const SmiRecordId recordId, // target record id const SmiSize offset, const char mode, Flob &result) { // offset within the record __TRACE_ENTER__ assert(fileId != nativeFlobs); assert((mode == 0) || (mode == 1)); // no non-berkeley files SmiRecordFile* file = getFile(fileId, mode); __TRACE_LEAVE__ return saveTo(src, file, recordId, offset, result); } /* ~saveTo~ Save the content of a flob into a file at a specific position (given as recordId and offset). This will result in another Flob which is returned as the result of this function. A record with the corresponding id must already exist in the file. Initial implementation, should be changed to support Flobs larger than the available main memory. */ bool FlobManager::saveTo(const Flob &src, // Flob to save SmiRecordFile* file, // target file const SmiRecordId &recordId, // target record id const SmiSize &offset, Flob &result) { // offset within the record __TRACE_ENTER__ //if(src.size==0){ // __TRACE_LEAVE__ // return false; //} assert(file->GetFileId() != nativeFlobs || !file->IsTemp()); char* buffer = new char[src.size]; getData(src, buffer, 0, src.size); SmiSize written; bool ok; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif ok = file->Write(recordId, buffer, src.size, offset, written); #ifdef THREAD_SAFE } #endif assert(ok); delete[] buffer; char mode = file->IsTemp() ? 1 : 0; FlobId id(file->GetFileId(), recordId, offset, mode); result.id = id; result.size = src.size; if (result.dataPointer) { free(result.dataPointer); result.dataPointer = 0; } __TRACE_LEAVE__ return true; } /* ~saveTo~ Saves a Flob into a specific file. The flob is saved into a new created record in the file at offset 0. By saving the old Flob, a new Flob is created which is the result of this function. Must be changed to support real large Flobs */ bool FlobManager::saveTo( const Flob &src, // flob to save const SmiFileId fileId, // target file const char mode, // environment Flob &result) { // result __TRACE_ENTER__ assert(fileId != nativeFlobs); assert((mode == 0) || (mode == 1)); SmiRecordFile* file = getFile(fileId, mode); __TRACE_LEAVE__ return saveTo(src, file, result); } bool FlobManager::saveTo( const Flob &src, // flob to save SmiRecordFile* file, // target file Flob &result) { // result __TRACE_ENTER__ assert(file->GetFileId() != nativeFlobs || !file->IsTemp()); SmiRecordId recId; SmiRecord rec; char mode; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif if (!file->AppendRecord(recId, rec)) { __TRACE_LEAVE__ return false; } mode = file->IsTemp() ? 1 : 0; if (src.size == 0) { // empty Flob rec.Finish(); FlobId fid(file->GetFileId(), recId, 0, mode); result.id = fid; result.size = src.size; __TRACE_LEAVE__ return true; } #ifdef THREAD_SAFE } #endif // write data char* buffer = new char[src.size + 1]; buffer[src.size] = '\0'; getData(src, buffer, 0, src.size); #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif rec.Write(buffer, src.size, 0); delete[] buffer; rec.Finish(); #ifdef THREAD_SAFE } #endif FlobId fid(file->GetFileId(), recId, 0, mode); result.id = fid; result.size = src.size; if (result.dataPointer) { free(result.dataPointer); result.dataPointer = 0; } __TRACE_LEAVE__ return true; } /* ~putData~ Puts data into a flob. */ bool FlobManager::putData(Flob &dest, // destination flob const char* buffer, // source buffer const SmiSize &targetoffset, // offset within the Flob const SmiSize &length, const bool ignoreCache) { // data size __TRACE_ENTER__ if (dest.dataPointer) { makeControllable(dest); } assert(!dest.id.isDestroyed()); FlobId id = dest.id; SmiFileId fileId = id.fileId; SmiRecordId recordId = id.recordId; SmiSize offset = id.offset; assert(targetoffset + length <= dest.size); // do not allow putting data into non berkeley db flobs assert((id.mode == 0) || (id.mode == 1)); bool isTemp = id.mode == 1; if (!ignoreCache) { if (fileId != nativeFlobs || !isTemp) { std::cerr << "Warning: Manipulate a persistent Flob" << std::endl; #ifdef THREAD_SAFE boost::lock_guard guard(pcmtx); #endif bool res = persistentFlobCache->putData(dest, buffer, targetoffset, length); __TRACE_LEAVE__ return res; } else { #ifdef THREAD_SAFE boost::lock_guard guard(ncmtx); #endif bool res = nativeFlobCache->putData(dest, buffer, targetoffset,length); __TRACE_LEAVE__ return res; } } // put data to disk SmiRecordFile* file = getFile(fileId, id.mode); SmiSize written; bool ok; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif ok = file->Write(recordId, buffer, length, offset + targetoffset, written); #ifdef THREAD_SAFE } #endif assert(ok); __TRACE_LEAVE__ return true; } void FlobManager::changeMode(Flob* flob, const char mode) { flob->id.mode = mode; } bool FlobManager::putData(const FlobId &id, // destination flob const char* buffer, // source buffer const SmiSize &targetoffset, // offset within the Flob const SmiSize &length ) { // data size __TRACE_ENTER__ assert(!id.isDestroyed()); SmiFileId fileId = id.fileId; SmiRecordId recordId = id.recordId; SmiSize offset = id.offset; // avoid putting data into non-berkeley db flobs assert((id.mode == 0) || (id.mode == 1) || (id.mode == 2)); bool ok; SmiSize written; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif SmiRecordFile* file = getFile(fileId, id.mode); ok = file->Write(recordId, buffer, length, offset + targetoffset, written); #ifdef THREAD_SAFE } #endif assert(ok); __TRACE_LEAVE__ return true; } bool FlobManager::setExFile(Flob &flob, const std::string &flobFile, const SmiSize size, const SmiSize flobOffset) { __TRACE_ENTER__ flob.id.destroy(); assert(flob.id.isDestroyed()); if (!create(size, flob)) { assert(false); __TRACE_LEAVE__ return false; } #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif SmiRecordId recordId = flob.id.recordId; externalFileCache->cacheRecord(recordId, flobFile); #ifdef THREAD_SAFE } #endif flob.id.mode = 2; flob.id.offset = flobOffset; __TRACE_LEAVE__ return true; } bool FlobManager::SwitchToMode1(Flob &flob, const std::string &flobFile, const SmiSize size, const SmiSize flobOffset) { __TRACE_ENTER__ if (flob.id.mode < 2) { __TRACE_LEAVE__ return true; } assert(!flobFile.empty()); assert(flob.id.mode == 2 || flob.id.mode == 3); Flob newFlob(size); //Read data from the disk file std::ifstream* tupleFile = 0; char flobBlock[size]; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif if (flob.id.mode == 3) { /* In Mode 3, the record id is used to identify the remote DS, it is impossible to cache the file input stream based on its value. Therefore, we use the record id from the newly created flob structure. */ SmiRecordId recordId = newFlob.id.recordId; externalFileCache->cacheRecord(recordId, flobFile, true); tupleFile = externalFileCache->getFile(recordId); } else { // mode 2 SmiRecordId recordId = flob.id.recordId; tupleFile = externalFileCache->getFile(recordId); } tupleFile->seekg(flobOffset, std::ios::beg); tupleFile->read(flobBlock, size); #ifdef THREAD_SAFE } #endif if ((SmiSize) tupleFile->gcount() != size) { std::cerr << "Error!! read " << tupleFile->gcount() << " from " << flobFile << " at " << flobOffset << ", need " << size << std::endl; assert(false); } //Cache data into nativeFlobCache newFlob.write(flobBlock, size); flob = newFlob; __TRACE_LEAVE__ return true; } /* ~create~ Creates a new Flob with a given size which is assigned to a temporal file. Warning: this function does not change the dataPointer. */ bool FlobManager::create(const SmiSize size, Flob &result) { // result flob #ifdef FM_useStats createdFlobs++; #endif __TRACE_ENTER__ SmiRecord rec; SmiRecordId recId; if (size > 536870912) { // 512 MB std::cerr << "Warning: Try to cretae a very big flob , size = " << size << std::endl; //assert(false); } #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif if (!DestroyedFlobs->isEmpty()) { #ifdef FM_useStats reusedFlobs++; #endif result = DestroyedFlobs->pop(); result.size = size; if (!nativeFlobCache->create(result)) { if (size > 0) { resize(result, size, true); } } __TRACE_LEAVE__ return true; } // create a record from the flob to get an id if (!(nativeFlobFile->AppendRecord(recId, rec))) { __TRACE_LEAVE__ return false; } FlobId fid(nativeFlobs, recId, 0, 1); result.id = fid; result.size = size; #ifdef THREAD_SAFE } #endif #ifdef THREAD_SAFE {boost::lock_guard guard(ncmtx); #endif if (!nativeFlobCache->create(result)) { if (size > 0) { resize(result, size, true); } } #ifdef THREAD_SAFE } #endif __TRACE_LEAVE__ return true; } /* ~create~ Creates a new flob with given file and position. File and record must exists. */ bool FlobManager::create(const SmiFileId &fileId, // target file const SmiRecordId &recordId, // target record id const SmiSize &offset, // offset within the record const char mode, const SmiSize &size, Flob &result) { // initial size of the Flob __TRACE_ENTER__ assert((mode == 0) || (mode == 1)); #ifdef FM_useStats createdFlobs++; #endif bool isTemp = mode == 1; assert(fileId != nativeFlobs || !isTemp); SmiRecord record; #ifdef THREAD_SAFE {boost::lock_guard guard(omtx); #endif SmiRecordFile* file = getFile(fileId, mode); if (!file->SelectRecord(recordId, record)) { __TRACE_LEAVE__ return false; } #ifdef THREAD_SAFE } #endif FlobId fid(fileId, recordId, offset, mode); result.id = fid; result.size = size; if (result.dataPointer) { free(result.dataPointer); result.dataPointer = 0; } __TRACE_LEAVE__ return true; } bool FlobManager::copyData(const Flob &src, Flob &dest) { __TRACE_ENTER__ char* buffer = new char[src.size]; if (!getData(src, buffer, 0, src.size)) { delete[] buffer; __TRACE_LEAVE__ return false; } if (dest.size != src.size) { resize(dest, src.size); } if (!putData(dest, buffer, 0, src.size)) { delete[] buffer; __TRACE_LEAVE__ return false; } delete[] buffer; __TRACE_LEAVE__ return true; } /* ~createFrom~ return a Flob with persistent storage allocated and defined elsewhere */ Flob FlobManager::createFrom(const SmiFileId &fid, const SmiRecordId &rid, const SmiSize &offset, const char mode, const SmiSize &size) { //assert(fid!=nativeFlobs); __TRACE_ENTER__ Flob flob; FlobId flob_id(fid, rid, offset, mode); flob.id = flob_id; flob.size = size; flob.dataPointer = 0; __TRACE_LEAVE__ return flob; }; void FlobManager::killNativeFlobs() { __TRACE_ENTER__ bool ok; #ifdef THREAD_SAFE {boost::lock_guard guard(ncmtx); #endif if (nativeFlobCache) { nativeFlobCache->clear(); } ok = nativeFlobFile->ReCreate(); #ifdef THREAD_SAFE } {boost::lock_guard guard(omtx); #endif DestroyedFlobs->makeEmpty(); #ifdef THREAD_SAFE } #endif assert(ok); __TRACE_LEAVE__ } /* ~constructor~ Because Flobmanager is a singleton, the constructor should only be used by the FlobManager class itself. */ FlobManager::FlobManager() : openFiles() { __TRACE_ENTER__ assert(instance == 0); // the constructor should only called one time // construct the temporarly FlobFile // not fixed size, dummy, temporarly #ifdef THREAD_SAFE {boost::lock_guard guard(ncmtx); #endif nativeFlobFile = new SmiRecordFile(false, 0, true); bool created = nativeFlobFile->Create("NativeFlobFile", "Default"); // bool created = nativeFlobFile->Create(); assert(created); nativeFlobs = nativeFlobFile->GetFileId(); nativeFlobCache = new NativeFlobCache(NATIVE_CACHE_MAXSIZE, NATIVE_CACHE_SLOTSIZE, NATIVE_CACHE_AVGSIZE); #ifdef THREAD_SAFE } {boost::lock_guard guard(pcmtx); #endif persistentFlobCache = new PersistentFlobCache(PERSISTENT_CACHE_MAXSIZE, PERSISTENT_CACHE_SLOTSIZE, PERSISTENT_CACHE_AVGSIZE); #ifdef THREAD_SAFE } {boost::lock_guard guard(omtx); #endif DestroyedFlobs = new Stack(); externalFileCache = new ExternalFileCache(FILEID_CACHE_MAXSIZE); #ifdef THREAD_SAFE } #endif __TRACE_LEAVE__ } void FlobManager::SetNativeCache(const size_t maxSize, const size_t slotSize, const size_t avgSize) { __TRACE_ENTER__ assert(FlobManager::instance == 0); NATIVE_CACHE_MAXSIZE = maxSize; NATIVE_CACHE_SLOTSIZE = slotSize; NATIVE_CACHE_AVGSIZE = avgSize; __TRACE_LEAVE__ } void FlobManager::SetPersistentCache(const size_t maxSize, const size_t slotSize, const size_t avgSize) { __TRACE_ENTER__ assert(FlobManager::instance == 0); PERSISTENT_CACHE_MAXSIZE = maxSize; PERSISTENT_CACHE_SLOTSIZE = slotSize; PERSISTENT_CACHE_AVGSIZE = avgSize; __TRACE_LEAVE__ } std::ostream &operator<<(std::ostream &os, const FlobId &fid) { return fid.print(os); } std::ostream &operator<<(std::ostream &os, const Flob &f) { return f.print(os); } std::ostream &operator<<(std::ostream &o, const NativeCacheEntry entry) { return entry.print(o); }