#include "DataCondenser.h"
#include "Util/Util.h"

#include <algorithm>
#include <iostream>
#include <limits>
#include <map>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>
#include <vector>

namespace DataStorageInterface
{
namespace DataInterface
{

namespace
{

// Builds the SELECT statement shared by DataCondenser::GetRawData and
// DataCondenser::GetReducedData.
//   columns           - select list; callers expose `timestamp` as a UNIX timestamp
//                       plus `min-value`, `mean-value` and `max-value`.
//   newestPeriodStamp - -1 fetches a single (unordered) row, used for type detection;
//                       otherwise all rows at or after FROM_UNIXTIME(newestPeriodStamp),
//                       oldest first.
// `>=` (rather than `>`) keeps a sample lying exactly on the resume boundary: that
// boundary is the start of the period that was still in progress on the previous
// run and therefore has not been written yet.
std::string BuildSourceQuery(const std::string& columns, const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
{
	std::stringstream query;
	query << "SELECT " << columns << " FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId << "' AND `data_id` = '" << dataId << "' ";
	if (newestPeriodStamp == -1)
		query << "LIMIT 1;";
	else
		query << "AND `timestamp` >= FROM_UNIXTIME(" << newestPeriodStamp << ") ORDER BY `timestamp` ASC;";
	return query.str();
}

// Escapes `value` for use inside a single-quoted MySQL string literal, escaping the
// same characters as mysql_real_escape_string().
// NOTE(review): assumes the server does not run with NO_BACKSLASH_ESCAPES and that the
// connection character set is ASCII-compatible (e.g. utf8); prefer the client
// library's escaping routine if MySQL::MySQLClient exposes one.
std::string EscapeSqlString(const std::string& value)
{
	std::string escaped;
	escaped.reserve(value.size());
	for (char c : value)
	{
		switch (c)
		{
		case '\0':   escaped += "\\0";  break;
		case '\n':   escaped += "\\n";  break;
		case '\r':   escaped += "\\r";  break;
		case '\\':   escaped += "\\\\"; break;
		case '\'':   escaped += "\\'";  break;
		case '"':    escaped += "\\\""; break;
		case '\x1a': escaped += "\\Z";  break;
		default:     escaped += c;      break;
		}
	}
	return escaped;
}

} // namespace

// The client pointer is stored, not owned; it must outlive this object.
DataCondenser::DataCondenser(MySQL::MySQLClient* pMySQLClient)
	: m_pMySQLClient(pMySQLClient)
{
}

// Runs the weekly, monthly and yearly condensation for every device found in
// `datalog` and data ids 0..2, then prunes old rows via CleanData().
void DataCondenser::CondenseData()
{
	auto deviceIDs = GetDeviceIDs();

	// NOTE(review): data ids are hard-coded to 0..2 instead of being queried from
	// `datalog` like the device ids are.
	for (int dataId = 0; dataId < 3; ++dataId)
	{
		std::cout << "DataID: " << dataId << std::endl;

		// Daily condensation is currently disabled.
		//std::cout << "Daily" << std::endl;
		//for (auto& deviceId : deviceIDs)
		//	WriteReducedData(deviceId, dataId, RRA::Daily);

		// Order matters: Monthly reads `datalog-weekly` and Yearly reads
		// `datalog-monthly` (see GetSourceTable), i.e. the previous level's output.
		std::cout << "Weekly" << std::endl;
		for (int deviceId : deviceIDs)
			WriteReducedData(deviceId, dataId, RRA::Weekly);

		std::cout << "Monthly" << std::endl;
		for (int deviceId : deviceIDs)
			WriteReducedData(deviceId, dataId, RRA::Monthly);

		std::cout << "Yearly" << std::endl;
		for (int deviceId : deviceIDs)
			WriteReducedData(deviceId, dataId, RRA::Yearly);
	}

	CleanData();
}

// Rounds `timestamp` down to a multiple of the period length, where the period length
// in seconds is the integer value of `rra`.
int DataCondenser::GetPeriodStamp(int timestamp, const RRA::type& rra)
{
	const int modulo = timestamp % static_cast<int>(rra);
	return timestamp - modulo;
}

// Returns every distinct `device_id` present in `datalog`, in ascending order.
std::vector<int> DataCondenser::GetDeviceIDs()
{
	std::stringstream query;
	query << "SELECT DISTINCT `device_id` FROM `datalog` ORDER BY `device_id`;";
	auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());

