Kaynağa Gözat

Update Makefile.conf and fix Line Endings

JDierkse 5 yıl önce
ebeveyn
işleme
94739511dd

+ 89 - 89
Application/DataImporter.cc

@@ -1,89 +1,89 @@
-#include "DataInterface/DataImporter.h"
-#include <clipp.h>
-#include <INIReader.h>
-#include <Logging.h>
-#include <MySQLClient.h>
-#include <SQLiteClient.h>
-#include <fstream>
-#include <iostream>
-#include <sstream>
-
-
-int main(int argc, char** argv)
-{
-	try
-	{
-		Logging::OpenLog();
-		Logging::SetLogMask(Logging::Severity::Debug);
-
-		std::string databaseFile = "";
-		clipp::group cli {
-			clipp::value("SQLite Database file", databaseFile)
-		};
-
-		if (!clipp::parse(argc, argv, cli))
-		{
-			std::stringstream ss;
-			ss << clipp::make_man_page(cli, argv[0]);
-			Logging::Log(Logging::Severity::Info, ss.str());
-
-			Logging::CloseLog();
-
-			return 1;
-		}
-
-		{
-			std::ifstream f(databaseFile);
-			if (!f.good())
-				throw std::runtime_error("Can't read SQLite Database file.");
-		}
-
-		INIReader iniReader("DataImporter.ini");
-		if (iniReader.ParseError() != 0)
-			throw std::runtime_error("Can't read DataImporter.ini.");
-
-		MySQL::MySQLClient mySQLClient;
-		{
-			std::string hostname = iniReader.Get("MySQL", "Hostname", "");
-			if (hostname.empty())
-				throw std::runtime_error("MySQL Hostname directive missing in ini file.");
-
-			std::string username = iniReader.Get("MySQL", "Username", "");
-			if (username.empty())
-				throw std::runtime_error("MySQL Username directive missing in ini file.");
-
-			std::string password = iniReader.Get("MySQL", "Password", "");
-			if (password.empty())
-				throw std::runtime_error("MySQL Password directive missing in ini file.");
-
-			std::string database = iniReader.Get("MySQL", "Database", "");
-			if (database.empty())
-				throw std::runtime_error("MySQL Database directive missing in ini file.");
-
-			mySQLClient.Connect(hostname, username, password, database);
-		}
-
-		SQLite::SQLiteClient sqliteClient("./datalog.db");
-
-		DataStorageInterface::DataInterface::DataImporter importer(&mySQLClient, &sqliteClient);
-		importer.ImportData();
-
-		Logging::CloseLog();
-	}
-	catch (const std::exception& e)
-	{
-		std::cerr << "Exception caught" << std::endl;
-
-		std::stringstream ss;
-		ss << "Type : " << typeid(e).name() << std::endl;
-		ss << "ERROR: " << e.what() << std::endl;
-
-		Logging::Log(Logging::Severity::Error, ss.str());
-
-		Logging::CloseLog();
-
-		return -1;
-	}
-
-	return 0;
-}
+#include "DataInterface/DataImporter.h"
+#include <clipp.h>
+#include <INIReader.h>
+#include <Logging.h>
+#include <MySQLClient.h>
+#include <SQLiteClient.h>
+#include <fstream>
+#include <iostream>
+#include <sstream>
+
+
+int main(int argc, char** argv)
+{
+	try
+	{
+		Logging::OpenLog();
+		Logging::SetLogMask(Logging::Severity::Debug);
+
+		std::string databaseFile = "";
+		clipp::group cli {
+			clipp::value("SQLite Database file", databaseFile)
+		};
+
+		if (!clipp::parse(argc, argv, cli))
+		{
+			std::stringstream ss;
+			ss << clipp::make_man_page(cli, argv[0]);
+			Logging::Log(Logging::Severity::Info, ss.str());
+
+			Logging::CloseLog();
+
+			return 1;
+		}
+
+		{
+			std::ifstream f(databaseFile);
+			if (!f.good())
+				throw std::runtime_error("Can't read SQLite Database file.");
+		}
+
+		INIReader iniReader("DataImporter.ini");
+		if (iniReader.ParseError() != 0)
+			throw std::runtime_error("Can't read DataImporter.ini.");
+
+		MySQL::MySQLClient mySQLClient;
+		{
+			std::string hostname = iniReader.Get("MySQL", "Hostname", "");
+			if (hostname.empty())
+				throw std::runtime_error("MySQL Hostname directive missing in ini file.");
+
+			std::string username = iniReader.Get("MySQL", "Username", "");
+			if (username.empty())
+				throw std::runtime_error("MySQL Username directive missing in ini file.");
+
+			std::string password = iniReader.Get("MySQL", "Password", "");
+			if (password.empty())
+				throw std::runtime_error("MySQL Password directive missing in ini file.");
+
+			std::string database = iniReader.Get("MySQL", "Database", "");
+			if (database.empty())
+				throw std::runtime_error("MySQL Database directive missing in ini file.");
+
+			mySQLClient.Connect(hostname, username, password, database);
+		}
+
+		SQLite::SQLiteClient sqliteClient("./datalog.db");
+
+		DataStorageInterface::DataInterface::DataImporter importer(&mySQLClient, &sqliteClient);
+		importer.ImportData();
+
+		Logging::CloseLog();
+	}
+	catch (const std::exception& e)
+	{
+		std::cerr << "Exception caught" << std::endl;
+
+		std::stringstream ss;
+		ss << "Type : " << typeid(e).name() << std::endl;
+		ss << "ERROR: " << e.what() << std::endl;
+
+		Logging::Log(Logging::Severity::Error, ss.str());
+
+		Logging::CloseLog();
+
+		return -1;
+	}
+
+	return 0;
+}

+ 77 - 77
Application/DataStorageAPI.cc

