753 lines
16 KiB
C++
753 lines
16 KiB
C++
/*
|
|
----
|
|
This file is part of SECONDO.
|
|
|
|
Copyright (C) 2014, University in Hagen, Faculty of Mathematics and
|
|
Computer Science, Database Systems for New Applications.
|
|
|
|
SECONDO is free software; you can redistribute it and/or modify
|
|
it under the terms of the GNU General Public License as published
|
|
by
|
|
the Free Software Foundation; either version 2 of the License, or
|
|
(at your option) any later version.
|
|
|
|
SECONDO is distributed in the hope that it will be useful,
|
|
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
|
GNU General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with SECONDO; if not, write to the Free Software
|
|
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
|
|
----
|
|
|
|
Jan Kristof Nidzwetzki
|
|
|
|
//paragraph [1] Title: [{\Large \bf \begin{center}] [\end{center}}]
|
|
//paragraph [10] Footnote: [{\footnote{] [}}]
|
|
//[TOC] [\tableofcontents]
|
|
|
|
[TOC]
|
|
|
|
0 Overview
|
|
|
|
This file contains an abstraction layer for the cassandra algebra. The
|
|
cassandra algebra does not communicate with cassandra directly. The algebra
|
|
will only use the functions defined here.
|
|
|
|
In addition, this file contains some helper classes: E.g. for representing
|
|
token-ranges or parsing consistency levels.
|
|
|
|
1 Includes and defines
|
|
|
|
*/
|
|
|
|
#ifndef _CASSANDRA_H
|
|
#define _CASSANDRA_H
|
|
|
|
#include <stdio.h>
|
|
#include <string.h>
|
|
#include <stdlib.h>
|
|
#include <iostream>
|
|
#include <map>
|
|
#include <stdlib.h>
|
|
#include <cassert>
|
|
#include <cassandra.h>
|
|
#include <algorithm>
|
|
#include <limits.h>
|
|
#include <vector>
|
|
|
|
#include "CassandraTuplePrefetcher.h"
|
|
#include "CassandraHelper.h"
|
|
#include "CassandraResult.h"
|
|
|
|
using namespace std;
|
|
|
|
//namespace to avoid name conflicts
|
|
namespace cassandra {
|
|
|
|
// Prototype classes
|
|
class CassandraResult;
|
|
class CassandraToken;
|
|
class CassandraTuplePrefetcher;
|
|
|
|
/*
|
|
2.4 Helper Class Token Interval
|
|
|
|
*/
|
|
|
|
class TokenRange {

public:

/*
2.4.1 Construct a new token interval

1. Parameter is the interval start
2. Parameter is the interval end
3. Parameter is the ip assigned to this interval

The query uuid of the interval is left empty.

*/
  TokenRange(long long myStart, long long myEnd, std::string myIp) :
    start(myStart), end(myEnd), ip(myIp) {}

/*
2.4.1 Construct a new token interval

1. Parameter is the interval start
2. Parameter is the interval end
3. Parameter is the ip assigned to this interval
4. Parameter is the uuid of the query that processed this interval

*/
  TokenRange(long long myStart, long long myEnd,
             std::string myIp, std::string myQueryuuid) :
    start(myStart), end(myEnd), ip(myIp), queryuuid(myQueryuuid) {}

/*
2.4.1 Copy constructor

All four members are copied in the initializer list.

*/
  TokenRange(const TokenRange &ref) :
    start(ref.start), end(ref.end),
    ip(ref.ip), queryuuid(ref.queryuuid) {}

/*
2.4.2 Get interval start

*/
  long long getStart() const {
    return start;
  }

/*
2.4.3 Get interval end

*/
  long long getEnd() const {
    return end;
  }

/*
2.4.4 Get the ip assigned to this interval

*/
  std::string getIp() const {
    return ip;
  }

/*
2.4.5 Get the uuid of the query that processed this interval

*/
  std::string getQueryUUID() const {
    return queryuuid;
  }

/*
2.4.6 Is this a local interval?

Returns true if the ip of this interval is equal to the ip
passed as 1st parameter (default: 127.0.0.1).

*/
  bool isLocalTokenRange(std::string myIp = "127.0.0.1") const {
    return (ip.compare(myIp) == 0);
  }

/*
2.4.7 Get the size of the interval

If start and end are both non-negative, the signed difference
end - start is returned (this value is negative if start > end).
In all other cases the absolute distance between start and end is
returned.

The distance is calculated with unsigned arithmetic, because
expressions like llabs(start) + llabs(end) overflow for wide
intervals (e.g. LLONG\_MIN to LLONG\_MAX). A distance that does not
fit into a long long is reported as LLONG\_MAX.

*/
  long long getSize() const {

    // Both values are non-negative, the subtraction can not overflow
    if(start >= 0 && end >= 0) {
      return end - start;
    }

    const long long lower = (start < end) ? start : end;
    const long long upper = (start < end) ? end : start;

    // The conversion to unsigned is modular. So the difference is
    // the exact distance, even if lower is negative.
    const unsigned long long distance =
      static_cast<unsigned long long>(upper)
        - static_cast<unsigned long long>(lower);

    if(distance > static_cast<unsigned long long>(LLONG_MAX)) {
      return LLONG_MAX;
    }

    return static_cast<long long>(distance);
  }

/*
2.4.8 Operator <

Only the interval start is compared.

*/
  bool operator<( const TokenRange& val ) const {
    return start < val.getStart();
  }

/*
2.4.9 Operator >

Only the interval start is compared.

*/
  bool operator>( const TokenRange& val ) const {
    return start > val.getStart();
  }

/*
2.4.10 Operator ==

Two intervals are equal, if start and end are equal. The ip
and the query uuid are not compared.

*/
  inline bool operator== (const TokenRange &interval) const {
    return (interval.getStart() == getStart())
        && (interval.getEnd() == getEnd());
  }

private:
  // Interval start
  long long start;

