| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 |
- #include "DataImporter.h"
- #include <DataId.h>
- #include <Logging.h>
- #include <MySQLClient.h>
- #include <SQLiteClient.h>
- #include <StringAlgorithm.h>
- #include <sstream>
- namespace DataStorageInterface {
- namespace DataInterface {
- DataImporter::DataImporter(MySQL::MySQLClient* pMySQLClient, SQLite::SQLiteClient* pSQLiteClient) :
- m_pMySQLClient(pMySQLClient),
- m_pSQLiteClient(pSQLiteClient)
- {
- }
- void DataImporter::ImportData()
- {
- auto tables = GetTables();
- for (auto& table : tables)
- {
- auto devices = GetDevices(table);
- for (auto& device : devices)
- ImportData(table, device);
- }
- }
- std::vector<std::string> DataImporter::GetTables()
- {
- std::stringstream query;
- query << "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';";
- auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
- std::vector<std::string> tables;
- while (resultSet.Next())
- tables.push_back(resultSet.String("name"));
- return tables;
- }
- std::vector<int> DataImporter::GetDevices(const std::string& table)
- {
- std::stringstream query;
- query << "SELECT DISTINCT device_id FROM " << table << ";";
- auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
- std::vector<int> devices;
- while (resultSet.Next())
- devices.push_back(resultSet.Int("device_id"));
- return devices;
- }
- void DataImporter::ImportData(const std::string& table, int device)
- {
- std::stringstream ss;
- ss << "Table: " << table << " #" << device;
- Logging::Log(Logging::Severity::Info, ss.str());
- int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
- int maxTimestamp = GetMaximumTimestamp(table, device);
- int dataId = DataStorage::DataId(table).Id();
- std::stringstream queryPart;
- std::stringstream query;
- query << "SELECT timestamp, value FROM " << table << " WHERE device_id = '" << device << "' AND timestamp > '" << maxTimestamp << "' ORDER BY timestamp ASC;";
- auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
- int writeCount = 0;
- InitializeInsertQuery(query);
- while (resultSet.Next())
- {
- queryPart.str("");
- queryPart << "('" << device << "', '" << dataId << "', FROM_UNIXTIME(" << resultSet.Int("timestamp") << "), '" << resultSet.String("value") << "'),";
- ++writeCount;
- int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
- if (packetSize > maxMySQLPacketSize)
- {
- ExecuteInsertQuery(query);
- writeCount = 0;
- InitializeInsertQuery(query);
- }
- query << queryPart.str();
- }
- if (writeCount > 0)
- ExecuteInsertQuery(query);
- }
- int DataImporter::GetMaximumMySQLPacketSize()
- {
- std::stringstream query;
- query << "show variables like 'max_allowed_packet';";
- MySQL::MySQLResultSet resultSet = m_pMySQLClient->ExecuteQuery(query.str());
- if (resultSet.RowsCount() != 1)
- throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");
- resultSet.Next();
- return resultSet.Int("Value");
- }
- int DataImporter::StringStreamSize(std::stringstream& ss)
- {
- ss.seekp(0, std::ios_base::end);
- return ss.tellp();
- }
- void DataImporter::InitializeInsertQuery(std::stringstream& query)
- {
- query.str("");
- query << "INSERT INTO `datalog` (`device_id`, `data_id`, `timestamp`, `value`) VALUES ";
- }
- void DataImporter::ExecuteInsertQuery(std::stringstream& query)
- {
- query.seekp(-1, std::ios_base::end);
- query << ";";
- m_pMySQLClient->Execute(query.str());
- }
- int DataImporter::GetMaximumTimestamp(const std::string& table, int device)
- {
- std::stringstream query;
- query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `datalog` WHERE `device_id` = '" << device <<"' AND `data_id` = '" << DataStorage::DataId(table).Id() << "';";
- auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
- if (resultSet.Next())
- return resultSet.Int("timestamp");
- return 0;
- }
- } // namespace DataInterface
- } // namespace DataStorageInterface
|