@@ -1,77 +1,77 @@
-#include "API/WebAPI.h"
-#include "DataInterface/GraphClient.h"
-#include <DataStorageClient.h>
-#include <HttpServer.h>
-#include <INIReader.h>
-#include <Logging.h>
-#include <MySQLClient.h>
-#include <sstream>
-
-
-int main(int argc, char** argv)
-{
-	try
-	{
-		Logging::OpenLog();
-		Logging::SetLogMask(Logging::Severity::Info);
-
-		INIReader iniReader("DataStorageAPI.ini");
-		if (iniReader.ParseError() != 0)
-			throw std::runtime_error("Can't read DataLogger.ini.");
-
-		Logging::Log(Logging::Severity::Info, "Starting DataStorageAPI");
-
-		int port = iniReader.GetInteger("DataStorageAPI", "Port", 0);
-		if (port == 0)
-			throw std::runtime_error("Port directive missing in ini file.");
-
-		std::string table = iniReader.Get("DataStorageAPI", "Table", "");
-		if (table.empty())
-			throw std::runtime_error("Table directive missing in ini file.");
-
-		std::shared_ptr<MySQL::MySQLClient> pMySQLClient = std::make_shared<MySQL::MySQLClient>();
-		{
-			std::string hostname = iniReader.Get("MySQL", "Hostname", "");
-			if (hostname.empty())
-				throw std::runtime_error("MySQL Hostname directive missing in ini file.");
-
-			std::string username = iniReader.Get("MySQL", "Username", "");
-			if (username.empty())
-				throw std::runtime_error("MySQL Username directive missing in ini file.");
-
-			std::string password = iniReader.Get("MySQL", "Password", "");
-			if (password.empty())
-				throw std::runtime_error("MySQL Password directive missing in ini file.");
-
-			std::string database = iniReader.Get("MySQL", "Database", "");
-			if (database.empty())
-				throw std::runtime_error("MySQL Database directive missing in ini file.");
-
-			pMySQLClient->Connect(hostname, username, password, database);
-		}
-
-		DataStorage::DataStorageClient dataStorageClient(pMySQLClient, table);
-		DataStorageInterface::DataInterface::GraphClient graphClient(pMySQLClient.get());
-		auto callback = std::bind(&DataStorageInterface::API::WebAPI::ProcessQuery, &dataStorageClient, &graphClient, std::placeholders::_1, std::placeholders::_2);
-
-		Http::HttpServer server(port, callback);
-		Logging::Log(Logging::Severity::Info, "Startup Complete");
-		server.Wait();
-
-		Logging::Log(Logging::Severity::Info, "Stopping DataStorageAPI...");
-
-		Logging::CloseLog();
-	}
-	catch (const std::exception& e)
-	{
-		std::stringstream ss;
-		ss << "ERROR: " << e.what() << std::endl;
-
-		Logging::Log(Logging::Severity::Error, ss.str());
-		Logging::CloseLog();
-
-		return -1;
-	}
-
-	return 0;
-}
+#include "API/WebAPI.h"
+#include "DataInterface/GraphClient.h"
+#include <DataStorageClient.h>
+#include <HttpServer.h>
+#include <INIReader.h>
+#include <Logging.h>
+#include <MySQLClient.h>
+#include <sstream>
+
+
+int main(int argc, char** argv)
+{
+	try
+	{
+		Logging::OpenLog();
+		Logging::SetLogMask(Logging::Severity::Info);
+
+		INIReader iniReader("DataStorageAPI.ini");
+		if (iniReader.ParseError() != 0)
+			throw std::runtime_error("Can't read DataLogger.ini.");
+
+		Logging::Log(Logging::Severity::Info, "Starting DataStorageAPI");
+
+		int port = iniReader.GetInteger("DataStorageAPI", "Port", 0);
+		if (port == 0)
+			throw std::runtime_error("Port directive missing in ini file.");
+
+		std::string table = iniReader.Get("DataStorageAPI", "Table", "");
+		if (table.empty())
+			throw std::runtime_error("Table directive missing in ini file.");
+
+		std::shared_ptr<MySQL::MySQLClient> pMySQLClient = std::make_shared<MySQL::MySQLClient>();
+		{
+			std::string hostname = iniReader.Get("MySQL", "Hostname", "");
+			if (hostname.empty())
+				throw std::runtime_error("MySQL Hostname directive missing in ini file.");
+
+			std::string username = iniReader.Get("MySQL", "Username", "");
+			if (username.empty())
+				throw std::runtime_error("MySQL Username directive missing in ini file.");
+
+			std::string password = iniReader.Get("MySQL", "Password", "");
+			if (password.empty())
+				throw std::runtime_error("MySQL Password directive missing in ini file.");
+
+			std::string database = iniReader.Get("MySQL", "Database", "");
+			if (database.empty())
+				throw std::runtime_error("MySQL Database directive missing in ini file.");
+
+			pMySQLClient->Connect(hostname, username, password, database);
+		}
+
+		DataStorage::DataStorageClient dataStorageClient(pMySQLClient, table);
+		DataStorageInterface::DataInterface::GraphClient graphClient(pMySQLClient.get());
+		auto callback = std::bind(&DataStorageInterface::API::WebAPI::ProcessQuery, &dataStorageClient, &graphClient, std::placeholders::_1, std::placeholders::_2);
+
+		Http::HttpServer server(port, callback);
+		Logging::Log(Logging::Severity::Info, "Startup Complete");
+		server.Wait();
+
+		Logging::Log(Logging::Severity::Info, "Stopping DataStorageAPI...");
+
+		Logging::CloseLog();
+	}
+	catch (const std::exception& e)
+	{
+		std::stringstream ss;
+		ss << "ERROR: " << e.what() << std::endl;
+
+		Logging::Log(Logging::Severity::Error, ss.str());
+		Logging::CloseLog();
+
+		return -1;
+	}
+
+	return 0;
+}

+ 7 - 8
DataCondenser.ini

@@ -1,8 +1,7 @@
-[DataCondenser]
-
-[MySQL]
-Hostname =
-Username =
-Password =
-Database =
-
+[DataCondenser]
+
+[MySQL]
+Hostname =
+Username =
+Password =
+Database =

+ 7 - 8
DataImporter.ini

@@ -1,8 +1,7 @@
-[DataImporter]
-
-[MySQL]
-Hostname =
-Username =
-Password =
-Database =
-
+[DataImporter]
+
+[MySQL]
+Hostname =
+Username =
+Password =
+Database =

+ 369 - 407
DataInterface/DataCondenser.cpp