  // Interval end
  long long end;

  // Ip assigned to this interval
  std::string ip;

  // QueryUUID that processed this token range
  std::string queryuuid;
};
|
|
|
|
/*
|
|
2.4.4 Implementation for "toString"
|
|
|
|
*/
|
|
inline std::ostream& operator<<(std::ostream &strm,
|
|
const cassandra::TokenRange &tokenInterval) {
|
|
|
|
return strm << "TokenRange[" << tokenInterval.getStart()
|
|
<< ";" << tokenInterval.getEnd() << "; "
|
|
<< "ip=" << tokenInterval.getIp() << "]";
|
|
}
|
|
|
|
/*
|
|
2.4 CassandraQuery wrapper class
|
|
|
|
*/
|
|
class CassandraQuery {

public:

/*
2.4.1 Constructor

1. Parameter is the id of the query
2. Parameter is the text of the query (a copy is stored)
3. Parameter is the version of the query

The query is taken by const reference, so const strings and
temporaries can be passed.

*/
  CassandraQuery(size_t myQueryId, const std::string &myQuery,
                 time_t myVersion)
    : queryId(myQueryId), query(myQuery), version(myVersion) {

  }

/*
2.4.2 Get the id of the query

*/
  size_t getQueryId() const {
    return queryId;
  }

/*
2.4.3 Get the text of the query

*/
  std::string getQuery() const {
    return query;
  }

/*
2.4.4 Get the version of the query

*/
  time_t getVersion() const {
    return version;
  }

private:
  size_t queryId;
  std::string query;
  time_t version;
};

/*
2.2.1 Helper functions

Comparator for CassandraQuery objects. A query is smaller than
another query, if its query id is smaller. The queries are taken
by const reference to avoid a copy of the query text on every
comparison.

*/
struct QueryComperator {
  bool operator()(const CassandraQuery &a,
                  const CassandraQuery &b) const {
    return (a.getQueryId() < b.getQueryId());
  }
};
|
|
|
|
/*
|
|
2.3 Adapter for cassandra
|
|
|
|
*/
|
|
class CassandraAdapter {
|
|
|
|
public:
|
|
|
|
/*
|
|
2.3.0 Static variables
|
|
|
|
|
|
*/
|
|
static const string METADATA_TUPLETYPE;
|
|
|
|
|
|
/*
|
|
2.3.1 Constructor
|
|
|
|
1. Parameter the contactpoint of the cassadra cluster
|
|
2. Parameter the keyspace to use (e.g. secondo)
|
|
|
|
*/
|
|
CassandraAdapter(string myContactpoint, string myKeyspace)
|
|
: contactpoint(myContactpoint), keyspace(myKeyspace),
|
|
errorFlag(false) {
|
|
|
|
}
|
|
|
|
virtual ~CassandraAdapter() {
|
|
disconnect();
|
|
}
|
|
|
|
/*
|
|
2.3.2 Open a connection to the cassandra cluster. If the 1st parameter
|
|
is set to ~true~, the load balancing feature of the driver will be
|
|
disabled. Only connections to the specified cassandra node will
|
|
be established. If set to ~false~, the loadbalancing feature
|
|
of the driver will be enabled.
|
|
|
|
*/
|
|
void connect(bool singleNodeLoadBalancing);
|
|
|
|
/*
|
|
2.3.2 Get error flag
|
|
|
|
*/
|
|
bool getErrorFlag() {
|
|
return errorFlag;
|
|
}
|
|
|
|
/*
|
|
2.3.2 Clear error flag
|
|
|
|
*/
|
|
void clearErrorFlag() {
|
|
errorFlag = false;
|
|
}
|
|
|
|
/*
|
|
2.3.3 Write a tuple to the cluster
|
|
|
|
1. Parameter is the name of the relation (e.g. plz)
|
|
2. Parameter is the partiton value of the tuple
|
|
3. Parameter is the name of the node
|
|
4. Parameter is the unique key of the data
|
|
5. Parameter is the data
|
|
6. Parameter is the consistence level used for writing
|
|
7. Parameter specifies to use synchronus or asynchronus writes
|
|
|
|
*/
|
|
bool writeDataToCassandra(string relation,
|
|
string partition, string node, string key, string value,
|
|
string consistenceLevel, bool sync);
|
|
|
|
/*
|
|
2.3.4 Same as writeDataToCassandra, but with prepared statements
|
|
|
|
*/
|
|
bool writeDataToCassandraPrepared(const CassPrepared* preparedStatement,
|
|
string partition, string node, string key, char* value,
|
|
size_t value_length, string consistenceLevel, bool sync);
|
|
|
|
/*
|
|
2.3.4 Inform the CassandraAdapter about the fact that the last tuple for
|
|
the relaion is written. Batch writes can be commited and prepared
|
|
statements can be freed.
|
|
|
|
*/
|
|
void relationCompleteCallback(string relation);
|
|
|
|
/*
|
|
2.3.5 Fetch a full table from cassandra
|
|
|
|
1. Parameter is the relation to read
|
|
2. Parameter is the consistence level used for writing
|
|
|
|
*/
|
|
CassandraTuplePrefetcher* readTable(string relation,
|
|
string consistenceLevel);
|
|
|
|
/*
|
|
2.3.5 Fetch partial table from cassandra. Read only
|
|
the tuples stored on the local node
|
|
|
|
1. Parameter is the relation to read
|
|
2. Parameter is the consistence level used for writing
|
|
|
|
*/
|
|
CassandraTuplePrefetcher* readTableLocal(string relation,
|
|
string consistenceLevel);
|
|
|
|
/*
|
|
2.3.5 Fetch partial table from cassandra. Read only
|
|
the tuples inside the given token interval
|
|
|
|
1. Parameter is the relation to read
|
|
2. Parameter is the consistence level used for writing
|
|
3. Parameter is the begin token range
|
|
4. Parameter is the end token range
|
|
|
|
*/
|
|
CassandraTuplePrefetcher* readTableRange(string relation,
|
|
string consistenceLevel, string begin, string end);
|
|
|
|
|
|
/*
|
|
2.3.6 Fetch partial table from cassandra. The table is
|
|
produced by the query ~queryId~. Fetch only data
|
|
from fault free nodes.
|
|
|
|
1. Parameter is the relation to read
|
|
2. Parameter is the consistence level used for writing
|
|
3. Parameter is the queryId
|
|
|
|
*/
|
|
CassandraTuplePrefetcher* readTableCreatedByQuery(string relation,
|
|
string consistenceLevel, int queryId);
|
|
|
|
|
|
/*
|
|
2.3.6 Create a new relation in cassandra and write some
|
|
metadata (e.g. tupletype) for the table.
|
|
|
|
Returns true if the relation could be created, false otherwise.
|
|
|
|
1. Parameter is the name of the relation
|
|
2. Parameter is the tuple type of the stored tuples
|
|
in nested list representation
|
|
|
|
*/
|
|
bool createTable(string tablename, string tupletype);
|
|
|
|
/*
|
|
2.3.7 Remove a relation from the cassandra cluster. Returns true if
|
|
the relation could be successfully removed, false otherwise.
|
|
|
|
1. Parameter is the name of the relation
|
|
|
|
*/
|
|
bool dropTable(string tablename);
|
|
|
|
/*
|
|
2.3.8 Disconnect from cassandra cluster. This method waits for
|
|
all pending requests to finish before the connection is
|
|
closed. So the call can take some time.
|
|
|
|
*/
|
|
void disconnect();
|
|
|
|
/*
|
|
2.3.9 Is the connection to the cluster open? Return true if the
|
|
connection is open. False otherweise.
|
|
|
|
*/
|
|
bool isConnected();
|
|
|
|
/*
|
|
2.3.10 Get the token list of the current node
|
|
1. Parameter is the token result list
|
|
|
|
*/
|
|
bool getLocalTokens(vector <cassandra::CassandraToken> &result);
|
|
|
|
/*
|
|
2.3.11 Get the token list of all peer nodes
|
|
1. Parameter is the token result list
|
|
|
|
*/
|
|
bool getPeerTokens(vector <cassandra::CassandraToken> &result);
|
|
|
|
/*
|
|
2.3.12 Get all tables from the keyspace specified
|
|
by the 1th paramter
|
|
|
|
*/
|
|
CassandraResult* getAllTables(string keyspace);
|
|
|
|
/*
|
|
2.3.12 Get the TupleType (in nested list format) from
|
|
the table specified by the 1th paramter. The
|
|
result will be stored in the 2nd parameter.
|
|
|
|
Returns true, if the 2nd parameter contains
|
|
a valid result. False otherweise
|
|
|
|
*/
|
|
bool getTupleTypeFromTable(string table, string &result);
|
|
|
|
/*
|
|
2.3.12 Execute the cql statement and return the result
|
|
|
|
*/
|
|
CassandraResult* readDataFromCassandra(string cql,
|
|
CassConsistency consistenceLevel,
|
|
bool printError = true);
|
|
|
|
/*
|
|
2.3.12 Get a list with all token ranges
|
|
|
|
1st parameter is a vector with the token ranges
|
|
2nd parameter is a vector with the tokens of the local system (optional)
|
|
3rd parameter is a vector with the tokens of the other systems (optional)
|
|
|
|
*/
|
|
bool getAllTokenRanges(vector<TokenRange> &allTokenRange);
|
|
|
|
bool getAllTokenRanges(vector<TokenRange> &allTokenRange,
|
|
vector <CassandraToken> &localTokens,
|
|
vector <CassandraToken> &peerTokens);
|
|
|
|
/*
|
|
2.3.12 Get a list with local token ranges
|
|
|
|
1st parameter is a vector with the local token ranges
|
|
2nd parameter is a vector with the tokens of the local system
|
|
3rd parameter is a vector with the tokens of the other systems
|
|
|
|
*/
|
|
|
|
bool getLocalTokenRanges(vector<TokenRange> &localTokenRange,
|
|
vector <CassandraToken> &localTokens,
|
|
vector <CassandraToken> &peerTokens);
|
|
|
|
/*
|
|
2.3.12 Execute the cql statement with a given consistence level synchronus
|
|
|
|
*/
|
|
bool executeCQLSync(string cql, CassConsistency consistency);
|
|
|
|
/*
|
|
2.3.13 Execute the cql statement with a given consistence level asynchronus
|
|
|
|
*/
|
|
bool executeCQLASync(string cql, CassConsistency consistency);
|
|
|
|
|
|
/*
|
|
2.3.15 Truncate metatables
|
|
|
|
*/
|
|
bool truncateMetatables();
|
|
|
|
|
|
/*
|
|
2.3.16 Get a cassandraResult with queries to execute
|
|
|
|
*/
|
|
void getQueriesToExecute(vector<CassandraQuery> &result);
|
|
|
|
/*
|
|
2.3.17 Quote CQL Statement
|
|
|
|
Replace all single quotes with double quotes
|
|
|
|
*/
|
|
void quoteCqlStatement(string &query);
|
|
|
|
/*
|
|
2.3.18 Get the global query state
|
|
|
|
1. parameter is the consistency level (optional)
|
|
2. parameter determines whether errors are printed (optional)
|
|
|
|
*/
|
|
CassandraResult* getGlobalQueryState(
|
|
CassConsistency consistency = CASS_CONSISTENCY_ALL,
|
|
bool printError = true);
|
|
|
|
/*
|
|
2.3.19 Get processed token ranges for query
|
|
|
|
1. parameter is the result vector
|
|
2. parameter is the id of the query
|
|
3. parameter is the consistency level (optional)
|
|
4. parameter determines whether errors are printed (optional)
|
|
|
|
*/
|
|
bool getProcessedTokenRangesForQuery (
|
|
vector<TokenRange> &result, int queryId,
|
|
CassConsistency consistency = CASS_CONSISTENCY_QUORUM,
|
|
bool printError = true);
|
|
|
|
/*
|
|
2.3.20 Get tokenranges from query
|
|
|
|
1. parameter result
|
|
2. parameter the query
|
|
3. parameter the consistency level (optional)
|
|
4. parameter determines whether errors are printed (optional)
|
|
|
|
*/
|
|
bool getTokenrangesFromQuery (
|
|
vector<TokenRange> &result, string query,
|
|
CassConsistency consistency = CASS_CONSISTENCY_QUORUM,
|
|
bool printError = true);
|
|
|
|
/*
|
|
2.3.21 Get tokenranges from system table
|
|
|
|
1. parameter result
|
|
|
|
*/
|
|
bool getTokenRangesFromSystemtable (
|
|
vector<TokenRange> &result);
|
|
|
|
/*
|
|
2.3.22 Get heartbeat data
|
|
|
|
Result is a map:
|
|
|
|
IP to Lastheatbeat message
|
|
|
|
*/
|
|
bool getHeartbeatData(map<string, time_t> &result);
|
|
|
|
/*
|
|
2.3.23 Get node data
|
|
|
|
Result is a map:
|
|
|
|
IP to Noodename
|
|
|
|
*/
|
|
bool getNodeData(map<string, string> &result);
|
|
|
|
/*
|
|
2.3.24 Copy tokenranges to systemtable
|
|
|
|
*/
|
|
bool copyTokenRangesToSystemtable(string localip);
|
|
|
|
|
|
/*
|
|
2.3.25 Wait for pending futures
|
|
|
|
*/
|
|
void waitForPendingFutures();
|
|
|
|
|
|
/*
|
|
2.3.25 Wait for pending futures if needed
|
|
|
|
*/
|
|
void waitForPendingFuturesIfNeeded();
|
|
|
|
/*
|
|
2.3.26 Create a pepared statement for inserting data into the
|
|
relation spoecified in the first parameter.
|
|
|
|
*/
|
|
const CassPrepared* prepareCQLInsert(string relation);
|
|
|
|
|
|
/*
|
|
2.3.27 Free a prepared statement
|
|
|
|
1. Parameter is the prepared statement
|
|
|
|
*/
|
|
void freePreparedStatement(const CassPrepared* preparedStatement);
|
|
|
|
/*
|
|
2.3.28 Insert a pending token range processing
|
|
|
|
*/
|
|
bool insertPendingTokenRange(size_t queryId, string ip,
|
|
TokenRange *tokenrange);
|
|
|
|
/*
|
|
2.3.29 Delete a pending token range processing
|
|
|
|
*/
|
|
void deletePendingTokenRange(size_t queryId, string ip,
|
|
TokenRange *tokenrange);
|
|
|
|
/*
|
|
2.3.30 Test if a token range processing is pending.
|
|
Returns true if the token range is currently
|
|
processed by QPN. The ip of the QPN will be
|
|
strored in the variable cassandraNode.
|
|
Returns false otherwise
|
|
|
|
*/
|
|
bool getNodeForPendingTokenRange(string &cassandraNode,
|
|
size_t queryId, TokenRange *tokenrange);
|
|
|
|
protected:
|
|
|
|
/*
|
|
2.3.28 Execute the given cql future and check for errors. Returns
|
|
true if the future is executed successfully. False otherwise.
|
|
|
|
*/
|
|
bool executeCQLFutureSync(CassFuture* cqlFuture);
|
|
|
|
/*
|
|
2.3.29 Execute the given cql. Returns a future containing the
|
|
Query result.
|
|
|
|
*/
|
|
CassFuture*
|
|
executeCQL(string cql, CassConsistency consistency);
|
|
|
|
/*
|
|
2.3.30 Returns a CQL statement for inserting a new row. The
|
|
first parameter is the relation for this request. The
|
|
second parameter is the parition key, the third parameter
|
|
is the node id. The fourth parameter is the key of the tuple.
|
|
The fith parameter is the value of the tuple.
|
|
|
|
*/
|
|
string getInsertCQL(string relation, string partition,
|
|
string node, string key, string value);
|
|
|
|
/*
|
|
2.3.31 Iterate over all pending futures (e.g. writes), report
|
|
errors and remove finished futures from the future list.
|
|
Normally a cleanup is started only if the condition
|
|
list.length % 100 = 0 is true.
|
|
With the parameter force, you can override this policy.
|
|
|
|
*/
|
|
void removeFinishedFutures(bool force = false);
|
|
|
|
/*
|
|
2.3.32 Execute a CQL query and extract result tokens
|
|
|
|
1. Parameter is the query to execute
|
|
2. Parameter is the token result list
|
|
3. Parameter is the default peer
|
|
|
|
*/
|
|
bool getTokensFromQuery (string query, vector <CassandraToken> &result,
|
|
string peer);
|
|
|
|
/*
|
|
2.3.33 Connect to the cassandra cluster
|
|
|
|
1. Parameter is the session instance
|
|
2. Parameter is the cassandra cluster
|
|
|
|
*/
|
|
CassError connect_session(CassSession* session,
|
|
const CassCluster* cluster);
|
|
|
|
private:
|
|
|
|
// Our cassandra contact point
|
|
string contactpoint;
|
|
|
|
// Our keyspace
|
|
string keyspace;
|
|
|
|
// Our relation
|
|
string relation;
|
|
|
|
// Cassandra cluster
|
|
CassCluster* cluster;
|
|
|
|
// Cassandra session
|
|
CassSession* session;
|
|
|
|
// Pending futures (e.g. write requests)
|
|
std::vector<CassFuture*> pendingFutures;
|
|
|
|
// Error flag
|
|
bool errorFlag;
|
|
};
|
|
|
|
} // Namespace
|
|
|
|
#endif
|