	std::vector<int> returnValue;
	while (resultSet.Next())
		returnValue.push_back(resultSet.Int("device_id"));
	return returnValue;
}

// Returns the newest `timestamp` (as a UNIX timestamp) stored in `sourceTable` for the
// given device/data pair, or 0 if the query yields no row.
// NOTE(review): when no rows match, MAX() yields NULL; what MySQLResultSet::Int returns
// for NULL is not visible here (presumably 0).
int DataCondenser::GetNewestPeriodStamp(const std::string& sourceTable, int deviceId, int dataId)
{
	std::stringstream query;
	query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId << "' AND `data_id` = '" << dataId << "';";
	auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
	if (resultSet.Next())
		return resultSet.Int("timestamp");
	return 0;
}

// Reads raw samples; the single `value` column is exposed as min/mean/max so raw and
// reduced sources can be consumed by the same code. See BuildSourceQuery for the
// meaning of `newestPeriodStamp`.
MySQL::MySQLResultSet DataCondenser::GetRawData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
{
	return m_pMySQLClient->ExecuteQuery(BuildSourceQuery(
		"UNIX_TIMESTAMP(`timestamp`) AS timestamp, `value` AS `min-value`, `value` AS `mean-value`, `value` AS `max-value`",
		sourceTable, deviceId, dataId, newestPeriodStamp));
}

// Reads already-reduced rows (min/mean/max columns). See BuildSourceQuery for the
// meaning of `newestPeriodStamp`.
MySQL::MySQLResultSet DataCondenser::GetReducedData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
{
	return m_pMySQLClient->ExecuteQuery(BuildSourceQuery(
		"UNIX_TIMESTAMP(`timestamp`) AS timestamp, `min-value`, `mean-value`, `max-value`",
		sourceTable, deviceId, dataId, newestPeriodStamp));
}

// Returns the server's `max_allowed_packet` value.
// Throws std::runtime_error if the variable lookup does not return exactly one row.
int DataCondenser::GetMaximumMySQLPacketSize()
{
	std::stringstream query;
	query << "show variables like 'max_allowed_packet';";
	MySQL::MySQLResultSet resultSet = m_pMySQLClient->ExecuteQuery(query.str());

	if (resultSet.RowsCount() != 1)
		throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");

	resultSet.Next();
	return resultSet.Int("Value");
}

// Classifies the data of a device/data pair by inspecting one arbitrary raw sample;
// returns DataType::None when there are no samples.
DataType::type DataCondenser::GetDataType(int deviceId, int dataId)
{
	auto resultSet = GetRawData("datalog", deviceId, dataId, -1);
	if (resultSet.Next())
		return DataInterface::GetDataType(resultSet.String("mean-value"));
	return DataType::None;
}

// Table each reduction level reads from (empty for Raw/unknown levels).
std::string DataCondenser::GetSourceTable(const RRA::type& rra)
{
	switch (rra)
	{
	case RRA::Daily:   return "datalog";
	case RRA::Weekly:  return "datalog";
	case RRA::Monthly: return "datalog-weekly";
	case RRA::Yearly:  return "datalog-monthly";
	default:
	case RRA::Raw:     return std::string();
	}
}

// Table each reduction level writes to (empty for Raw/unknown levels).
std::string DataCondenser::GetDestinationTable(const RRA::type& rra)
{
	switch (rra)
	{
	case RRA::Daily:   return "datalog-daily";
	case RRA::Weekly:  return "datalog-weekly";
	case RRA::Monthly: return "datalog-monthly";
	case RRA::Yearly:  return "datalog-yearly";
	default:
	case RRA::Raw:     return std::string();
	}
}

// Number of characters written to `ss` (moves its put position to the end).
int DataCondenser::StringStreamSize(std::stringstream& ss)
{
	ss.seekp(0, std::ios_base::end);
	return static_cast<int>(ss.tellp());
}

// Resets `query` to the INSERT header for `table`; rows are appended afterwards as
// "(...),"-terminated tuples.
void DataCondenser::InitializeInsertQuery(std::stringstream& query, const std::string& table)
{
	query.str("");
	query << "INSERT INTO `" << table << "` (`device_id`, `data_id`, `timestamp`, `min-value`, `mean-value`, `max-value`) VALUES ";
}

// Replaces the trailing ',' of the last tuple with ';' and executes the statement.
// Callers must have appended at least one row.
void DataCondenser::ExecuteInsertQuery(std::stringstream& query)
{
	query.seekp(-1, std::ios_base::end);
	query << ";";
	m_pMySQLClient->Execute(query.str());
}

// Detects the data type of a device/data pair and dispatches to the matching reducer;
// pairs with no samples (or an unknown type) are skipped.
void DataCondenser::WriteReducedData(int deviceId, int dataId, const RRA::type& rra)
{
	const DataType::type dataType = GetDataType(deviceId, dataId);

	switch (dataType)
	{
	case DataType::None:
		std::cout << " DataType: None" << std::endl;
		break;
	case DataType::Float:
		std::cout << " DataType: Float" << std::endl;
		WriteReducedFloatData(deviceId, dataId, rra);
		break;
	case DataType::String:
		std::cout << " DataType: String" << std::endl;
		WriteReducedStringData(deviceId, dataId, rra);
		break;
	default:
		break;
	}
}

// Aggregates numeric samples into per-period min/mean/max rows for `rra`.
//
// Incremental scheme: each written row is stamped with the END of its period
// (periodStamp + period length). The next run resumes from the newest destination
// stamp, i.e. the start of the period that was still in progress. The final period
// seen is never written, because it may be incomplete; it is recomputed next run.
// Rows are batched into multi-row INSERTs that stay below max_allowed_packet.
void DataCondenser::WriteReducedFloatData(int deviceId, int dataId, const RRA::type& rra)
{
	const int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
	const int periodLength = static_cast<int>(rra);
	const std::string sourceTable = GetSourceTable(rra);
	const std::string destinationTable = GetDestinationTable(rra);
	const int newestSourceTimestamp = GetNewestPeriodStamp(sourceTable, deviceId, dataId);
	const int newestDestinationPeriodStamp = GetNewestPeriodStamp(destinationTable, deviceId, dataId);

	std::cout << " DeviceID: " << deviceId << std::endl;
	std::cout << " DataID: " << dataId << std::endl;
	std::cout << " Newest Src : " << newestSourceTimestamp << std::endl;
	std::cout << " Newest Dest: " << newestDestinationPeriodStamp << std::endl;

	MySQL::MySQLResultSet resultSet;
	if (sourceTable == "datalog")
		resultSet = GetRawData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
	else
		resultSet = GetReducedData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);