@@ -1,407 +1,369 @@
-#include "DataCondenser.h"
-#include "Util/Util.h"
-#include <DataId.h>
-#include <iostream>
-#include <limits>
-#include <map>
-#include <memory>
-#include <sstream>
-
-
-namespace DataStorageInterface {
-namespace DataInterface {
-
-DataCondenser::DataCondenser(MySQL::MySQLClient* pMySQLClient) :
-	m_pMySQLClient(pMySQLClient)
-{
-}
-
-void DataCondenser::CondenseData()
-{
-	auto deviceIDs = GetDeviceIDs();
-
-	for (int dataId = DataStorage::DataId::Min(); dataId <= DataStorage::DataId::Max(); ++dataId)
-	{
-		std::cout << "DataID: " << dataId << std::endl;
-
-		//std::cout << "Daily" << std::endl;
-		//for (auto& deviceId : deviceIDs)
-		//	WriteReducedData(deviceId, dataId, RRA::Daily);
-		std::cout << "Weekly" << std::endl;
-		for (auto& deviceId : deviceIDs)
-			WriteReducedData(deviceId, dataId, RRA::Weekly);
-		std::cout << "Monthly" << std::endl;
-		for (auto& deviceId : deviceIDs)
-			WriteReducedData(deviceId, dataId, RRA::Monthly);
-		std::cout << "Yearly" << std::endl;
-		for (auto& deviceId : deviceIDs)
-			WriteReducedData(deviceId, dataId, RRA::Yearly);
-	}
-
-	CleanData();
-}
-
-int DataCondenser::GetPeriodStamp(int timestamp, const RRA::type& rra)
-{
-	int modulo = timestamp % static_cast<int>(rra);
-	return timestamp - modulo;
-}
-
-std::vector<int> DataCondenser::GetDeviceIDs()
-{
-	std::stringstream query;
-	query << "SELECT DISTINCT `device_id` FROM `datalog` ORDER BY `device_id`;";
-	auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
-
-	std::vector<int> returnValue;
-	while (resultSet.Next())
-		returnValue.push_back(resultSet.Int("device_id"));
-
-	return returnValue;
-}
-
-int DataCondenser::GetNewestPeriodStamp(const std::string& sourceTable, int deviceId, int dataId)
-{
-	std::stringstream query;
-	query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "';";
-	auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
-	if (resultSet.Next())
-		return resultSet.Int("timestamp");
-
-	return 0;
-}
-
-MySQL::MySQLResultSet DataCondenser::GetRawData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
-{
-	std::stringstream query;
-	query << "SELECT UNIX_TIMESTAMP(`timestamp`) AS timestamp, `value` AS `min-value`, `value` AS `mean-value`, `value` AS `max-value` FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "' ";
-	if (newestPeriodStamp == -1)
-		query << "LIMIT 1;";
-	else
-		query << "AND `timestamp` > FROM_UNIXTIME(" << newestPeriodStamp << ") ORDER BY `timestamp` ASC;";
-
-	return m_pMySQLClient->ExecuteQuery(query.str());
-}
-
-MySQL::MySQLResultSet DataCondenser::GetReducedData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
-{
-	std::stringstream query;
-	query << "SELECT UNIX_TIMESTAMP(`timestamp`) AS timestamp, `min-value`, `mean-value`, `max-value` FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "' ";
-	if (newestPeriodStamp == -1)
-		query << "LIMIT 1;";
-	else
-		query << "AND `timestamp` > FROM_UNIXTIME(" << newestPeriodStamp << ") ORDER BY `timestamp` ASC;";
-
-	return m_pMySQLClient->ExecuteQuery(query.str());
-}
-
-int DataCondenser::GetMaximumMySQLPacketSize()
-{
-	std::stringstream query;
-	query << "show variables like 'max_allowed_packet';";
-	MySQL::MySQLResultSet resultSet = m_pMySQLClient->ExecuteQuery(query.str());
-
-	if (resultSet.RowsCount() != 1)
-		throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");
-
-	resultSet.Next();
-	return resultSet.Int("Value");
-}
-
-DataType::type DataCondenser::GetDataType(int deviceId, int dataId)
-{
-	auto resultSet = GetRawData("datalog", deviceId, dataId, -1);
-	if (resultSet.Next())
-		return DataInterface::GetDataType(resultSet.String("mean-value"));
-
-	return DataType::None;
-}
-
-std::string DataCondenser::GetSourceTable(const RRA::type& rra)
-{
-	switch (rra)
-	{
-	case RRA::Daily:
-		return "datalog";
-	case RRA::Weekly:
-		return "datalog";
-	case RRA::Monthly:
-		return "datalog-weekly";
-	case RRA::Yearly:
-		return "datalog-monthly";
-	default:
-	case RRA::Raw:
-		return std::string();
-	}
-}
-
-std::string DataCondenser::GetDestinationTable(const RRA::type& rra)
-{
-	switch (rra)
-	{
-	case RRA::Daily:
-		return "datalog-daily";
-	case RRA::Weekly:
-		return "datalog-weekly";
-	case RRA::Monthly:
-		return "datalog-monthly";
-	case RRA::Yearly:
-		return "datalog-yearly";
-	default:
-	case RRA::Raw:
-		return std::string();
-	}
-}
-
-int DataCondenser::StringStreamSize(std::stringstream& ss)
-{
-	ss.seekp(0, std::ios_base::end);
-	return ss.tellp();
-}
-
-void DataCondenser::InitializeInsertQuery(std::stringstream& query, const std::string& table)
-{
-	query.str("");
-	query << "INSERT INTO `" << table << "` (`device_id`, `data_id`, `timestamp`, `min-value`, `mean-value`, `max-value`) VALUES ";
-}
-
-void DataCondenser::ExecuteInsertQuery(std::stringstream& query)
-{
-	query.seekp(-1, std::ios_base::end);
-	query << ";";
-	m_pMySQLClient->Execute(query.str());
-}
-
-void DataCondenser::WriteReducedData(int deviceId, int dataId, const RRA::type& rra)
-{
-	DataType::type dataType = GetDataType(deviceId, dataId);
-
-	if (dataType == DataType::None)
-		std::cout << " DataType: None" << std::endl;
-	else if (dataType == DataType::Float)
-		std::cout << " DataType: Float" << std::endl;
-	else if (dataType == DataType::String)
-		std::cout << " DataType: String" << std::endl;
-
-	switch (dataType)
-	{
-	case DataType::Float:
-		WriteReducedFloatData(deviceId, dataId, rra);
-		break;
-	case DataType::String:
-		WriteReducedStringData(deviceId, dataId, rra);
-		break;
-	default:
-	case DataType::None:
-		break;
-	}
-}
-
-void DataCondenser::WriteReducedFloatData(int deviceId, int dataId, const RRA::type& rra)
-{
-	int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
-
-	std::string sourceTable = GetSourceTable(rra);
-	std::string destinationTable = GetDestinationTable(rra);
-
-	int newestSourcePeriodStamp = GetNewestPeriodStamp(sourceTable, deviceId, dataId);
-	int newestDestinationPeriodStamp = GetNewestPeriodStamp(destinationTable, deviceId, dataId);
-
-	std::cout << " DeviceID: " << deviceId << std::endl;
-	std::cout << " DataID: " << dataId << std::endl;
-	std::cout << "  Newest Src : " << newestSourcePeriodStamp << std::endl;
-	std::cout << "  Newest Dest: " << newestDestinationPeriodStamp << std::endl;
-
-	std::stringstream query;
-	MySQL::MySQLResultSet resultSet;
-	if (sourceTable == "datalog")
-		resultSet = GetRawData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
-	else
-		resultSet = GetReducedData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
-
-	std::cout << " Source Count: " << resultSet.RowsCount() << std::endl;
-
-	int samples = 0;
-	float min = std::numeric_limits<float>::max();
-	float mean = 0;
-	float max = std::numeric_limits<float>::min();
-
-	int writeCount = 0;
-
-	InitializeInsertQuery(query, destinationTable);
-
-	std::stringstream queryPart;
-
-	int periodStamp = 0;
-	while (resultSet.Next())
-	{
-		int currentPeriodStamp = GetPeriodStamp(resultSet.Int("timestamp"), rra);
-		if (currentPeriodStamp != periodStamp)
-		{
-			if (samples > 0)
-			{
-				mean = mean / samples;
-
-				queryPart.str("");
-				queryPart << "('" << deviceId << "', '" << dataId << "', FROM_UNIXTIME(" << currentPeriodStamp << "), '" << min << "', '" << mean << "', '" << max << "'),";
-				++writeCount;
-
-				int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
-				if (packetSize > maxMySQLPacketSize)
-				{
-					ExecuteInsertQuery(query);
-					InitializeInsertQuery(query, destinationTable);
-					std::cout << " Write Count: " << writeCount << std::endl;
-					writeCount = 0;
-				}
-
-				query << queryPart.str();
-			}
-
-			if (currentPeriodStamp > newestSourcePeriodStamp)
-				break;
-
-			samples = 0;
-			min = std::numeric_limits<float>::max();
-			mean = 0;
-			max = std::numeric_limits<float>::min();
-			periodStamp = currentPeriodStamp;
-		}
-
-		float minValue, meanValue, maxValue;
-		minValue = static_cast<float>(resultSet.Double("min-value"));
-		meanValue = static_cast<float>(resultSet.Double("mean-value"));
-		maxValue = static_cast<float>(resultSet.Double("max-value"));
-
-		if (minValue < min)
-			min = minValue;
-		if (maxValue > max)
-			max = maxValue;
-		mean += meanValue;
-		++samples;
-	}
-
-	if (writeCount > 0)
-		ExecuteInsertQuery(query);
-
-	std::cout << " Write Count: " << writeCount << std::endl << std::endl;
-}
-
-void DataCondenser::WriteReducedStringData(int deviceId, int dataId, const RRA::type& rra)
-{
-	int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
-
-	std::string sourceTable = GetSourceTable(rra);
-	std::string destinationTable = GetDestinationTable(rra);
-
-	int newestSourcePeriodStamp = GetNewestPeriodStamp(sourceTable, deviceId, dataId);
-	int newestDestinationPeriodStamp = GetNewestPeriodStamp(destinationTable, deviceId, dataId);
-
-	std::cout << " DeviceID: " << deviceId << std::endl;
-	std::cout << " DataID: " << dataId << std::endl;
-	std::cout << "  Newest Src : " << newestSourcePeriodStamp << std::endl;
-	std::cout << "  Newest Dest: " << newestDestinationPeriodStamp << std::endl;
-
-	std::stringstream query;
-	MySQL::MySQLResultSet resultSet;
-	if (sourceTable == "datalog")
-		resultSet = GetRawData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
-	else
-		resultSet = GetReducedData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
-
-	std::cout << " Source Count: " << resultSet.RowsCount() << std::endl;
-
-	int samples = 0;
-	std::vector<std::string> values;
-
-	int writeCount = 0;
-
-	InitializeInsertQuery(query, destinationTable);
-
-	std::stringstream queryPart;
-
-	int periodStamp = 0;
-	while (resultSet.Next())
-	{
-		int currentPeriodStamp = GetPeriodStamp(resultSet.Int("timestamp"), rra);
-		if (currentPeriodStamp != periodStamp)
-		{
-			if (samples > 0)
-			{
-				std::map<std::string, int> map;
-				for (auto& value : values)
-				{
-					auto iterator = map.find(value);
-					if (iterator == map.end())
-						map.insert(std::pair<std::string, int>(value, 1));
-					else
-						map[value] += 1;
-				}
-
-				auto iterator = map.begin();
-				for (std::map<std::string, int>::iterator it = map.begin(); it != map.end(); ++it)
-				{
-					if (it->second > iterator->second)
-						iterator = it;
-				}
-
-				queryPart.str("");
-				queryPart << "('" << deviceId << "', '" << dataId << "', FROM_UNIXTIME(" << currentPeriodStamp << "), '', '" << iterator->first << "', ''),";
-
-				int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
-				if (packetSize > maxMySQLPacketSize)
-				{
-					ExecuteInsertQuery(query);
-					InitializeInsertQuery(query, destinationTable);
-					std::cout << " Write Count: " << writeCount << std::endl;
-					writeCount = 0;
-				}
-
-				query << queryPart.str();
-				++writeCount;
-			}
-
-			if (currentPeriodStamp > newestSourcePeriodStamp)
-				break;
-
-			samples = 0;
-			values.clear();
-			periodStamp = currentPeriodStamp;
-		}
-
-		std::string meanValue = resultSet.String("mean-value");
-		values.push_back(meanValue);
-		++samples;
-	}
-
-	if (writeCount > 0)
-		ExecuteInsertQuery(query);
-
-	std::cout << " Write Count: " << writeCount << std::endl << std::endl;
-}
-
-void DataCondenser::CleanData()
-{
-	std::cout << "Cleaning: " << std::endl;
-
-	CleanData(RRA::Daily);
-	CleanData(RRA::Weekly);
-	CleanData(RRA::Monthly);
-	CleanData(RRA::Yearly);
-}
-
-void DataCondenser::CleanData(const RRA::type& rra)
-{
-	std::string table = GetDestinationTable(rra);
-	int periodInSeconds = static_cast<int>(rra) * 400;
-	int timestamp = Util::GetTimestamp() + Util::GetUTCOffset() - periodInSeconds;
-
-	std::cout << " Cleaning " << table << " (<" << timestamp << ")" << std::endl;
-
-	std::stringstream query;
-	query << "DELETE FROM `" << table << "` WHERE `timestamp` < FROM_UNIXTIME(" << timestamp << ");";
-	m_pMySQLClient->Execute(query.str());
-}
-
-} // namespace DataInterface
-} // namespace DataStorageInterface
+#include "DataCondenser.h"
+#include <DataId.h>
+#include <DateTime.h>
+#include <limits>
+#include <map>
+#include <memory>
+#include <sstream>
+
+
+namespace DataStorageInterface {
+namespace DataInterface {
+
+DataCondenser::DataCondenser(MySQL::MySQLClient* pMySQLClient) :
+	m_pMySQLClient(pMySQLClient)
+{
+}
+
+void DataCondenser::CondenseData()
+{
+	auto deviceIDs = GetDeviceIDs();
+
+	for (int dataId = DataStorage::DataId::Min(); dataId <= DataStorage::DataId::Max(); ++dataId)
+	{
+		//for (auto& deviceId : deviceIDs)
+		//	WriteReducedData(deviceId, dataId, RRA::Daily);
+		for (auto& deviceId : deviceIDs)
+			WriteReducedData(deviceId, dataId, RRA::Weekly);
+		for (auto& deviceId : deviceIDs)
+			WriteReducedData(deviceId, dataId, RRA::Monthly);
+		for (auto& deviceId : deviceIDs)
+			WriteReducedData(deviceId, dataId, RRA::Yearly);
+	}
+
+	CleanData();
+}
+
+int DataCondenser::GetPeriodStamp(int timestamp, const RRA::type& rra)
+{
+	int modulo = timestamp % static_cast<int>(rra);
+	return timestamp - modulo;
+}
+
+std::vector<int> DataCondenser::GetDeviceIDs()
+{
+	std::stringstream query;
+	query << "SELECT DISTINCT `device_id` FROM `datalog` ORDER BY `device_id`;";
+	auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
+
+	std::vector<int> returnValue;
+	while (resultSet.Next())
+		returnValue.push_back(resultSet.Int("device_id"));
+
+	return returnValue;
+}
+
+int DataCondenser::GetNewestPeriodStamp(const std::string& sourceTable, int deviceId, int dataId)
+{
+	std::stringstream query;
+	query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "';";
+	auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
+	if (resultSet.Next())
+		return resultSet.Int("timestamp");
+
+	return 0;
+}
+
+MySQL::MySQLResultSet DataCondenser::GetRawData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
+{
+	std::stringstream query;
+	query << "SELECT UNIX_TIMESTAMP(`timestamp`) AS timestamp, `value` AS `min-value`, `value` AS `mean-value`, `value` AS `max-value` FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "' ";
+	if (newestPeriodStamp == -1)
+		query << "LIMIT 1;";
+	else
+		query << "AND `timestamp` > FROM_UNIXTIME(" << newestPeriodStamp << ") ORDER BY `timestamp` ASC;";
+
+	return m_pMySQLClient->ExecuteQuery(query.str());
+}
+
+MySQL::MySQLResultSet DataCondenser::GetReducedData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
+{
+	std::stringstream query;
+	query << "SELECT UNIX_TIMESTAMP(`timestamp`) AS timestamp, `min-value`, `mean-value`, `max-value` FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "' ";
+	if (newestPeriodStamp == -1)
+		query << "LIMIT 1;";
+	else
+		query << "AND `timestamp` > FROM_UNIXTIME(" << newestPeriodStamp << ") ORDER BY `timestamp` ASC;";
+
+	return m_pMySQLClient->ExecuteQuery(query.str());
+}
+
+int DataCondenser::GetMaximumMySQLPacketSize()
+{
+	std::stringstream query;
+	query << "show variables like 'max_allowed_packet';";
+	MySQL::MySQLResultSet resultSet = m_pMySQLClient->ExecuteQuery(query.str());
+
+	if (resultSet.RowsCount() != 1)
+		throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");
+
+	resultSet.Next();
+	return resultSet.Int("Value");
+}
+
+DataType::type DataCondenser::GetDataType(int deviceId, int dataId)
+{
+	auto resultSet = GetRawData("datalog", deviceId, dataId, -1);
+	if (resultSet.Next())
+		return DataInterface::GetDataType(resultSet.String("mean-value"));
+
+	return DataType::None;
+}
+
+std::string DataCondenser::GetSourceTable(const RRA::type& rra)
+{
+	switch (rra)
+	{
+	case RRA::Daily:
+		return "datalog";
+	case RRA::Weekly:
+		return "datalog";
+	case RRA::Monthly:
+		return "datalog-weekly";
+	case RRA::Yearly:
+		return "datalog-monthly";
+	default:
+	case RRA::Raw:
+		return std::string();
+	}
+}
+
+std::string DataCondenser::GetDestinationTable(const RRA::type& rra)
+{
+	switch (rra)
+	{
+	case RRA::Daily:
+		return "datalog-daily";
+	case RRA::Weekly:
+		return "datalog-weekly";
+	case RRA::Monthly:
+		return "datalog-monthly";
+	case RRA::Yearly:
+		return "datalog-yearly";
+	default:
+	case RRA::Raw:
+		return std::string();
+	}
+}
+
+int DataCondenser::StringStreamSize(std::stringstream& ss)
+{
+	ss.seekp(0, std::ios_base::end);
+	return ss.tellp();
+}
+
+void DataCondenser::InitializeInsertQuery(std::stringstream& query, const std::string& table)
+{
+	query.str("");
+	query << "INSERT INTO `" << table << "` (`device_id`, `data_id`, `timestamp`, `min-value`, `mean-value`, `max-value`) VALUES ";
+}
+
+void DataCondenser::ExecuteInsertQuery(std::stringstream& query)
+{
+	query.seekp(-1, std::ios_base::end);
+	query << ";";
+	m_pMySQLClient->Execute(query.str());
+}
+
+void DataCondenser::WriteReducedData(int deviceId, int dataId, const RRA::type& rra)
+{
+	DataType::type dataType = GetDataType(deviceId, dataId);
+
+	switch (dataType)
+	{
+	case DataType::Float:
+		WriteReducedFloatData(deviceId, dataId, rra);
+		break;
+	case DataType::String:
+		WriteReducedStringData(deviceId, dataId, rra);
+		break;
+	default:
+	case DataType::None:
+		break;
+	}
+}
+
+void DataCondenser::WriteReducedFloatData(int deviceId, int dataId, const RRA::type& rra)
+{
+	int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
+
+	std::string sourceTable = GetSourceTable(rra);
+	std::string destinationTable = GetDestinationTable(rra);
+
+	int newestSourcePeriodStamp = GetNewestPeriodStamp(sourceTable, deviceId, dataId);
+	int newestDestinationPeriodStamp = GetNewestPeriodStamp(destinationTable, deviceId, dataId);
+
+	std::stringstream query;
+	MySQL::MySQLResultSet resultSet;
+	if (sourceTable == "datalog")
+		resultSet = GetRawData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
+	else
+		resultSet = GetReducedData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
+
+	int samples = 0;
+	float min = std::numeric_limits<float>::max();
+	float mean = 0;
+	float max = std::numeric_limits<float>::min();
+
+	int writeCount = 0;
+
+	InitializeInsertQuery(query, destinationTable);
+
+	std::stringstream queryPart;
+
+	int periodStamp = 0;
+	while (resultSet.Next())
+	{
+		int currentPeriodStamp = GetPeriodStamp(resultSet.Int("timestamp"), rra);
+		if (currentPeriodStamp != periodStamp)
+		{
+			if (samples > 0)
+			{
+				mean = mean / samples;
+
+				queryPart.str("");
+				queryPart << "('" << deviceId << "', '" << dataId << "', FROM_UNIXTIME(" << currentPeriodStamp << "), '" << min << "', '" << mean << "', '" << max << "'),";
+				++writeCount;
+
+				int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
+				if (packetSize > maxMySQLPacketSize)
+				{
+					ExecuteInsertQuery(query);
+					InitializeInsertQuery(query, destinationTable);
+					writeCount = 0;
+				}
+
+				query << queryPart.str();
+			}
+
+			if (currentPeriodStamp > newestSourcePeriodStamp)
+				break;
+
+			samples = 0;
+			min = std::numeric_limits<float>::max();
+			mean = 0;
+			max = std::numeric_limits<float>::min();
+			periodStamp = currentPeriodStamp;
+		}
+
+		float minValue, meanValue, maxValue;
+		minValue = static_cast<float>(resultSet.Double("min-value"));
+		meanValue = static_cast<float>(resultSet.Double("mean-value"));
+		maxValue = static_cast<float>(resultSet.Double("max-value"));
+
+		if (minValue < min)
+			min = minValue;
+		if (maxValue > max)
+			max = maxValue;
+		mean += meanValue;
+		++samples;
+	}
+
+	if (writeCount > 0)
+		ExecuteInsertQuery(query);
+}
+
+void DataCondenser::WriteReducedStringData(int deviceId, int dataId, const RRA::type& rra)
+{
+	int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
+
+	std::string sourceTable = GetSourceTable(rra);
+	std::string destinationTable = GetDestinationTable(rra);
+
+	int newestSourcePeriodStamp = GetNewestPeriodStamp(sourceTable, deviceId, dataId);
+	int newestDestinationPeriodStamp = GetNewestPeriodStamp(destinationTable, deviceId, dataId);
+
+	std::stringstream query;
+	MySQL::MySQLResultSet resultSet;
+	if (sourceTable == "datalog")
+		resultSet = GetRawData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
+	else
+		resultSet = GetReducedData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
+
+	int samples = 0;
+	std::vector<std::string> values;
+
+	int writeCount = 0;
+
+	InitializeInsertQuery(query, destinationTable);
+
+	std::stringstream queryPart;
+
+	int periodStamp = 0;
+	while (resultSet.Next())
+	{
+		int currentPeriodStamp = GetPeriodStamp(resultSet.Int("timestamp"), rra);
+		if (currentPeriodStamp != periodStamp)
+		{
+			if (samples > 0)
+			{
+				std::map<std::string, int> map;
+				for (auto& value : values)
+				{
+					auto iterator = map.find(value);
+					if (iterator == map.end())
+						map.insert(std::pair<std::string, int>(value, 1));
+					else
+						map[value] += 1;
+				}
+
+				auto iterator = map.begin();
+				for (std::map<std::string, int>::iterator it = map.begin(); it != map.end(); ++it)
+				{
+					if (it->second > iterator->second)
+						iterator = it;
+				}
+
+				queryPart.str("");
+				queryPart << "('" << deviceId << "', '" << dataId << "', FROM_UNIXTIME(" << currentPeriodStamp << "), '', '" << iterator->first << "', ''),";
+
+				int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
+				if (packetSize > maxMySQLPacketSize)
+				{
+					ExecuteInsertQuery(query);
+					InitializeInsertQuery(query, destinationTable);
+					writeCount = 0;
+				}
+
+				query << queryPart.str();
+				++writeCount;
+			}
+
+			if (currentPeriodStamp > newestSourcePeriodStamp)
+				break;
+
+			samples = 0;
+			values.clear();
+			periodStamp = currentPeriodStamp;
+		}
+
+		std::string meanValue = resultSet.String("mean-value");
+		values.push_back(meanValue);
+		++samples;
+	}
+
+	if (writeCount > 0)
+		ExecuteInsertQuery(query);
+}
+
+void DataCondenser::CleanData()
+{
+	CleanData(RRA::Daily);
+	CleanData(RRA::Weekly);
+	CleanData(RRA::Monthly);
+	CleanData(RRA::Yearly);
+}
+
+void DataCondenser::CleanData(const RRA::type& rra)
+{
+	std::string table = GetDestinationTable(rra);
+	int periodInSeconds = static_cast<int>(rra) * 400;
+	int timestamp = DateTime::GetTimestamp() + DateTime::GetUTCOffset() - periodInSeconds;
+
+	std::stringstream query;
+	query << "DELETE FROM `" << table << "` WHERE `timestamp` < FROM_UNIXTIME(" << timestamp << ");";
+	m_pMySQLClient->Execute(query.str());
+}
+
+} // namespace DataInterface
+} // namespace DataStorageInterface

