#include "DataImporter.h"

// NOTE(review): the original file carried six further #include directives
// whose targets were lost. The standard headers below are the ones this file
// visibly needs; the project headers that declare Logging::Log,
// DataStorage::DataId, MySQL::MySQLClient / MySQLResultSet and
// SQLite::SQLiteClient must be restored here unless DataImporter.h already
// provides them.
#include <sstream>
#include <stdexcept>
#include <string>
#include <vector>

namespace DataStorageInterface
{
namespace DataInterface
{

/// Stores the two (non-owned) database clients: rows are read through
/// pSQLiteClient and written through pMySQLClient.
DataImporter::DataImporter(MySQL::MySQLClient* pMySQLClient, SQLite::SQLiteClient* pSQLiteClient) :
    m_pMySQLClient(pMySQLClient),
    m_pSQLiteClient(pSQLiteClient)
{
}

/// Imports every (table, device) combination found in the SQLite database.
void DataImporter::ImportData()
{
    const auto tables = GetTables();
    for (const auto& table : tables)
    {
        const auto devices = GetDevices(table);
        for (const int device : devices)
            ImportData(table, device);
    }
}

/// Returns the names of all tables listed in sqlite_master, excluding the
/// ones whose name starts with "sqlite_".
std::vector<std::string> DataImporter::GetTables()
{
    auto resultSet = m_pSQLiteClient->ExecuteQuery(
        std::string("SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';"));

    std::vector<std::string> tables;
    while (resultSet.Next())
        tables.push_back(resultSet.String("name"));

    return tables;
}

/// Returns the distinct device_id values stored in the given SQLite table.
///
/// @param table  Table name; it is inserted into the query text verbatim.
std::vector<int> DataImporter::GetDevices(const std::string& table)
{
    std::stringstream query;
    query << "SELECT DISTINCT device_id FROM " << table << ";";
    auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());

    std::vector<int> devices;
    while (resultSet.Next())
        devices.push_back(resultSet.Int("device_id"));

    return devices;
}

/// Copies the rows of one device from one SQLite table into the MySQL
/// `datalog` table.
///
/// Only rows whose timestamp is greater than the newest timestamp already
/// present in MySQL for this device / data id are read. The rows are sent as
/// multi-row INSERT statements; a statement is flushed before it would grow
/// beyond the packet size limit obtained from the server.
///
/// @param table   SQLite table to read from; also used to derive the data id.
/// @param device  Device id whose rows are imported.
void DataImporter::ImportData(const std::string& table, int device)
{
    std::stringstream ss;
    ss << "Table: " << table << " #" << device;
    Logging::Log(Logging::Severity::Info, ss.str());

    // NOTE(review): the reason for the margin of 2 is not visible here.
    const int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
    const int maxTimestamp = GetMaximumTimestamp(table, device);
    const int dataId = DataStorage::DataId(table).Id();

    std::stringstream queryPart;
    std::stringstream query;
    query << "SELECT timestamp, value FROM " << table
          << " WHERE device_id = '" << device
          << "' AND timestamp > '" << maxTimestamp
          << "' ORDER BY timestamp ASC;";
    auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());

    // Number of value tuples currently held in `query` and not yet sent.
    int writeCount = 0;
    InitializeInsertQuery(query);
    while (resultSet.Next())
    {
        // NOTE(review): `value` is spliced into the statement without any
        // escaping, so a value containing a single quote breaks the INSERT.
        // Escape it if the MySQL client offers a facility for that.
        queryPart.str("");
        queryPart << "('" << device << "', '" << dataId
                  << "', FROM_UNIXTIME(" << resultSet.Int("timestamp")
                  << "), '" << resultSet.String("value") << "'),";

        // Flush the pending statement before it would exceed the limit.
        // The flush is skipped while the statement holds no tuple yet, so an
        // INSERT without values is never sent.
        const int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
        if (writeCount > 0 && packetSize > maxMySQLPacketSize)
        {
            ExecuteInsertQuery(query);
            writeCount = 0;
            InitializeInsertQuery(query);
        }

        // Count the row only once it is part of the pending statement, so
        // that the final flush below never misses the last row.
        query << queryPart.str();
        ++writeCount;
    }

    if (writeCount > 0)
        ExecuteInsertQuery(query);
}

/// Reads the server variable max_allowed_packet from MySQL.
///
/// @return The value of the "Value" column of the single result row.
/// @throws std::runtime_error if the server does not answer with exactly one row.
int DataImporter::GetMaximumMySQLPacketSize()
{
    MySQL::MySQLResultSet resultSet =
        m_pMySQLClient->ExecuteQuery(std::string("show variables like 'max_allowed_packet';"));
    if (resultSet.RowsCount() != 1)
        throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");

    resultSet.Next();
    return resultSet.Int("Value");
}

/// Returns the number of characters written to the stream so far.
///
/// Side effect: the put position is moved to the end of the stream, so
/// subsequent writes append.
int DataImporter::StringStreamSize(std::stringstream& ss)
{
    ss.seekp(0, std::ios_base::end);
    return static_cast<int>(ss.tellp());
}

/// Resets the stream and writes the head of a multi-row INSERT statement.
/// Value tuples of the form "(...)," are expected to be appended afterwards.
void DataImporter::InitializeInsertQuery(std::stringstream& query)
{
    query.str("");
    query << "INSERT INTO `datalog` (`device_id`, `data_id`, `timestamp`, `value`) VALUES ";
}

/// Terminates the INSERT statement held in the stream and executes it on MySQL.
///
/// The last character of the stream is overwritten with ';'. Callers must
/// have appended at least one value tuple (which ends in ','), otherwise the
/// resulting statement has no values.
void DataImporter::ExecuteInsertQuery(std::stringstream& query)
{
    query.seekp(-1, std::ios_base::end);
    query << ";";
    m_pMySQLClient->Execute(query.str());
}

/// Returns the newest timestamp (as Unix time) stored in the MySQL `datalog`
/// table for the given device and the data id derived from the table name.
///
/// @return The timestamp, or 0 if the query yields no row.
int DataImporter::GetMaximumTimestamp(const std::string& table, int device)
{
    std::stringstream query;
    query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `datalog` WHERE `device_id` = '"
          << device << "' AND `data_id` = '" << DataStorage::DataId(table).Id() << "';";
    auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());

    // NOTE(review): MAX() over an empty set yields one row containing NULL,
    // so the result then depends on how Int() treats NULL - presumably 0;
    // confirm against the result set implementation.
    if (resultSet.Next())
        return resultSet.Int("timestamp");

    return 0;
}

} // namespace DataInterface
} // namespace DataStorageInterface