	std::cout << " Source Count: " << resultSet.RowsCount() << std::endl;

	int samples = 0;
	float periodMin = std::numeric_limits<float>::max();
	// lowest() is the most negative float; min() is the smallest *positive* float and
	// would report ~1e-38 as the maximum of an all-negative period.
	float periodMax = std::numeric_limits<float>::lowest();
	double periodSum = 0.0; // double accumulator limits rounding error over many samples
	int periodStamp = 0;

	std::stringstream query;
	std::stringstream queryPart;
	int writeCount = 0; // rows currently buffered in `query`
	InitializeInsertQuery(query, destinationTable);

	while (resultSet.Next())
	{
		const int currentPeriodStamp = GetPeriodStamp(resultSet.Int("timestamp"), rra);
		if (currentPeriodStamp != periodStamp)
		{
			// A new period started, so the previous one is complete: emit it.
			if (samples > 0)
			{
				const float periodMean = static_cast<float>(periodSum / samples);
				queryPart.str("");
				queryPart << "('" << deviceId << "', '" << dataId << "', FROM_UNIXTIME(" << (periodStamp + periodLength) << "), '" << periodMin << "', '" << periodMean << "', '" << periodMax << "'),";

				// Flush before the batch would exceed the packet limit; never flush an
				// empty batch (that would produce "VALUES;").
				const int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
				if (writeCount > 0 && packetSize > maxMySQLPacketSize)
				{
					ExecuteInsertQuery(query);
					InitializeInsertQuery(query, destinationTable);
					std::cout << " Write Count: " << writeCount << std::endl;
					writeCount = 0;
				}
				query << queryPart.str();
				// Counted only after appending, so writeCount always equals the rows
				// in `query` and the final flush below cannot skip a buffered row.
				++writeCount;
			}

			// Defensive stop: rows beyond the newest source timestamp sampled above.
			if (currentPeriodStamp > newestSourceTimestamp)
				break;

			samples = 0;
			periodMin = std::numeric_limits<float>::max();
			periodMax = std::numeric_limits<float>::lowest();
			periodSum = 0.0;
			periodStamp = currentPeriodStamp;
		}

		const float minValue = static_cast<float>(resultSet.Double("min-value"));
		const float meanValue = static_cast<float>(resultSet.Double("mean-value"));
		const float maxValue = static_cast<float>(resultSet.Double("max-value"));

		if (minValue < periodMin)
			periodMin = minValue;
		if (maxValue > periodMax)
			periodMax = maxValue;
		periodSum += meanValue;
		++samples;
	}

	if (writeCount > 0)
		ExecuteInsertQuery(query);