+ 52 - 52
DataInterface/DataCondenser.h

@@ -1,52 +1,52 @@
-#ifndef DATAINTERFACE_DATACONDENSER_H
-#define DATAINTERFACE_DATACONDENSER_H
-
-#include "DataType.h"
-#include "RRA.h"
-#include <MySQLClient.h>
-#include <string>
-#include <vector>
-
-
-namespace DataStorageInterface {
-namespace DataInterface {
-
-class DataCondenser
-{
-public:
-	DataCondenser(MySQL::MySQLClient* pMySQLClient);
-
-	void CondenseData();
-
-private:
-	int GetPeriodStamp(int timestamp, const RRA::type& rra);
-	std::vector<int> GetDeviceIDs();
-	int GetNewestPeriodStamp(const std::string& sourceTable, int deviceId, int dataId);
-	MySQL::MySQLResultSet GetRawData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp);
-	MySQL::MySQLResultSet GetReducedData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp);
-
-	int GetMaximumMySQLPacketSize();
-
-	DataType::type GetDataType(int deviceId, int dataId);
-	std::string GetSourceTable(const RRA::type& rra);
-	std::string GetDestinationTable(const RRA::type& rra);
-	int StringStreamSize(std::stringstream& ss);
-
-	void InitializeInsertQuery(std::stringstream& query, const std::string& table);
-	void ExecuteInsertQuery(std::stringstream& query);
-
-	void WriteReducedFloatData(int deviceId, int dataId, const RRA::type& rra);
-	void WriteReducedStringData(int deviceId, int dataId, const RRA::type& rra);
-	void WriteReducedData(int deviceId, int dataId, const RRA::type& rra);
-
-	void CleanData();
-	void CleanData(const RRA::type& rra);
-
-private:
-	MySQL::MySQLClient* m_pMySQLClient;
-};
-
-} // namespace DataInterface
-} // namespace DataStorageInterface
-
-#endif // DATAINTERFACE_DATACONDENSER_H
+#ifndef DATAINTERFACE_DATACONDENSER_H
+#define DATAINTERFACE_DATACONDENSER_H
+
+#include "DataType.h"
+#include "RRA.h"
+#include <MySQLClient.h>
+#include <string>
+#include <vector>
+
+
+namespace DataStorageInterface {
+namespace DataInterface {
+
+class DataCondenser
+{
+public:
+	DataCondenser(MySQL::MySQLClient* pMySQLClient);
+
+	void CondenseData();
+
+private:
+	int GetPeriodStamp(int timestamp, const RRA::type& rra);
+	std::vector<int> GetDeviceIDs();
+	int GetNewestPeriodStamp(const std::string& sourceTable, int deviceId, int dataId);
+	MySQL::MySQLResultSet GetRawData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp);
+	MySQL::MySQLResultSet GetReducedData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp);
+
+	int GetMaximumMySQLPacketSize();
+
+	DataType::type GetDataType(int deviceId, int dataId);
+	std::string GetSourceTable(const RRA::type& rra);
+	std::string GetDestinationTable(const RRA::type& rra);
+	int StringStreamSize(std::stringstream& ss);
+
+	void InitializeInsertQuery(std::stringstream& query, const std::string& table);
+	void ExecuteInsertQuery(std::stringstream& query);
+
+	void WriteReducedFloatData(int deviceId, int dataId, const RRA::type& rra);
+	void WriteReducedStringData(int deviceId, int dataId, const RRA::type& rra);
+	void WriteReducedData(int deviceId, int dataId, const RRA::type& rra);
+
+	void CleanData();
+	void CleanData(const RRA::type& rra);
+
+private:
+	MySQL::MySQLClient* m_pMySQLClient;
+};
+
+} // namespace DataInterface
+} // namespace DataStorageInterface
+
+#endif // DATAINTERFACE_DATACONDENSER_H

