/* ---- This file is part of SECONDO. Copyright (C) 2015, Faculty of Mathematics and Computer Science, Database Systems for New Applications. SECONDO is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. SECONDO is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with SECONDO; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA ---- */ #include "ReadFromKafkaOperator.h" #include "KafkaClient.h" #include "log.hpp" using namespace std; namespace kafka { ListExpr ReadFromKafkaTM(ListExpr args) { // check number of arguments if (!nl->HasLength(args, 3)) { return listutils::typeError("wrong number of arguments"); } ListExpr brokerArg = nl->First(args); ListExpr brokerError = validateBrokerArg(brokerArg); if (brokerError) return brokerError; string broker = nl->StringValue(nl->Second(brokerArg)); ListExpr topicArg = nl->Second(args); ListExpr error = validateTopicArg(topicArg); if (error) return error; string topic = nl->StringValue(nl->Second(topicArg)); ListExpr booleanArg = nl->Third(args); ListExpr booleanError = validateBooleanArg(booleanArg); if (booleanError) return booleanError; std::string typeString = readTypeString(broker, topic); LOG(DEBUG) << "topicTypeString: " << typeString; ListExpr res = 0; if (!nl->ReadFromString(typeString, res)) { cout << "Error reading type line: " << typeString << endl; }; return res; } ListExpr validateTopicArg(ListExpr topicArg) { if (!nl->HasLength(topicArg, 2)) { return listutils::typeError("internal error, " "topicArg invalid"); } if (!CcString::checkType(nl->First(topicArg))) { return listutils::typeError( "String (as type for topic name) expected"); } ListExpr fn = nl->Second(topicArg); if (nl->AtomType(fn) != StringType) { return listutils::typeError("topic name not constant"); } return 0; } ListExpr validateBrokerArg(ListExpr topicArg) { if (!nl->HasLength(topicArg, 2)) { return listutils::typeError("internal error, " "BrokerArg invalid"); } if (!CcString::checkType(nl->First(topicArg))) { return listutils::typeError( "String (as type for Broker name) expected"); } ListExpr fn = nl->Second(topicArg); if (nl->AtomType(fn) != StringType) { return listutils::typeError("Broker name not constant"); } return 0; } ListExpr validateBooleanArg(ListExpr booleanArg) { if (!nl->HasLength(booleanArg, 2)) { return listutils::typeError("internal error, " "boolean invalid"); } if (!CcBool::checkType(nl->First(booleanArg))) { return listutils::typeError( "Boolean expected"); } ListExpr fn = nl->Second(booleanArg); if (nl->AtomType(fn) != BoolType) { return listutils::typeError("Boolean not BoolType"); } return 0; } std::string readTypeString(string broker, string topic) { LOG(DEBUG) << "readTypeString started. topic:" << topic; KafkaReaderClient kafkaReaderClient; kafkaReaderClient.Open(broker, topic); std::string *source = kafkaReaderClient.ReadSting(); kafkaReaderClient.Close(); if (source == nullptr) { LOG(DEBUG) << "readTypeString is null"; return ""; } std::string result = *source; delete source; LOG(DEBUG) << "readTypeString:" << result; return result; } class KafkaSourceLI { public: // constructor: initializes the class from the string argument KafkaSourceLI(CcString *brokerArg, CcString *topicArg, CcBool *continuousArg) : topic("") { def = topicArg->IsDefined(); if (def) { topic = topicArg->GetValue(); std::string broker = brokerArg->GetValue(); continuous = continuousArg->GetValue(); kafkaReaderClient.Open(broker, topic); std::string *typeString = kafkaReaderClient.ReadSting(); delete typeString; kafkaReaderClient.setExitOnTimeout(continuous); } } // destructor ~KafkaSourceLI() { if (def) { kafkaReaderClient.Close(); } } // this function returns the next result or null if the input is // exhausted Tuple *getNext(Supplier s) { if (!def) { return NULL; } std::string *source = kafkaReaderClient.ReadSting(); if (source == NULL) { return NULL; } ListExpr resultType = GetTupleResultType(s); TupleType *tupleType = new TupleType(nl->Second(resultType)); Tuple *res = new Tuple(tupleType); res->ReadFromBinStr(0, *source); delete source; return res; } bool isContinuous() { return continuous; } private: string topic; KafkaReaderClient kafkaReaderClient; bool def; bool continuous; }; int ReadFromKafkaVM(Word *args, Word &result, int message, Word &local, Supplier s) { KafkaSourceLI *li = (KafkaSourceLI *) local.addr; switch (message) { case OPEN : LOG(DEBUG) << "ReadFromKafkaVM open"; if (li) { delete li; } local.addr = new KafkaSourceLI((CcString *) args[0].addr, (CcString *) args[1].addr, (CcBool *) args[2].addr ); LOG(DEBUG) << "ReadFromKafkaVM opened"; return 0; case REQUEST: LOG(TRACE) << "ReadFromKafkaVM request"; if (li) { result.addr = li->getNext(s); if (li->isContinuous()) return YIELD; return result.addr ? YIELD : CANCEL; } else { result.addr = 0; return CANCEL; } case CLOSE: LOG(DEBUG) << "ReadFromKafkaVM closing"; if (li) { delete li; local.addr = 0; } LOG(DEBUG) << "ReadFromKafkaVM closed"; return 0; } return 0; } OperatorSpec ReadFromKafkaOpSpec( "string,string,boolean -> stream(string)", "readfromkafka(host,topic,continuousReading) ", "Reads steam of tuples from the kafka topic. " "Host - host name of Kafka broker eg. \"localhost\" or in case of " "multiple brokers comma-separated list of hosts, eg. " "\"host1:port1,host2:port2,...\" " "continuousReading - if false, reading is stopped when the topic " "is exhausted, otherwise, the operator waits for new data to " "arrive into the topic", "query readfromkafka(\"localhost\", \"KM\", false) count" ); Operator readFromKafkaOp( "readfromkafka", ReadFromKafkaOpSpec.getStr(), ReadFromKafkaVM, Operator::SimpleSelect, ReadFromKafkaTM ); }