	std::cout << " Write Count: " << writeCount << std::endl << std::endl;
}

// Aggregates string samples into one row per period holding the period's most frequent
// value (mode) in `mean-value`; `min-value` and `max-value` are written as ''.
// Uses the same incremental and batching scheme as WriteReducedFloatData.
void DataCondenser::WriteReducedStringData(int deviceId, int dataId, const RRA::type& rra)
{
	const int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
	const int periodLength = static_cast<int>(rra);
	const std::string sourceTable = GetSourceTable(rra);
	const std::string destinationTable = GetDestinationTable(rra);
	const int newestSourceTimestamp = GetNewestPeriodStamp(sourceTable, deviceId, dataId);
	const int newestDestinationPeriodStamp = GetNewestPeriodStamp(destinationTable, deviceId, dataId);

	std::cout << " DeviceID: " << deviceId << std::endl;
	std::cout << " DataID: " << dataId << std::endl;
	std::cout << " Newest Src : " << newestSourceTimestamp << std::endl;
	std::cout << " Newest Dest: " << newestDestinationPeriodStamp << std::endl;

	MySQL::MySQLResultSet resultSet;
	if (sourceTable == "datalog")
		resultSet = GetRawData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
	else
		resultSet = GetReducedData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);

	std::cout << " Source Count: " << resultSet.RowsCount() << std::endl;

	typedef std::map<std::string, int> OccurrenceMap;

	int samples = 0;
	OccurrenceMap occurrences; // value -> number of samples in the current period
	int periodStamp = 0;

	std::stringstream query;
	std::stringstream queryPart;
	int writeCount = 0; // rows currently buffered in `query`
	InitializeInsertQuery(query, destinationTable);

	while (resultSet.Next())
	{
		const int currentPeriodStamp = GetPeriodStamp(resultSet.Int("timestamp"), rra);
		if (currentPeriodStamp != periodStamp)
		{
			// A new period started, so the previous one is complete: emit its mode.
			if (samples > 0)
			{
				// max_element returns the FIRST maximal entry, so ties resolve to the
				// lexicographically smallest value (map order).
				const auto mostFrequent = std::max_element(occurrences.begin(), occurrences.end(),
					[](const OccurrenceMap::value_type& lhs, const OccurrenceMap::value_type& rhs)
					{
						return lhs.second < rhs.second;
					});

				queryPart.str("");
				// The value originates from stored device data, so it must be escaped
				// before being embedded in the SQL literal.
				queryPart << "('" << deviceId << "', '" << dataId << "', FROM_UNIXTIME(" << (periodStamp + periodLength) << "), '', '" << EscapeSqlString(mostFrequent->first) << "', ''),";

				// Flush before the batch would exceed the packet limit; never flush an
				// empty batch (that would produce "VALUES;").
				const int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
				if (writeCount > 0 && packetSize > maxMySQLPacketSize)
				{
					ExecuteInsertQuery(query);
					InitializeInsertQuery(query, destinationTable);
					std::cout << " Write Count: " << writeCount << std::endl;
					writeCount = 0;
				}
				query << queryPart.str();
				++writeCount;
			}

			// Defensive stop: rows beyond the newest source timestamp sampled above.
			if (currentPeriodStamp > newestSourceTimestamp)
				break;

			samples = 0;
			occurrences.clear();
			periodStamp = currentPeriodStamp;
		}

		const std::string meanValue = resultSet.String("mean-value");
		++occurrences[meanValue];
		++samples;
	}

	if (writeCount > 0)
		ExecuteInsertQuery(query);

	std::cout << " Write Count: " << writeCount << std::endl << std::endl;
}

// Prunes old rows from every reduced table.
void DataCondenser::CleanData()
{
	std::cout << "Cleaning: " << std::endl;
	CleanData(RRA::Daily);
	CleanData(RRA::Weekly);
	CleanData(RRA::Monthly);
	CleanData(RRA::Yearly);
}

// Deletes rows of the `rra` destination table older than 400 periods before
// Util::GetTimestamp() + Util::GetUTCOffset().
// NOTE(review): the UTC-offset adjustment presumably aligns "now" with how `timestamp`
// is stored; confirm against Util.
void DataCondenser::CleanData(const RRA::type& rra)
{
	const std::string table = GetDestinationTable(rra);
	const int periodInSeconds = static_cast<int>(rra) * 400;
	const int timestamp = Util::GetTimestamp() + Util::GetUTCOffset() - periodInSeconds;

	std::cout << " Cleaning " << table << " (<" << timestamp << ")" << std::endl;

	std::stringstream query;
	query << "DELETE FROM `" << table << "` WHERE `timestamp` < FROM_UNIXTIME(" << timestamp << ");";
	m_pMySQLClient->Execute(query.str());
}

} // namespace DataInterface
} // namespace DataStorageInterface