+ 139 - 139
DataInterface/DataImporter.cpp

@@ -1,139 +1,139 @@
-#include "DataImporter.h"
-#include <DataId.h>
-#include <Logging.h>
-#include <MySQLClient.h>
-#include <SQLiteClient.h>
-#include <StringAlgorithm.h>
-#include <sstream>
-
-
-namespace DataStorageInterface {
-namespace DataInterface {
-
-DataImporter::DataImporter(MySQL::MySQLClient* pMySQLClient, SQLite::SQLiteClient* pSQLiteClient) :
-	m_pMySQLClient(pMySQLClient),
-	m_pSQLiteClient(pSQLiteClient)
-{
-}
-
-void DataImporter::ImportData()
-{
-	auto tables = GetTables();
-	for (auto& table : tables)
-	{
-		auto devices = GetDevices(table);
-		for (auto& device : devices)
-			ImportData(table, device);
-	}
-}
-
-std::vector<std::string> DataImporter::GetTables()
-{
-	std::stringstream query;
-	query << "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';";
-	auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
-
-	std::vector<std::string> tables;
-	while (resultSet.Next())
-		tables.push_back(resultSet.String("name"));
-
-	return tables;
-}
-
-std::vector<int> DataImporter::GetDevices(const std::string& table)
-{
-	std::stringstream query;
-	query << "SELECT DISTINCT device_id FROM " << table << ";";
-	auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
-
-	std::vector<int> devices;
-	while (resultSet.Next())
-		devices.push_back(resultSet.Int("device_id"));
-
-	return devices;
-}
-
-void DataImporter::ImportData(const std::string& table, int device)
-{
-	std::stringstream ss;
-	ss << "Table: " << table << " #" << device;
-	Logging::Log(Logging::Severity::Info, ss.str());
-
-	int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
-	int maxTimestamp = GetMaximumTimestamp(table, device);
-	int dataId = DataStorage::DataId(table).Id();
-
-	std::stringstream queryPart;
-
-	std::stringstream query;
-	query << "SELECT timestamp, value FROM " << table << " WHERE device_id = '" << device << "' AND timestamp > '" << maxTimestamp << "' ORDER BY timestamp ASC;";
-	auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
-
-	int writeCount = 0;
-	InitializeInsertQuery(query);
-	while (resultSet.Next())
-	{
-		queryPart.str("");
-		queryPart << "('" << device << "', '" << dataId << "', FROM_UNIXTIME(" << resultSet.Int("timestamp") << "), '" << resultSet.String("value") << "'),";
-		++writeCount;
-
-		int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
-		if (packetSize > maxMySQLPacketSize)
-		{
-			ExecuteInsertQuery(query);
-			writeCount = 0;
-			InitializeInsertQuery(query);
-		}
-
-		query << queryPart.str();
-	}
-
-	if (writeCount > 0)
-		ExecuteInsertQuery(query);
-}
-
-int DataImporter::GetMaximumMySQLPacketSize()
-{
-	std::stringstream query;
-	query << "show variables like 'max_allowed_packet';";
-	MySQL::MySQLResultSet resultSet = m_pMySQLClient->ExecuteQuery(query.str());
-
-	if (resultSet.RowsCount() != 1)
-		throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");
-
-	resultSet.Next();
-	return resultSet.Int("Value");
-}
-
-int DataImporter::StringStreamSize(std::stringstream& ss)
-{
-	ss.seekp(0, std::ios_base::end);
-	return ss.tellp();
-}
-
-void DataImporter::InitializeInsertQuery(std::stringstream& query)
-{
-	query.str("");
-	query << "INSERT INTO `datalog` (`device_id`, `data_id`, `timestamp`, `value`) VALUES ";
-}
-
-void DataImporter::ExecuteInsertQuery(std::stringstream& query)
-{
-	query.seekp(-1, std::ios_base::end);
-	query << ";";
-	m_pMySQLClient->Execute(query.str());
-}
-
-int DataImporter::GetMaximumTimestamp(const std::string& table, int device)
-{
-	std::stringstream query;
-	query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `datalog` WHERE `device_id` = '" << device <<"' AND `data_id` = '" << DataStorage::DataId(table).Id() << "';";
-	auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
-	if (resultSet.Next())
-		return resultSet.Int("timestamp");
-
-	return 0;
-}
-
-} // namespace DataInterface
-} // namespace DataStorageInterface
+#include "DataImporter.h"
+#include <DataId.h>
+#include <Logging.h>
+#include <MySQLClient.h>
+#include <SQLiteClient.h>
+#include <StringAlgorithm.h>
+#include <sstream>
+
+
+namespace DataStorageInterface {
+namespace DataInterface {
+
+DataImporter::DataImporter(MySQL::MySQLClient* pMySQLClient, SQLite::SQLiteClient* pSQLiteClient) :
+	m_pMySQLClient(pMySQLClient),
+	m_pSQLiteClient(pSQLiteClient)
+{
+}
+
+void DataImporter::ImportData()
+{
+	auto tables = GetTables();
+	for (auto& table : tables)
+	{
+		auto devices = GetDevices(table);
+		for (auto& device : devices)
+			ImportData(table, device);
+	}
+}
+
+std::vector<std::string> DataImporter::GetTables()
+{
+	std::stringstream query;
+	query << "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';";
+	auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
+
+	std::vector<std::string> tables;
+	while (resultSet.Next())
+		tables.push_back(resultSet.String("name"));
+
+	return tables;
+}
+
+std::vector<int> DataImporter::GetDevices(const std::string& table)
+{
+	std::stringstream query;
+	query << "SELECT DISTINCT device_id FROM " << table << ";";
+	auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
+
+	std::vector<int> devices;
+	while (resultSet.Next())
+		devices.push_back(resultSet.Int("device_id"));
+
+	return devices;
+}
+
+void DataImporter::ImportData(const std::string& table, int device)
+{
+	std::stringstream ss;
+	ss << "Table: " << table << " #" << device;
+	Logging::Log(Logging::Severity::Info, ss.str());
+
+	int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
+	int maxTimestamp = GetMaximumTimestamp(table, device);
+	int dataId = DataStorage::DataId(table).Id();
+
+	std::stringstream queryPart;
+
+	std::stringstream query;
+	query << "SELECT timestamp, value FROM " << table << " WHERE device_id = '" << device << "' AND timestamp > '" << maxTimestamp << "' ORDER BY timestamp ASC;";
+	auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
+
+	int writeCount = 0;
+	InitializeInsertQuery(query);
+	while (resultSet.Next())
+	{
+		queryPart.str("");
+		queryPart << "('" << device << "', '" << dataId << "', FROM_UNIXTIME(" << resultSet.Int("timestamp") << "), '" << resultSet.String("value") << "'),";
+		++writeCount;
+
+		int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
+		if (packetSize > maxMySQLPacketSize)
+		{
+			ExecuteInsertQuery(query);
+			writeCount = 0;
+			InitializeInsertQuery(query);
+		}
+
+		query << queryPart.str();
+	}
+
+	if (writeCount > 0)
+		ExecuteInsertQuery(query);
+}
+
+int DataImporter::GetMaximumMySQLPacketSize()
+{
+	std::stringstream query;
+	query << "show variables like 'max_allowed_packet';";
+	MySQL::MySQLResultSet resultSet = m_pMySQLClient->ExecuteQuery(query.str());
+
+	if (resultSet.RowsCount() != 1)
+		throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");
+
+	resultSet.Next();
+	return resultSet.Int("Value");
+}
+
+int DataImporter::StringStreamSize(std::stringstream& ss)
+{
+	ss.seekp(0, std::ios_base::end);
+	return ss.tellp();
+}
+
+void DataImporter::InitializeInsertQuery(std::stringstream& query)
+{
+	query.str("");
+	query << "INSERT INTO `datalog` (`device_id`, `data_id`, `timestamp`, `value`) VALUES ";
+}
+
+void DataImporter::ExecuteInsertQuery(std::stringstream& query)
+{
+	query.seekp(-1, std::ios_base::end);
+	query << ";";
+	m_pMySQLClient->Execute(query.str());
+}
+
+int DataImporter::GetMaximumTimestamp(const std::string& table, int device)
+{
+	std::stringstream query;
+	query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `datalog` WHERE `device_id` = '" << device <<"' AND `data_id` = '" << DataStorage::DataId(table).Id() << "';";
+	auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
+	if (resultSet.Next())
+		return resultSet.Int("timestamp");
+
+	return 0;
+}
+
+} // namespace DataInterface
+} // namespace DataStorageInterface

