Files
secondo/Tools/Flob/FlobManager.cpp

1431 lines
33 KiB
C++
Raw Permalink Normal View History

2026-01-23 17:03:45 +08:00
/*
1 Class FlobManager
The only class using the FlobManager class is Flob. For this reason,
all functions are declared as private and Flob is declared to be a friend
of that class.
*/
#include "SecondoSMI.h"
#include <stdlib.h>
#include <utility>
#include "Flob.h"
#include "assert.h"
#include <iostream>
#include "NativeFlobCache.h"
#include "PersistentFlobCache.h"
#include "Stack.h"
#include <fstream>
#include "ExternalFileCache.h"
#include "WinUnix.h"
#ifdef THREAD_SAFE
#include <boost/thread.hpp>
#endif
// Tracing hooks: both macros are (re)defined to expand to nothing here, so the
// __TRACE_ENTER__ / __TRACE_LEAVE__ markers used in the functions below
// compile away.
#undef __TRACE_ENTER__
#undef __TRACE_LEAVE__
#define __TRACE_ENTER__
#define __TRACE_LEAVE__
/*
#define __TRACE_ENTER__ std::cerr << "Enter : " << \
__PRETTY_FUNCTION__ << std::endl;
#define __TRACE_LEAVE__ std::cerr << "Leave : " << \
__PRETTY_FUNCTION__ << "@" << __LINE__ << std::endl;
*/
// The single FlobManager object; created on demand by getInstance() and
// reset to 0 by destroyInstance().
FlobManager* FlobManager::instance = 0;
// File-local helper objects. All four are allocated in the FlobManager
// constructor and released again in its destructor.
static NativeFlobCache* nativeFlobCache = 0;
static PersistentFlobCache* persistentFlobCache = 0;
// Native flobs passed to destroy() are pushed here; create(size, result)
// pops an entry from this stack before it appends a new record.
static Stack<Flob>* DestroyedFlobs = 0;
static ExternalFileCache* externalFileCache = 0;
// some value for cache sizes
// These are handed to the cache constructors in the FlobManager constructor.
// The native and persistent values can be replaced through SetNativeCache /
// SetPersistentCache as long as no instance exists.
// NOTE(review): the values look like byte counts -- confirm against the caches.
static size_t NATIVE_CACHE_MAXSIZE = 64 * 1024 * 1024;
static size_t NATIVE_CACHE_SLOTSIZE = 16 * 1024 * 1024;
static size_t NATIVE_CACHE_AVGSIZE = 512;
static size_t PERSISTENT_CACHE_MAXSIZE = 64 * 1024 * 1024;
static size_t PERSISTENT_CACHE_SLOTSIZE = 16 * 1024 * 1024;
static size_t PERSISTENT_CACHE_AVGSIZE = 512;
static size_t FILEID_CACHE_MAXSIZE = 64 * 1024 * 1024;
/*
The FlobManager is realized as a singleton. So, instead of the constructor,
the getInstance function has to be called to obtain the only
FlobManager instance.
*/
// Compile-time switch for the statistics counters: while FM_useStats is
// defined, the counters below exist and printStatistics() reports them.
#define FM_useStats
#ifdef FM_useStats
// Incremented on every call of one of the create() functions.
static size_t createdFlobs;
// Incremented when create(size, result) takes a flob from DestroyedFlobs.
static size_t reusedFlobs;
#endif
/*
~printStatistics~
Writes the flob counters to ~out~. If the file is compiled without
FM_useStats, only a note that no statistics are available is written.
*/
void FlobManager::printStatistics(std::ostream &out) const {
#ifndef FM_useStats
  out << "no statistics available" << std::endl;
#else
  // a blank separates the label from the number (was printed as
  // "created Flobs<n>" before)
  out << "created Flobs " << createdFlobs << std::endl;
  out << "reusedFlobs " << reusedFlobs << std::endl;
#endif
}
/*
~resetStatistics~
Sets both statistic counters back to zero. Without FM_useStats this
function does nothing.
*/
void FlobManager::resetStatistics() const {
#ifdef FM_useStats
  reusedFlobs = 0;
  createdFlobs = 0;
#endif
}
/*
~getInstance~
Returns the only FlobManager. The object is created (and its statistics
are reset) when the function is called while no instance exists.
*/
FlobManager &FlobManager::getInstance() {
  __TRACE_ENTER__
  if (instance != 0) {
    __TRACE_LEAVE__
    return *instance;
  }
  // first use: build the manager, then start with clean counters
  instance = new FlobManager();
  instance->resetStatistics();
  __TRACE_LEAVE__
  return *instance;
}
bool FlobManager::destroyInstance() {
__TRACE_ENTER__
if (!instance) {
__TRACE_LEAVE__
return false;
} else {
delete instance;
instance = 0;
__TRACE_LEAVE__
return true;
}
}
/*
~Destructor~
The destructor should never be called from outside. Instead, the
destroyInstance function should be called. When the destructor runs, all
open files are closed and the nativeFlobFile is deleted.
*/
// Releases the caches, closes and removes the native flob file and closes
// every file that is still kept in openFiles.
FlobManager::~FlobManager() {
  __TRACE_ENTER__
  if (nativeFlobCache) {
    delete nativeFlobCache;
    nativeFlobCache = 0;
  }
  if (!nativeFlobFile->Close()) {
    std::cerr << "Problem in closing nativeFlobFile" << std::endl;
  }
  if (!nativeFlobFile->Remove()) {
    std::cerr << "Problem in deleting nativeFlobFile" << std::endl;
  }
  delete nativeFlobFile;
  nativeFlobFile = 0;
  // close all files stored in the map
  // The former test (id != nativeFlobs && isTemp) skipped every
  // non-temporary file, so those SmiRecordFile objects were neither closed
  // nor deleted. Use the same ownership test as dropFiles(): everything
  // except an entry for the temporary native flob file belongs to the map.
  typedef std::map<std::pair<SmiFileId, bool>, SmiRecordFile*>::iterator
      FileIter;
  for (FileIter iter = openFiles.begin(); iter != openFiles.end(); ++iter) {
    if (iter->first.first != nativeFlobs || !iter->first.second) {
      iter->second->Close();
      delete iter->second;
    }
  }
  nativeFlobs = 0;
  openFiles.clear();
  // kill persistent Flob cache
  if (persistentFlobCache) {
    delete persistentFlobCache;
    persistentFlobCache = 0;
  }
  if (DestroyedFlobs) {
    DestroyedFlobs->makeEmpty();
    delete DestroyedFlobs;
    DestroyedFlobs = 0;
  }
  if (externalFileCache) {
    delete externalFileCache;
    externalFileCache = 0;
  }
  __TRACE_LEAVE__
}
/*
~getFile~
Returns the file object for ~fileId~. Every mode other than 0 is treated
as temporary. The native flob file is returned directly; all other files
are looked up in openFiles and are opened and registered there on first use.
*/
SmiRecordFile* FlobManager::getFile(const SmiFileId &fileId, const char mode) {
  __TRACE_ENTER__
  assert((mode == 0) || (mode == 1) || (mode == 2));
  const bool temporary = (mode != 0);
  if (temporary && fileId == nativeFlobs) {
    __TRACE_LEAVE__
    return nativeFlobFile;
  }
#ifdef THREAD_SAFE
  boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
  typedef std::map<std::pair<SmiFileId, bool>, SmiRecordFile*>::iterator
      FileIter;
  std::pair<SmiFileId, bool> key(fileId, temporary);
  FileIter pos = openFiles.find(key);
  if (pos != openFiles.end()) {  // already open
    __TRACE_LEAVE__
    return pos->second;
  }
  // unknown file: create the object, register it, then open it
  SmiRecordFile* opened = new SmiRecordFile(false, 0, temporary);
  openFiles[key] = opened;
  bool openOk = opened->Open(fileId);
  assert(openOk);
  __TRACE_LEAVE__
  return opened;
}
/*
~dropFile~
If Flob data is stored in a file, the FlobManager will
create a new file object from the file id and keep the file open.
If the file should be deleted or manipulated by other code,
the FlobManager has to give up control over that file.
This can be achieved by calling the ~dropFile~ function.
*/
// Closes and forgets the file registered for (id, mode). Returns true if
// such a file was managed here, false otherwise. Modes 2 and 3 and the
// native flob file are never dropped.
bool FlobManager::dropFile(const SmiFileId &id, const char mode) {
  __TRACE_ENTER__
  if ((mode == 2) || (mode == 3)) {
    __TRACE_LEAVE__
    return false; // never drop non berkeley db files
  }
  bool isTemp = mode == 1;
  if (isTemp && id == nativeFlobs) {
    // never give up the control of native flobs
    __TRACE_LEAVE__
    return false;
  }
#ifdef THREAD_SAFE
  boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
  std::pair<SmiFileId, bool> finder(id, isTemp);
  std::map<std::pair<SmiFileId, bool>, SmiRecordFile*>::iterator it =
      openFiles.find(finder);
  if (it == openFiles.end()) {
    // file not handled by fm
    __TRACE_LEAVE__
    return false;
  }
  SmiRecordFile* file = it->second;
  file->Close();
  delete file;
  openFiles.erase(it);
  // exactly one leave marker per exit path (it was emitted twice here and
  // was unreachable in the not-found branch)
  __TRACE_LEAVE__
  return true;
}
bool FlobManager::dropFiles() {
__TRACE_ENTER__
#ifdef THREAD_SAFE
boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
std::map<std::pair<SmiFileId, bool>, SmiRecordFile*>::iterator it =
openFiles.begin();
int count = 0;
while (it != openFiles.end()) {
if (it->first.first != nativeFlobs || !it->first.second) {
it->second->Close();
delete it->second;
count++;
}
it++;
}
openFiles.clear();
__TRACE_LEAVE__
return count > 0;
}
/*
~clearCaches~
Clears the native and the persistent flob cache, if they exist. In
THREAD\_SAFE builds each cache is cleared while holding its own mutex.
*/
void FlobManager::clearCaches() {
  __TRACE_ENTER__
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(ncmtx);
#endif
    if (nativeFlobCache != 0) {
      nativeFlobCache->clear();
    }
  }
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(pcmtx);
#endif
    if (persistentFlobCache != 0) {
      persistentFlobCache->clear();
    }
  }
  __TRACE_LEAVE__
}
/*
~makeControllable~
Converts a flob that only consists of a memory block (dataPointer set,
id destroyed) into a flob managed by the FlobManager: a new flob is
created, the block is copied into it and the block is released.
Flobs without a dataPointer are left unchanged. Returns false if the
conversion fails.
*/
bool FlobManager::makeControllable(Flob &flob) {
  __TRACE_ENTER__
  if (!flob.dataPointer) {
    __TRACE_LEAVE__
    return true;
  }
  assert(flob.id.isDestroyed());
  // found an evil flob, create a nice one from it
  char* dp = flob.dataPointer;
  flob.dataPointer = 0;
  if (!create(flob.size, flob)) {
    assert(false);
    // create() fails before it touches the flob, so give the block back
    // instead of leaking it and leaving the flob without any data
    flob.dataPointer = dp;
    __TRACE_LEAVE__
    return false;
  }
  if (!putData(flob, dp, 0, flob.size, false)) {
    assert(false);
    free(dp);  // the flob has an id now and must not own a block anymore
    __TRACE_LEAVE__
    return false;
  }
  free(dp);
  __TRACE_LEAVE__
  return true;
}
/*
~createFromBlock~
Turns ~result~ into a flob that holds a private copy of the first ~size~
bytes of ~buffer~ in a malloc'ed block. With ~autodestroy~ the id of
~result~ is destroyed first; otherwise it must already be destroyed.
~result~ must not own a memory block when this function is called.
*/
void FlobManager::createFromBlock(Flob &result, const char* buffer,
                                  const SmiSize &size,
                                  const bool autodestroy) {
  __TRACE_ENTER__
  if (autodestroy) {
    result.id.destroy();
  }
  assert(result.id.isDestroyed());
  assert(result.dataPointer == 0);
  char* block = static_cast<char*>(malloc(size));
  result.dataPointer = block;
  result.size = size;
  memcpy(block, buffer, size);
  __TRACE_LEAVE__
}
/*
~resize~
Changes the size of flob.
*/
// Sets the size of flob to newSize. Native flobs are resized through the
// native cache unless ignoreCache is set; otherwise the record holding the
// flob is resized to offset + newSize. Returns false if the record cannot
// be selected or resized.
bool FlobManager::resize(Flob &flob, const SmiSize &newSize,
                         const bool ignoreCache/*=false*/) {
  __TRACE_ENTER__
  if (flob.dataPointer) {
    makeControllable(flob);
  }
  if (newSize == flob.size) {
    __TRACE_LEAVE__
    return true;
  }
  assert(!flob.id.isDestroyed());
  FlobId id = flob.id;
  assert((id.mode == 0) || (id.mode == 1)); // don't allow resizing of
                                            // non berkeley db flobs
  SmiFileId fileId = id.fileId;
  SmiRecordId recordId = id.recordId;
  SmiSize offset = id.offset;
  bool isTemp = id.mode == 1;
  if (!isTemp || (fileId != nativeFlobs)) {
    // the allocated memory for the slot may be too small now
    std::cerr << "Warning: Resize a persistent Flob" << std::endl;
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(pcmtx);
#endif
    persistentFlobCache->killLastSlot(flob);
  }
  if (isTemp && (fileId == nativeFlobs) && !ignoreCache) {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(ncmtx);
#endif
    bool res = nativeFlobCache->resize(flob, newSize);
    __TRACE_LEAVE__
    return res;
  }
  // resize the record containing the flob
  SmiRecord record;
  SmiRecordFile* file = getFile(fileId, id.mode);
  bool ok;
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    ok = file->SelectRecord(recordId, record, SmiFile::Update);
  }
  if (!ok) {
    std::cerr << __PRETTY_FUNCTION__ << "@" << __LINE__
              << "Select Record failed:" << flob << std::endl;
    assert(false);
    __TRACE_LEAVE__
    return false;
  }
  bool resized;
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    // The flob starts at offset within the record, so the record has to
    // end at offset + newSize. The old code compared the record size with
    // newSize alone, which is only right for offset == 0.
    const SmiSize targetSize = offset + newSize;
    if (record.Size() == targetSize) {
      resized = true;  // record already has the required length
    } else {
      resized = record.Resize(targetSize);
      if (resized) {
        record.Finish();
      }
    }
  }
  if (!resized) {
    std::cerr << "Resize failed" << std::endl;
    __TRACE_LEAVE__
    return false;
  }
  // update the flob on every successful path; the "nothing to do" path
  // used to return with the stale size
  flob.size = newSize;
  __TRACE_LEAVE__
  return true;
}
/*
~getData~
The getData function retrieves data from a Flob.
The ~dest~ buffer must be provided by the caller. The requested
content is copied to that buffer.
*/
bool FlobManager::getData(
const Flob &flob, // Flob containing the data
char* dest, // destination buffer
const SmiSize &offset, // offset within the Flob
const SmiSize &size,
const bool ignoreCache /* = false*/ ) { // requested data size
__TRACE_ENTER__
assert(offset + size <= flob.size);
if (size == 0) {
__TRACE_LEAVE__
return true;
}
if (flob.dataPointer) {
memcpy(dest, flob.dataPointer + offset, size);
__TRACE_LEAVE__
return true;
}
assert(!flob.id.isDestroyed());
FlobId id = flob.id;
assert((id.mode == 0) || (id.mode == 1) || (id.mode == 2));
if (id.mode < 2) {
SmiFileId fileId = id.fileId;
bool isTemp = id.mode == 1;
if (!ignoreCache) {
if (fileId != nativeFlobs || !isTemp) {
#ifdef THREAD_SAFE
boost::lock_guard<boost::recursive_mutex> guard(pcmtx);
#endif
bool res = persistentFlobCache->getData(flob, dest, offset, size);
__TRACE_LEAVE__
return res;
} else {
#ifdef THREAD_SAFE
boost::lock_guard<boost::recursive_mutex> guard(ncmtx);
#endif
bool res = nativeFlobCache->getData(flob, dest, offset, size);
__TRACE_LEAVE__
return res;
}
}
// retrieve data from disk
SmiRecordId recordId = id.recordId;
SmiSize floboffset = id.offset;
SmiRecord record;
SmiRecordFile* file = getFile(fileId, id.mode);
SmiSize recOffset = floboffset + offset;
SmiSize actRead;
bool ok;
#ifdef THREAD_SAFE
{boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
ok = file->Read(recordId, dest, size, recOffset, actRead);
#ifdef THREAD_SAFE
}
#endif
if (!ok) {
std::cerr << " error in getting data from flob " << flob << std::endl;
std::cerr << " actSize = " << actRead << std::endl;
std::cerr << "try to read = " << size << std::endl;
std::string err;
SmiEnvironment::GetLastErrorCode(err);
std::cerr << " err " << err << std::endl;
}
assert(ok);
if (actRead != size && file == nativeFlobFile) {
__TRACE_LEAVE__
return true;
}
//assert(actRead == size);
} else if (id.mode == 2) {
// retrieve data from external disk file
SmiRecordId recordId = id.recordId;
SmiSize flobOffset = id.offset;
SmiSize recOffset = flobOffset + offset;
SmiSize actRead;
#ifdef THREAD_SAFE
{boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
std::ifstream* tupleFile = externalFileCache->getFile(recordId);
/*
Each time read part data within the Flob, decided by the recOffset and the size
*/
tupleFile->seekg(recOffset, std::ios::beg);
tupleFile->read(dest, size);
SmiSize curr = tupleFile->tellg();
actRead = curr - recOffset;
#ifdef THREAD_SAFE
}
#endif
if (actRead == size) {
__TRACE_LEAVE__
return true;
} else {
std::cerr << " error in getting data from flob " << flob << std::endl;
std::cerr << " mode = 2" << std::endl;
std::cerr << " flobOffset = " << flobOffset << std::endl;
std::cerr << " actSize = " << actRead << std::endl;
std::cerr << " try to read = " << size << std::endl;
}
} else if (id.mode == 3) {
// todo: something about the PSLocation
}
__TRACE_LEAVE__
return true;
}
/*
~destroy~
Frees all resources occupied by the Flob. After a flob has been destroyed,
its data can no longer be accessed.
*/
// Destroys victim and invalidates its id.
//  - memory-only flob: the block is freed
//  - mode 2 / 3: only the id is invalidated, the external data is kept
//  - native flob: removed from the native cache and pushed to
//    DestroyedFlobs, from where create() can reuse it
//  - other flobs: the record is deleted or truncated; this fails (false)
//    if other data follows the flob within its record
bool FlobManager::destroy(Flob &victim) {
  __TRACE_ENTER__
  if (victim.dataPointer) {
    assert(victim.id.isDestroyed()); // should not have a valid id
    free(victim.dataPointer);
    victim.dataPointer = 0;
    __TRACE_LEAVE__
    return true;
  }
  FlobId id = victim.id;
  if ((id.mode == 2) || (id.mode == 3)) {
    std::cerr << "FlobManager::destroy(Flob& victim)" << std::endl;
    // do not destroy flob data within non berkeley db files.
    victim.id.destroy();
    __TRACE_LEAVE__
    return true;
  }
  assert(!victim.id.isDestroyed());
  bool isTemp = id.mode == 1;
  if (victim.id.fileId == nativeFlobs && isTemp) {
/*
std::cout << "Destroy native Flob " << victim.id << std::endl;
char* buffer = new char[2048];
WinUnix::stacktrace("SecondoBDB", buffer);
std::cout << "Stacktrace : " << buffer << std::endl << std::endl;
delete[] buffer;
*/
    {
#ifdef THREAD_SAFE
      boost::lock_guard<boost::recursive_mutex> guard(ncmtx);
#endif
      nativeFlobCache->erase(victim); // delete from cache
    }
    {
#ifdef THREAD_SAFE
      boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
      DestroyedFlobs->push(victim);
    }
    // if(DestroyedFlobs->getSize() > 64000){
    //   cerr << "Stack of destroyed flobs reaches a critical size " << endl;
    // }
    victim.id.destroy();
    __TRACE_LEAVE__
    return true;
  }
  // From here on the flob is not a native flob. The second native-flob
  // branch that used to follow the record lookup could never be reached
  // and has been removed.
  SmiSize size = victim.getSize();
  SmiRecordId recordId = id.recordId;
  SmiSize offset = id.offset;
  // possible kill flob from persistent flob cache
  // or wait until cache is removed automatically = current state
  SmiRecordFile* file = getFile(id.fileId, id.mode);
  bool ok;
  SmiRecord record;
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    ok = file->SelectRecord(recordId, record, SmiFile::Update);
  }
  if (!ok) { // record not found in file
    std::cerr << __PRETTY_FUNCTION__ << "@" << __LINE__
              << "Select Record failed:" << victim << std::endl;
    assert(false);
    victim.id.destroy();
    __TRACE_LEAVE__
    return false;
  }
  // check whether the flob occupies the whole record
  SmiSize recordSize = record.Size();
  if (offset != 0) { // flob doesn't start at the begin of the record
    if (offset + size != recordSize) {
      std::cout << "cannot destroy flob, because after the flob data are"
                   " available" << std::endl;
      victim.id.destroy();
      __TRACE_LEAVE__
      return false;
    } else { // truncate record
#ifdef THREAD_SAFE
      boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
      record.Truncate(offset);
      record.Finish();
    }
  } else {
    if ((recordSize != size) && (id.fileId != nativeFlobs)) {
      std::cout << "cannot destroy flob, because after the flob data are"
                   " available" << std::endl;
      victim.id.destroy();
      __TRACE_LEAVE__
      return false;
    } else {
      // record stores only the flob
#ifdef THREAD_SAFE
      boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
      file->DeleteRecord(recordId);
    }
  }
  victim.id.destroy();
  __TRACE_LEAVE__
  return true;
}
/*
~destroyIfNonPersistent~
Destroys ~victim~ if it is a native flob (mode 1 within the native flob
file) and returns the result of ~destroy~. All other flobs are left
untouched and true is returned.
*/
bool FlobManager::destroyIfNonPersistent(Flob &victim) {
  __TRACE_ENTER__
  const bool isNative =
      (victim.id.fileId == nativeFlobs) && (victim.id.mode == 1);
  if (!isNative) {
    __TRACE_LEAVE__
    return true;
  }
  __TRACE_LEAVE__
  return destroy(victim);
}
/*
~saveTo~
Save the content of a flob into a file at a specific position (given
as recordId and offset). This will result in another Flob which is
returned as the result of this function. A record with the
corresponding id must already exist in the file.
Initial implementation, should be changed to support Flobs larger than the
available main memory.
*/
bool FlobManager::saveTo(const Flob &src, // Flob to save
const SmiFileId fileId, // target file id
const SmiRecordId recordId, // target record id
const SmiSize offset,
const char mode,
Flob &result) { // offset within the record
__TRACE_ENTER__
assert(fileId != nativeFlobs);
assert((mode == 0) || (mode == 1)); // no non-berkeley files
SmiRecordFile* file = getFile(fileId, mode);
__TRACE_LEAVE__
return saveTo(src, file, recordId, offset, result);
}
/*
~saveTo~
Save the content of a flob into a file at a specific position (given
as recordId and offset). This will result in another Flob which is
returned as the result of this function. A record with the corresponding
id must already exist in the file.
Initial implementation, should be changed to support Flobs larger than the
available main memory.
*/
// Writes the complete content of src into the given record of file,
// starting at offset, and lets result describe the written data. A memory
// block owned by result is released. Returns false, leaving result
// unchanged, if the source cannot be read or the record cannot be written.
bool FlobManager::saveTo(const Flob &src,              // Flob to save
                         SmiRecordFile* file,          // target file
                         const SmiRecordId &recordId,  // target record id
                         const SmiSize &offset,        // offset within the record
                         Flob &result) {               // the saved Flob
  __TRACE_ENTER__
  //if(src.size==0){
  //  __TRACE_LEAVE__
  //  return false;
  //}
  assert(file->GetFileId() != nativeFlobs || !file->IsTemp());
  char* buffer = new char[src.size];
  // do not write an unfilled buffer into the record
  if (!getData(src, buffer, 0, src.size)) {
    delete[] buffer;
    __TRACE_LEAVE__
    return false;
  }
  SmiSize written;
  bool ok;
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    ok = file->Write(recordId, buffer, src.size, offset, written);
  }
  delete[] buffer;
  assert(ok);
  if (!ok) {
    // with asserts disabled a failed write used to be reported as success
    __TRACE_LEAVE__
    return false;
  }
  char mode = file->IsTemp() ? 1 : 0;
  FlobId id(file->GetFileId(), recordId, offset, mode);
  result.id = id;
  result.size = src.size;
  if (result.dataPointer) {
    free(result.dataPointer);
    result.dataPointer = 0;
  }
  __TRACE_LEAVE__
  return true;
}
/*
~saveTo~
Saves a Flob into a specific file. The flob is saved into a new created record
in the file at offset 0. By saving the old Flob, a new Flob is created which
is the result of this function.
Must be changed to support real large Flobs
*/
bool FlobManager::saveTo(
const Flob &src, // flob to save
const SmiFileId fileId, // target file
const char mode, // environment
Flob &result) { // result
__TRACE_ENTER__
assert(fileId != nativeFlobs);
assert((mode == 0) || (mode == 1));
SmiRecordFile* file = getFile(fileId, mode);
__TRACE_LEAVE__
return saveTo(src, file, result);
}
// Appends a new record to file, writes the content of src into it at
// offset 0 and lets result describe the new record. A memory block owned
// by result is released. Returns false if no record can be appended.
bool FlobManager::saveTo(
    const Flob &src,      // flob to save
    SmiRecordFile* file,  // target file
    Flob &result) {       // result
  __TRACE_ENTER__
  assert(file->GetFileId() != nativeFlobs || !file->IsTemp());
  SmiRecordId recId;
  SmiRecord rec;
  char mode;
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    if (!file->AppendRecord(recId, rec)) {
      __TRACE_LEAVE__
      return false;
    }
    mode = file->IsTemp() ? 1 : 0;
    if (src.size == 0) { // empty Flob: nothing to write
      rec.Finish();
    }
  }
  if (src.size > 0) {
    // write data
    char* buffer = new char[src.size + 1];
    buffer[src.size] = '\0';
    getData(src, buffer, 0, src.size);
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    rec.Write(buffer, src.size, 0);
    delete[] buffer;
    rec.Finish();
  }
  FlobId fid(file->GetFileId(), recId, 0, mode);
  result.id = fid;
  result.size = src.size;
  // Also done for empty flobs now: a flob with a valid id must not keep a
  // memory block (destroy() asserts exactly that).
  if (result.dataPointer) {
    free(result.dataPointer);
    result.dataPointer = 0;
  }
  __TRACE_LEAVE__
  return true;
}
/*
~putData~
Puts data into a flob.
*/
// Writes length bytes from buffer into dest, starting at targetoffset
// within the flob. A memory-only flob is made controllable first. Unless
// ignoreCache is set, the write goes through the native or the persistent
// cache; otherwise it goes directly into the record file.
bool FlobManager::putData(Flob &dest,                   // destination flob
                          const char* buffer,           // source buffer
                          const SmiSize &targetoffset,  // offset within the Flob
                          const SmiSize &length,        // data size
                          const bool ignoreCache) {
  __TRACE_ENTER__
  if (dest.dataPointer) {
    makeControllable(dest);
  }
  assert(!dest.id.isDestroyed());
  FlobId id = dest.id;
  SmiFileId fileId = id.fileId;
  assert(targetoffset + length <= dest.size);
  // do not allow putting data into non berkeley db flobs
  assert((id.mode == 0) || (id.mode == 1));
  const bool nativeTarget = (id.mode == 1) && (fileId == nativeFlobs);
  if (!ignoreCache) {
    if (nativeTarget) {
#ifdef THREAD_SAFE
      boost::lock_guard<boost::recursive_mutex> guard(ncmtx);
#endif
      bool res = nativeFlobCache->putData(dest, buffer, targetoffset, length);
      __TRACE_LEAVE__
      return res;
    }
    std::cerr << "Warning: Manipulate a persistent Flob" << std::endl;
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(pcmtx);
#endif
    bool res = persistentFlobCache->putData(dest, buffer,
                                            targetoffset, length);
    __TRACE_LEAVE__
    return res;
  }
  // put data to disk
  SmiRecordId recordId = id.recordId;
  SmiSize recordPos = id.offset + targetoffset;
  SmiRecordFile* file = getFile(fileId, id.mode);
  SmiSize written;
  bool ok;
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    ok = file->Write(recordId, buffer, length, recordPos, written);
  }
  assert(ok);
  __TRACE_LEAVE__
  return true;
}
/*
~changeMode~
Overwrites the mode stored in the id of ~flob~. Nothing else is changed.
*/
void FlobManager::changeMode(Flob* flob, const char mode) {
  FlobId &target = flob->id;
  target.mode = mode;
}
bool FlobManager::putData(const FlobId &id, // destination flob
const char* buffer, // source buffer
const SmiSize &targetoffset, // offset within the Flob
const SmiSize &length
) { // data size
__TRACE_ENTER__
assert(!id.isDestroyed());
SmiFileId fileId = id.fileId;
SmiRecordId recordId = id.recordId;
SmiSize offset = id.offset;
// avoid putting data into non-berkeley db flobs
assert((id.mode == 0) || (id.mode == 1) || (id.mode == 2));
bool ok;
SmiSize written;
#ifdef THREAD_SAFE
{boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
SmiRecordFile* file = getFile(fileId, id.mode);
ok = file->Write(recordId, buffer, length,
offset + targetoffset, written);
#ifdef THREAD_SAFE
}
#endif
assert(ok);
__TRACE_LEAVE__
return true;
}
/*
~setExFile~
Binds ~flob~ to data stored in the external file ~flobFile~: the id of the
flob is destroyed, a new flob of the given size is created, the file name
is registered in the external file cache under the new record id, and
finally the flob is switched to mode 2 with offset ~flobOffset~.
*/
bool FlobManager::setExFile(Flob &flob, const std::string &flobFile,
                            const SmiSize size, const SmiSize flobOffset) {
  __TRACE_ENTER__
  flob.id.destroy();
  assert(flob.id.isDestroyed());
  const bool created = create(size, flob);
  if (!created) {
    assert(false);
    __TRACE_LEAVE__
    return false;
  }
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    SmiRecordId newRecord = flob.id.recordId;
    externalFileCache->cacheRecord(newRecord, flobFile);
  }
  flob.id.mode = 2;
  flob.id.offset = flobOffset;
  __TRACE_LEAVE__
  return true;
}
/*
~SwitchToMode1~
Replaces a flob of mode 2 or 3 by a newly created flob that contains the
~size~ bytes found at ~flobOffset~ in the external file. Flobs with a mode
below 2 are left unchanged. Returns false if the data cannot be read
completely.
*/
bool FlobManager::SwitchToMode1(Flob &flob, const std::string &flobFile,
                                const SmiSize size, const SmiSize flobOffset) {
  __TRACE_ENTER__
  if (flob.id.mode < 2) {
    __TRACE_LEAVE__
    return true;
  }
  assert(!flobFile.empty());
  assert(flob.id.mode == 2 || flob.id.mode == 3);
  Flob newFlob(size);
  //Read data from the disk file
  std::ifstream* tupleFile = 0;
  // Heap buffer instead of "char flobBlock[size]": a variable length array
  // is not standard C++ and a large flob would overflow the stack.
  char* flobBlock = new char[size];
  SmiSize got = 0;
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    if (flob.id.mode == 3) {
      // In Mode 3, the record id is used to identify the remote DS,
      // it is impossible to cache the file input stream based on its value.
      // Therefore, we use the record id from the newly created flob
      // structure.
      SmiRecordId recordId = newFlob.id.recordId;
      externalFileCache->cacheRecord(recordId, flobFile, true);
      tupleFile = externalFileCache->getFile(recordId);
    } else {
      // mode 2
      SmiRecordId recordId = flob.id.recordId;
      tupleFile = externalFileCache->getFile(recordId);
    }
    tupleFile->seekg(flobOffset, std::ios::beg);
    tupleFile->read(flobBlock, size);
    // taken while the stream is still protected by the lock
    got = static_cast<SmiSize>(tupleFile->gcount());
  }
  if (got != size) {
    std::cerr << "Error!! read " << got
              << " from " << flobFile
              << " at " << flobOffset
              << ", need " << size << std::endl;
    assert(false);
    // with asserts disabled: do not store a partially filled block
    delete[] flobBlock;
    __TRACE_LEAVE__
    return false;
  }
  //Cache data into nativeFlobCache
  newFlob.write(flobBlock, size);
  delete[] flobBlock;
  flob = newFlob;
  __TRACE_LEAVE__
  return true;
}
/*
~create~
Creates a new Flob with a given size which is assigned to a temporal file.
Warning: this function does not change the dataPointer.
*/
// Creates a native flob of the given size in result. A flob from
// DestroyedFlobs is reused if one is available; otherwise a new record is
// appended to the native flob file. Afterwards the flob is announced to
// the native cache. Returns false if no record can be appended.
bool FlobManager::create(const SmiSize size, Flob &result) { // result flob
#ifdef FM_useStats
  createdFlobs++;
#endif
  __TRACE_ENTER__
  SmiRecord rec;
  SmiRecordId recId;
  if (size > 536870912) { // 512 MB
    std::cerr << "Warning: Try to cretae a very big flob , size = "
              << size << std::endl;
    //assert(false);
  }
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    if (!DestroyedFlobs->isEmpty()) {
#ifdef FM_useStats
      reusedFlobs++;
#endif
      result = DestroyedFlobs->pop();
    } else {
      // create a record from the flob to get an id
      if (!(nativeFlobFile->AppendRecord(recId, rec))) {
        __TRACE_LEAVE__
        return false;
      }
      FlobId fid(nativeFlobs, recId, 0, 1);
      result.id = fid;
    }
    result.size = size;
  }
  // Both ways of obtaining an id share the cache registration below. The
  // reuse path used to call nativeFlobCache->create while holding omtx
  // only, i.e. without the mutex that guards the native cache everywhere
  // else.
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(ncmtx);
#endif
    if (!nativeFlobCache->create(result)) {
      if (size > 0) {
        resize(result, size, true);
      }
    }
  }
  __TRACE_LEAVE__
  return true;
}
/*
~create~
Creates a new flob with given file and position. File and record
must exists.
*/
// Lets result refer to existing data at (fileId, recordId, offset) with the
// given size. The record must be selectable in the file, otherwise false
// is returned and result stays unchanged. A memory block owned by result
// is released.
bool FlobManager::create(const SmiFileId &fileId,      // target file
                         const SmiRecordId &recordId,  // target record id
                         const SmiSize &offset,        // offset within the record
                         const char mode,              // 0 or 1
                         const SmiSize &size,          // initial size of the Flob
                         Flob &result) {               // resulting Flob
  __TRACE_ENTER__
  assert((mode == 0) || (mode == 1));
#ifdef FM_useStats
  createdFlobs++;
#endif
  const bool temporary = (mode == 1);
  assert(fileId != nativeFlobs || !temporary);
  SmiRecord record;
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    SmiRecordFile* file = getFile(fileId, mode);
    const bool found = file->SelectRecord(recordId, record);
    if (!found) {
      __TRACE_LEAVE__
      return false;
    }
  }
  FlobId fid(fileId, recordId, offset, mode);
  result.id = fid;
  result.size = size;
  if (result.dataPointer != 0) {
    free(result.dataPointer);
    result.dataPointer = 0;
  }
  __TRACE_LEAVE__
  return true;
}
/*
~copyData~
Copies the complete content of ~src~ into ~dest~. ~dest~ is resized to the
size of ~src~ first if the sizes differ. Returns false as soon as reading,
resizing or writing fails.
*/
bool FlobManager::copyData(const Flob &src, Flob &dest) {
  __TRACE_ENTER__
  char* buffer = new char[src.size];
  bool ok = getData(src, buffer, 0, src.size);
  if (ok && dest.size != src.size) {
    // the result of resize was ignored before, so a failed resize was
    // followed by a write into a flob of the wrong size
    ok = resize(dest, src.size);
  }
  if (ok) {
    ok = putData(dest, buffer, 0, src.size);
  }
  delete[] buffer;  // single release point for all exits
  __TRACE_LEAVE__
  return ok;
}
/*
~createFrom~
return a Flob with persistent storage allocated and defined elsewhere
*/
// Builds a Flob value from the given position and size. The arguments are
// stored as they are; no file is accessed and nothing is checked.
Flob FlobManager::createFrom(const SmiFileId &fid,
                             const SmiRecordId &rid,
                             const SmiSize &offset,
                             const char mode,
                             const SmiSize &size) {
  //assert(fid!=nativeFlobs);
  __TRACE_ENTER__
  Flob flob;
  flob.id = FlobId(fid, rid, offset, mode);
  flob.size = size;
  flob.dataPointer = 0;
  __TRACE_LEAVE__
  return flob;
}
void FlobManager::killNativeFlobs() {
__TRACE_ENTER__
bool ok;
#ifdef THREAD_SAFE
{boost::lock_guard<boost::recursive_mutex> guard(ncmtx);
#endif
if (nativeFlobCache) {
nativeFlobCache->clear();
}
ok = nativeFlobFile->ReCreate();
#ifdef THREAD_SAFE
}
{boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
DestroyedFlobs->makeEmpty();
#ifdef THREAD_SAFE
}
#endif
assert(ok);
__TRACE_LEAVE__
}
/*
~constructor~
Because Flobmanager is a singleton, the constructor should only be used
by the FlobManager class itself.
*/
// Creates the temporary native flob file and allocates the native cache,
// the persistent cache, the stack of destroyed flobs and the external
// file cache, using the cache parameters defined at the top of this file.
FlobManager::FlobManager() : openFiles() {
  __TRACE_ENTER__
  assert(instance == 0); // the constructor should only called one time
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(ncmtx);
#endif
    // construct the temporarly FlobFile
    // not fixed size, dummy, temporarly
    nativeFlobFile = new SmiRecordFile(false, 0, true);
    bool created = nativeFlobFile->Create("NativeFlobFile", "Default");
    // bool created = nativeFlobFile->Create();
    assert(created);
    nativeFlobs = nativeFlobFile->GetFileId();
    nativeFlobCache = new NativeFlobCache(NATIVE_CACHE_MAXSIZE,
                                          NATIVE_CACHE_SLOTSIZE,
                                          NATIVE_CACHE_AVGSIZE);
  }
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(pcmtx);
#endif
    persistentFlobCache = new PersistentFlobCache(PERSISTENT_CACHE_MAXSIZE,
                                                  PERSISTENT_CACHE_SLOTSIZE,
                                                  PERSISTENT_CACHE_AVGSIZE);
  }
  {
#ifdef THREAD_SAFE
    boost::lock_guard<boost::recursive_mutex> guard(omtx);
#endif
    DestroyedFlobs = new Stack<Flob>();
    externalFileCache = new ExternalFileCache(FILEID_CACHE_MAXSIZE);
  }
  __TRACE_LEAVE__
}
/*
~SetNativeCache~
Replaces the parameters used for constructing the native flob cache.
Must be called while no FlobManager instance exists.
*/
void FlobManager::SetNativeCache(const size_t maxSize,
                                 const size_t slotSize,
                                 const size_t avgSize) {
  __TRACE_ENTER__
  assert(FlobManager::instance == 0);
  NATIVE_CACHE_AVGSIZE = avgSize;
  NATIVE_CACHE_SLOTSIZE = slotSize;
  NATIVE_CACHE_MAXSIZE = maxSize;
  __TRACE_LEAVE__
}
/*
~SetPersistentCache~
Replaces the parameters used for constructing the persistent flob cache.
Must be called while no FlobManager instance exists.
*/
void FlobManager::SetPersistentCache(const size_t maxSize,
                                     const size_t slotSize,
                                     const size_t avgSize) {
  __TRACE_ENTER__
  assert(FlobManager::instance == 0);
  PERSISTENT_CACHE_AVGSIZE = avgSize;
  PERSISTENT_CACHE_SLOTSIZE = slotSize;
  PERSISTENT_CACHE_MAXSIZE = maxSize;
  __TRACE_LEAVE__
}
// Stream output for a FlobId; forwards to FlobId::print.
std::ostream &operator<<(std::ostream &os, const FlobId &fid) {
  std::ostream &printed = fid.print(os);
  return printed;
}
// Stream output for a Flob; forwards to Flob::print.
std::ostream &operator<<(std::ostream &os, const Flob &f) {
  std::ostream &printed = f.print(os);
  return printed;
}
// Stream output for a NativeCacheEntry; forwards to its print function.
std::ostream &operator<<(std::ostream &o, const NativeCacheEntry entry) {
  std::ostream &printed = entry.print(o);
  return printed;
}