+ 49 - 49
DataInterface/DataImporter.h

@@ -1,49 +1,49 @@
-#ifndef DATAINTERFACE_DATAIMPORTER_H
-#define DATAINTERFACE_DATAIMPORTER_H
-
-#include <string>
-#include <vector>
-
-
-namespace MySQL {
-
-class MySQLClient;
-
-} // namespace MySQL
-
-namespace SQLite {
-
-class SQLiteClient;
-
-} // namespace SQLite
-
-namespace DataStorageInterface {
-namespace DataInterface {
-
-class DataImporter
-{
-public:
-	DataImporter(MySQL::MySQLClient* pMySQLClient, SQLite::SQLiteClient* pSQLiteClient);
-
-	void ImportData();
-
-private:
-	std::vector<std::string> GetTables();
-	std::vector<int> GetDevices(const std::string& table);
-	void ImportData(const std::string& table, int device);
-
-	int GetMaximumMySQLPacketSize();
-	int StringStreamSize(std::stringstream& ss);
-	void InitializeInsertQuery(std::stringstream& query);
-	void ExecuteInsertQuery(std::stringstream& query);
-	int GetMaximumTimestamp(const std::string& table, int device);
-
-private:
-	MySQL::MySQLClient* m_pMySQLClient;
-	SQLite::SQLiteClient* m_pSQLiteClient;
-};
-
-} // namespace DataInterface
-} // namespace DataStorageInterface
-
-#endif // DATAINTERFACE_DATAIMPORTER_H
+#ifndef DATAINTERFACE_DATAIMPORTER_H
+#define DATAINTERFACE_DATAIMPORTER_H
+
+#include <string>
+#include <vector>
+
+
+namespace MySQL {
+
+class MySQLClient;
+
+} // namespace MySQL
+
+namespace SQLite {
+
+class SQLiteClient;
+
+} // namespace SQLite
+
+namespace DataStorageInterface {
+namespace DataInterface {
+
+class DataImporter
+{
+public:
+	DataImporter(MySQL::MySQLClient* pMySQLClient, SQLite::SQLiteClient* pSQLiteClient);
+
+	void ImportData();
+
+private:
+	std::vector<std::string> GetTables();
+	std::vector<int> GetDevices(const std::string& table);
+	void ImportData(const std::string& table, int device);
+
+	int GetMaximumMySQLPacketSize();
+	int StringStreamSize(std::stringstream& ss);
+	void InitializeInsertQuery(std::stringstream& query);
+	void ExecuteInsertQuery(std::stringstream& query);
+	int GetMaximumTimestamp(const std::string& table, int device);
+
+private:
+	MySQL::MySQLClient* m_pMySQLClient;
+	SQLite::SQLiteClient* m_pSQLiteClient;
+};
+
+} // namespace DataInterface
+} // namespace DataStorageInterface
+
+#endif // DATAINTERFACE_DATAIMPORTER_H

+ 117 - 117
DataInterface/GraphClient.cpp

@@ -1,117 +1,117 @@
-#include "GraphClient.h"
-#include "Util/Util.h"
-#include <MySQLClient.h>
-#include <StringAlgorithm.h>
-#include <sstream>
-
-
-namespace DataStorageInterface {
-namespace DataInterface {
-
-GraphClient::GraphClient(MySQL::MySQLClient* pMySQLClient) :
-	m_pMySQLClient(pMySQLClient)
-{
-}
-
-nlohmann::json GraphClient::GetGraphHeader(const std::vector<int>& dataIds, DataStorage::Timespan::type timespan)
-{
-	int currentTimestamp = Util::GetTimestamp() + Util::GetUTCOffset();
-	return GetGraphHeader(timespan, currentTimestamp);
-}
-
-nlohmann::json GraphClient::GetGraphData(int deviceId, const std::vector<int>& dataIds, DataStorage::Timespan::type timespan)
-{
-	int currentTimestamp = Util::GetTimestamp() + Util::GetUTCOffset();
-	nlohmann::json json = GetGraphHeader(timespan, currentTimestamp);
-
-	std::string table;
-	switch (timespan)
-	{
-		case DataStorage::Timespan::Day:
-			table = "datalog"; // "datalog-daily"
-			break;
-		case DataStorage::Timespan::Week:
-			table = "datalog-weekly";
-			break;
-		case DataStorage::Timespan::Month:
-			table = "datalog-monthly";
-			break;
-		case DataStorage::Timespan::Year:
-			table = "datalog-yearly";
-			break;
-		default:
-		case DataStorage::Timespan::Unknown:
-			return nlohmann::json();
-	}
-
-	nlohmann::json rootData = nlohmann::json::array();
-	std::stringstream query;
-
-	int startTimestamp = Util::GetTimestamp() - timespan;
-	for (auto dataId : dataIds)
-	{
-		nlohmann::json data;
-		nlohmann::json dataArray;
-		query.str("");
-
-		if (StringAlgorithm::iequals(table, "datalog"))
-			query << "SELECT UNIX_TIMESTAMP(`d`.`timestamp`) AS timestamp, `d`.`value` AS 'min-value', `d`.`value` AS 'mean-value', `d`.`value` AS 'max-value' ";
-		else
-			query << "SELECT UNIX_TIMESTAMP(`d`.`timestamp`) AS timestamp, `d`.`min-value`, `d`.`mean-value`, `d`.`max-value` ";
-
-		query << "FROM `" << table << "` AS `d` ";
-		query << "WHERE `d`.`device_id` = '" << deviceId << "' ";
-		query << "AND `d`.`data_id` = '" << dataId << "' ";
-		query << "AND UNIX_TIMESTAMP(`d`.`timestamp`) > '" << startTimestamp <<  "';";
-
-		auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
-
-		int offset = Util::GetUTCOffset();
-		int timestamp;
-		double minValue, meanValue, maxValue;
-		if (resultSet.RowsCount() > 0)
-		{
-			while (resultSet.Next())
-			{
-				timestamp = resultSet.Int("timestamp") + offset;
-				minValue = resultSet.Double("min-value");
-				meanValue = resultSet.Double("mean-value");
-				maxValue = resultSet.Double("max-value");
-
-				nlohmann::json entry = nlohmann::json::array();
-				entry.push_back(timestamp);
-
-				std::stringstream entryValue;
-				if (StringAlgorithm::iequals(table, "datalog"))
-					entryValue << std::fixed << meanValue;
-				else
-					entryValue << std::fixed << minValue << ", " << std::fixed << maxValue;
-				entry.push_back(entryValue.str());
-
-				dataArray.push_back(entry);
-			}
-
-			data["data"] = dataArray;
-			rootData.push_back(data);
-		}
-	}
-
-	json["data"] = rootData;
-
-	return json;
-}
-
-nlohmann::json GraphClient::GetGraphHeader(DataStorage::Timespan::type timespan, std::time_t currentTimestamp)
-{
-	int startTimestamp = currentTimestamp - timespan;
-
-	nlohmann::json json;
-	json["timespan"] = DataStorage::Conversions::Timespan(timespan);
-	json["start"] = startTimestamp;
-	json["end"] = currentTimestamp;
-
-	return json;
-}
-
-} // namespace DataInterface
-} // namespace DataStorageInterface
+#include "GraphClient.h"
+#include <DateTime.h>
+#include <MySQLClient.h>
+#include <StringAlgorithm.h>
+#include <sstream>
+
+
+namespace DataStorageInterface {
+namespace DataInterface {
+
+GraphClient::GraphClient(MySQL::MySQLClient* pMySQLClient) :
+	m_pMySQLClient(pMySQLClient)
+{
+}
+
+nlohmann::json GraphClient::GetGraphHeader(const std::vector<int>& dataIds, DataStorage::Timespan::type timespan)
+{
+	int currentTimestamp = DateTime::GetTimestamp() + DateTime::GetUTCOffset();
+	return GetGraphHeader(timespan, currentTimestamp);
+}
+
+nlohmann::json GraphClient::GetGraphData(int deviceId, const std::vector<int>& dataIds, DataStorage::Timespan::type timespan)
+{
+	int currentTimestamp = DateTime::GetTimestamp() + DateTime::GetUTCOffset();
+	nlohmann::json json = GetGraphHeader(timespan, currentTimestamp);
+
+	std::string table;
+	switch (timespan)
+	{
+		case DataStorage::Timespan::Day:
+			table = "datalog"; // "datalog-daily"
+			break;
+		case DataStorage::Timespan::Week:
+			table = "datalog-weekly";
+			break;
+		case DataStorage::Timespan::Month:
+			table = "datalog-monthly";
+			break;
+		case DataStorage::Timespan::Year:
+			table = "datalog-yearly";
+			break;
+		default:
+		case DataStorage::Timespan::Unknown:
+			return nlohmann::json();
+	}
+
+	nlohmann::json rootData = nlohmann::json::array();
+	std::stringstream query;
+
+	int startTimestamp = DateTime::GetTimestamp() - timespan;
+	for (auto dataId : dataIds)
+	{
+		nlohmann::json data;
+		nlohmann::json dataArray;
+		query.str("");
+
+		if (StringAlgorithm::iequals(table, "datalog"))
+			query << "SELECT UNIX_TIMESTAMP(`d`.`timestamp`) AS timestamp, `d`.`value` AS 'min-value', `d`.`value` AS 'mean-value', `d`.`value` AS 'max-value' ";
+		else
+			query << "SELECT UNIX_TIMESTAMP(`d`.`timestamp`) AS timestamp, `d`.`min-value`, `d`.`mean-value`, `d`.`max-value` ";
+
+		query << "FROM `" << table << "` AS `d` ";
+		query << "WHERE `d`.`device_id` = '" << deviceId << "' ";
+		query << "AND `d`.`data_id` = '" << dataId << "' ";
+		query << "AND UNIX_TIMESTAMP(`d`.`timestamp`) > '" << startTimestamp <<  "';";
+
+		auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
+
+		int offset = DateTime::GetUTCOffset();
+		int timestamp;
+		double minValue, meanValue, maxValue;
+		if (resultSet.RowsCount() > 0)
+		{
+			while (resultSet.Next())
+			{
+				timestamp = resultSet.Int("timestamp") + offset;
+				minValue = resultSet.Double("min-value");
+				meanValue = resultSet.Double("mean-value");
+				maxValue = resultSet.Double("max-value");
+
+				nlohmann::json entry = nlohmann::json::array();
+				entry.push_back(timestamp);
+
+				std::stringstream entryValue;
+				if (StringAlgorithm::iequals(table, "datalog"))
+					entryValue << std::fixed << meanValue;
+				else
+					entryValue << std::fixed << minValue << ", " << std::fixed << maxValue;
+				entry.push_back(entryValue.str());
+
+				dataArray.push_back(entry);
+			}
+
+			data["data"] = dataArray;
+			rootData.push_back(data);
+		}
+	}
+
+	json["data"] = rootData;
+
+	return json;
+}
+
+nlohmann::json GraphClient::GetGraphHeader(DataStorage::Timespan::type timespan, std::time_t currentTimestamp)
+{
+	int startTimestamp = currentTimestamp - timespan;
+
+	nlohmann::json json;
+	json["timespan"] = DataStorage::Conversions::Timespan(timespan);
+	json["start"] = startTimestamp;
+	json["end"] = currentTimestamp;
+
+	return json;
+}
+
+} // namespace DataInterface
+} // namespace DataStorageInterface

+ 37 - 37
DataInterface/GraphClient.h

@@ -1,37 +1,37 @@
-#ifndef DATAINTERFACE_GRAPHCLIENT_H
-#define DATAINTERFACE_GRAPHCLIENT_H
-
-#include <Timespan.h>
-#include <json.hpp>
-#include <ctime>
-#include <vector>
-
-
-namespace MySQL {
-
-class MySQLClient;
-
-} // namespace MySQL
-
-namespace DataStorageInterface {
-namespace DataInterface {
-
-class GraphClient
-{
-public:
-	GraphClient(MySQL::MySQLClient* pMySQLClient);
-
-	nlohmann::json GetGraphHeader(const std::vector<int>& dataIds, DataStorage::Timespan::type timespan);
-	nlohmann::json GetGraphData(int deviceId, const std::vector<int>& dataIds, DataStorage::Timespan::type timespan);
-
-private:
-	nlohmann::json GetGraphHeader(DataStorage::Timespan::type timespan, std::time_t currentTimestamp);
-
-private:
-	MySQL::MySQLClient* m_pMySQLClient;
-};
-
-} // namespace DataInterface
-} // namespace DataStorageInterface
-
-#endif // DATAINTERFACE_GRAPHCLIENT_H
+#ifndef DATAINTERFACE_GRAPHCLIENT_H
+#define DATAINTERFACE_GRAPHCLIENT_H
+
+#include <Timespan.h>
+#include <json.hpp>
+#include <ctime>
+#include <vector>
+
+
+namespace MySQL {
+
+class MySQLClient;
+
+} // namespace MySQL
+
+namespace DataStorageInterface {
+namespace DataInterface {
+
+class GraphClient
+{
+public:
+	GraphClient(MySQL::MySQLClient* pMySQLClient);
+
+	nlohmann::json GetGraphHeader(const std::vector<int>& dataIds, DataStorage::Timespan::type timespan);
+	nlohmann::json GetGraphData(int deviceId, const std::vector<int>& dataIds, DataStorage::Timespan::type timespan);
+
+private:
+	nlohmann::json GetGraphHeader(DataStorage::Timespan::type timespan, std::time_t currentTimestamp);
+
+private:
+	MySQL::MySQLClient* m_pMySQLClient;
+};
+
+} // namespace DataInterface
+} // namespace DataStorageInterface
+
+#endif // DATAINTERFACE_GRAPHCLIENT_H

+ 1 - 0
Libraries/DateTime

@@ -0,0 +1 @@
+../../Libraries/DateTime

+ 8 - 2
Makefile.conf

@@ -14,7 +14,7 @@ LFLAGS += -lDataStorage
 LFLAGS += -L$(ROOTPATH)/Libraries/DataStorage/lib/$(ARCH)
 CFLAGS += -I$(ROOTPATH)/Libraries/DataStorage/include
 
-LFLAGS += -lHttp
+LFLAGS += -lHttp -lcrypto -lcurl -lssl -lz
 LFLAGS += -L$(ROOTPATH)/Libraries/Http/lib/$(ARCH)
 CFLAGS += -I$(ROOTPATH)/Libraries/Http/include
 
@@ -26,10 +26,16 @@ LFLAGS += -lUtilities
 LFLAGS += -L$(ROOTPATH)/Libraries/Utilities/lib/$(ARCH)
 CFLAGS += -I$(ROOTPATH)/Libraries/Utilities/include
 
+LFLAGS += -lDateTime
+LFLAGS += -L$(ROOTPATH)/Libraries/DateTime/lib/$(ARCH)
+CFLAGS += -I$(ROOTPATH)/Libraries/DateTime/include
+
 CFLAGS += -I$(ROOTPATH)/Libraries/clipp/include
 CFLAGS += -I$(ROOTPATH)/Libraries/inih
 CFLAGS += -I$(ROOTPATH)/Libraries/json/include/nlohmann -I$(ROOTPATH)/Libraries/json/include
 
-LFLAGS += -pthread -lm
+LFLAGS += -pthread
+
 CFLAGS += -I$(ROOTPATH) -I$(ROOTPATH)/Libraries
+
 DEBUGDIR := .debug

+ 0 - 1
Util/Makefile

@@ -1 +0,0 @@
-../Makefile

+ 0 - 23
Util/Util.cpp

@@ -1,23 +0,0 @@
-#include "Util.h"
-#include <algorithm>
-#include <ctime>
-
-
-namespace DataStorageInterface {
-namespace Util {
-
-int GetTimestamp()
-{
-	return std::time(nullptr);
-}
-
-int GetUTCOffset()
-{
-	std::time_t current_time;
-	std::time(&current_time);
-	struct std::tm *timeinfo = std::localtime(&current_time);
-	return timeinfo->tm_gmtoff;
-}
-
-} // namespace Util
-} // namespace DataStorageInterface

+ 0 - 16
Util/Util.h

@@ -1,16 +0,0 @@
-#ifndef UTIL_UTIL_H
-#define UTIL_UTIL_H
-
-#include <string>
-
-
-namespace DataStorageInterface {
-namespace Util {
-
-int GetTimestamp();
-int GetUTCOffset();
-
-} // namespace Util
-} // namespace DataStorageInterface
-
-#endif // UTIL_UTIL_H