DataCondenser.cpp 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. #include "DataCondenser.h"
  2. #include "Util/Util.h"
  3. #include <DataId.h>
  4. #include <iostream>
  5. #include <limits>
  6. #include <map>
  7. #include <memory>
  8. #include <sstream>
  9. namespace DataStorageInterface {
  10. namespace DataInterface {
  11. DataCondenser::DataCondenser(MySQL::MySQLClient* pMySQLClient) :
  12. m_pMySQLClient(pMySQLClient)
  13. {
  14. }
  15. void DataCondenser::CondenseData()
  16. {
  17. auto deviceIDs = GetDeviceIDs();
  18. for (int dataId = DataStorage::DataId::Min(); dataId <= DataStorage::DataId::Max(); ++dataId)
  19. {
  20. std::cout << "DataID: " << dataId << std::endl;
  21. //std::cout << "Daily" << std::endl;
  22. //for (auto& deviceId : deviceIDs)
  23. // WriteReducedData(deviceId, dataId, RRA::Daily);
  24. std::cout << "Weekly" << std::endl;
  25. for (auto& deviceId : deviceIDs)
  26. WriteReducedData(deviceId, dataId, RRA::Weekly);
  27. std::cout << "Monthly" << std::endl;
  28. for (auto& deviceId : deviceIDs)
  29. WriteReducedData(deviceId, dataId, RRA::Monthly);
  30. std::cout << "Yearly" << std::endl;
  31. for (auto& deviceId : deviceIDs)
  32. WriteReducedData(deviceId, dataId, RRA::Yearly);
  33. }
  34. CleanData();
  35. }
  36. int DataCondenser::GetPeriodStamp(int timestamp, const RRA::type& rra)
  37. {
  38. int modulo = timestamp % static_cast<int>(rra);
  39. return timestamp - modulo;
  40. }
  41. std::vector<int> DataCondenser::GetDeviceIDs()
  42. {
  43. std::stringstream query;
  44. query << "SELECT DISTINCT `device_id` FROM `datalog` ORDER BY `device_id`;";
  45. auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
  46. std::vector<int> returnValue;
  47. while (resultSet.Next())
  48. returnValue.push_back(resultSet.Int("device_id"));
  49. return returnValue;
  50. }
  51. int DataCondenser::GetNewestPeriodStamp(const std::string& sourceTable, int deviceId, int dataId)
  52. {
  53. std::stringstream query;
  54. query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "';";
  55. auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
  56. if (resultSet.Next())
  57. return resultSet.Int("timestamp");
  58. return 0;
  59. }
  60. MySQL::MySQLResultSet DataCondenser::GetRawData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
  61. {
  62. std::stringstream query;
  63. query << "SELECT UNIX_TIMESTAMP(`timestamp`) AS timestamp, `value` AS `min-value`, `value` AS `mean-value`, `value` AS `max-value` FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "' ";
  64. if (newestPeriodStamp == -1)
  65. query << "LIMIT 1;";
  66. else
  67. query << "AND `timestamp` > FROM_UNIXTIME(" << newestPeriodStamp << ") ORDER BY `timestamp` ASC;";
  68. return m_pMySQLClient->ExecuteQuery(query.str());
  69. }
  70. MySQL::MySQLResultSet DataCondenser::GetReducedData(const std::string& sourceTable, int deviceId, int dataId, int newestPeriodStamp)
  71. {
  72. std::stringstream query;
  73. query << "SELECT UNIX_TIMESTAMP(`timestamp`) AS timestamp, `min-value`, `mean-value`, `max-value` FROM `" << sourceTable << "` WHERE `device_id` = '" << deviceId <<"' AND `data_id` = '" << dataId << "' ";
  74. if (newestPeriodStamp == -1)
  75. query << "LIMIT 1;";
  76. else
  77. query << "AND `timestamp` > FROM_UNIXTIME(" << newestPeriodStamp << ") ORDER BY `timestamp` ASC;";
  78. return m_pMySQLClient->ExecuteQuery(query.str());
  79. }
  80. int DataCondenser::GetMaximumMySQLPacketSize()
  81. {
  82. std::stringstream query;
  83. query << "show variables like 'max_allowed_packet';";
  84. MySQL::MySQLResultSet resultSet = m_pMySQLClient->ExecuteQuery(query.str());
  85. if (resultSet.RowsCount() != 1)
  86. throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");
  87. resultSet.Next();
  88. return resultSet.Int("Value");
  89. }
  90. DataType::type DataCondenser::GetDataType(int deviceId, int dataId)
  91. {
  92. auto resultSet = GetRawData("datalog", deviceId, dataId, -1);
  93. if (resultSet.Next())
  94. return DataInterface::GetDataType(resultSet.String("mean-value"));
  95. return DataType::None;
  96. }
  97. std::string DataCondenser::GetSourceTable(const RRA::type& rra)
  98. {
  99. switch (rra)
  100. {
  101. case RRA::Daily:
  102. return "datalog";
  103. case RRA::Weekly:
  104. return "datalog";
  105. case RRA::Monthly:
  106. return "datalog-weekly";
  107. case RRA::Yearly:
  108. return "datalog-monthly";
  109. default:
  110. case RRA::Raw:
  111. return std::string();
  112. }
  113. }
  114. std::string DataCondenser::GetDestinationTable(const RRA::type& rra)
  115. {
  116. switch (rra)
  117. {
  118. case RRA::Daily:
  119. return "datalog-daily";
  120. case RRA::Weekly:
  121. return "datalog-weekly";
  122. case RRA::Monthly:
  123. return "datalog-monthly";
  124. case RRA::Yearly:
  125. return "datalog-yearly";
  126. default:
  127. case RRA::Raw:
  128. return std::string();
  129. }
  130. }
  131. int DataCondenser::StringStreamSize(std::stringstream& ss)
  132. {
  133. ss.seekp(0, std::ios_base::end);
  134. return ss.tellp();
  135. }
  136. void DataCondenser::InitializeInsertQuery(std::stringstream& query, const std::string& table)
  137. {
  138. query.str("");
  139. query << "INSERT INTO `" << table << "` (`device_id`, `data_id`, `timestamp`, `min-value`, `mean-value`, `max-value`) VALUES ";
  140. }
  141. void DataCondenser::ExecuteInsertQuery(std::stringstream& query)
  142. {
  143. query.seekp(-1, std::ios_base::end);
  144. query << ";";
  145. m_pMySQLClient->Execute(query.str());
  146. }
  147. void DataCondenser::WriteReducedData(int deviceId, int dataId, const RRA::type& rra)
  148. {
  149. DataType::type dataType = GetDataType(deviceId, dataId);
  150. if (dataType == DataType::None)
  151. std::cout << " DataType: None" << std::endl;
  152. else if (dataType == DataType::Float)
  153. std::cout << " DataType: Float" << std::endl;
  154. else if (dataType == DataType::String)
  155. std::cout << " DataType: String" << std::endl;
  156. switch (dataType)
  157. {
  158. case DataType::Float:
  159. WriteReducedFloatData(deviceId, dataId, rra);
  160. break;
  161. case DataType::String:
  162. WriteReducedStringData(deviceId, dataId, rra);
  163. break;
  164. default:
  165. case DataType::None:
  166. break;
  167. }
  168. }
  169. void DataCondenser::WriteReducedFloatData(int deviceId, int dataId, const RRA::type& rra)
  170. {
  171. int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
  172. std::string sourceTable = GetSourceTable(rra);
  173. std::string destinationTable = GetDestinationTable(rra);
  174. int newestSourcePeriodStamp = GetNewestPeriodStamp(sourceTable, deviceId, dataId);
  175. int newestDestinationPeriodStamp = GetNewestPeriodStamp(destinationTable, deviceId, dataId);
  176. std::cout << " DeviceID: " << deviceId << std::endl;
  177. std::cout << " DataID: " << dataId << std::endl;
  178. std::cout << " Newest Src : " << newestSourcePeriodStamp << std::endl;
  179. std::cout << " Newest Dest: " << newestDestinationPeriodStamp << std::endl;
  180. std::stringstream query;
  181. MySQL::MySQLResultSet resultSet;
  182. if (sourceTable == "datalog")
  183. resultSet = GetRawData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
  184. else
  185. resultSet = GetReducedData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
  186. std::cout << " Source Count: " << resultSet.RowsCount() << std::endl;
  187. int samples = 0;
  188. float min = std::numeric_limits<float>::max();
  189. float mean = 0;
  190. float max = std::numeric_limits<float>::min();
  191. int writeCount = 0;
  192. InitializeInsertQuery(query, destinationTable);
  193. std::stringstream queryPart;
  194. int periodStamp = 0;
  195. while (resultSet.Next())
  196. {
  197. int currentPeriodStamp = GetPeriodStamp(resultSet.Int("timestamp"), rra);
  198. if (currentPeriodStamp != periodStamp)
  199. {
  200. if (samples > 0)
  201. {
  202. mean = mean / samples;
  203. queryPart.str("");
  204. queryPart << "('" << deviceId << "', '" << dataId << "', FROM_UNIXTIME(" << currentPeriodStamp << "), '" << min << "', '" << mean << "', '" << max << "'),";
  205. ++writeCount;
  206. int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
  207. if (packetSize > maxMySQLPacketSize)
  208. {
  209. ExecuteInsertQuery(query);
  210. InitializeInsertQuery(query, destinationTable);
  211. std::cout << " Write Count: " << writeCount << std::endl;
  212. writeCount = 0;
  213. }
  214. query << queryPart.str();
  215. }
  216. if (currentPeriodStamp > newestSourcePeriodStamp)
  217. break;
  218. samples = 0;
  219. min = std::numeric_limits<float>::max();
  220. mean = 0;
  221. max = std::numeric_limits<float>::min();
  222. periodStamp = currentPeriodStamp;
  223. }
  224. float minValue, meanValue, maxValue;
  225. minValue = static_cast<float>(resultSet.Double("min-value"));
  226. meanValue = static_cast<float>(resultSet.Double("mean-value"));
  227. maxValue = static_cast<float>(resultSet.Double("max-value"));
  228. if (minValue < min)
  229. min = minValue;
  230. if (maxValue > max)
  231. max = maxValue;
  232. mean += meanValue;
  233. ++samples;
  234. }
  235. if (writeCount > 0)
  236. ExecuteInsertQuery(query);
  237. std::cout << " Write Count: " << writeCount << std::endl << std::endl;
  238. }
  239. void DataCondenser::WriteReducedStringData(int deviceId, int dataId, const RRA::type& rra)
  240. {
  241. int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
  242. std::string sourceTable = GetSourceTable(rra);
  243. std::string destinationTable = GetDestinationTable(rra);
  244. int newestSourcePeriodStamp = GetNewestPeriodStamp(sourceTable, deviceId, dataId);
  245. int newestDestinationPeriodStamp = GetNewestPeriodStamp(destinationTable, deviceId, dataId);
  246. std::cout << " DeviceID: " << deviceId << std::endl;
  247. std::cout << " DataID: " << dataId << std::endl;
  248. std::cout << " Newest Src : " << newestSourcePeriodStamp << std::endl;
  249. std::cout << " Newest Dest: " << newestDestinationPeriodStamp << std::endl;
  250. std::stringstream query;
  251. MySQL::MySQLResultSet resultSet;
  252. if (sourceTable == "datalog")
  253. resultSet = GetRawData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
  254. else
  255. resultSet = GetReducedData(sourceTable, deviceId, dataId, newestDestinationPeriodStamp);
  256. std::cout << " Source Count: " << resultSet.RowsCount() << std::endl;
  257. int samples = 0;
  258. std::vector<std::string> values;
  259. int writeCount = 0;
  260. InitializeInsertQuery(query, destinationTable);
  261. std::stringstream queryPart;
  262. int periodStamp = 0;
  263. while (resultSet.Next())
  264. {
  265. int currentPeriodStamp = GetPeriodStamp(resultSet.Int("timestamp"), rra);
  266. if (currentPeriodStamp != periodStamp)
  267. {
  268. if (samples > 0)
  269. {
  270. std::map<std::string, int> map;
  271. for (auto& value : values)
  272. {
  273. auto iterator = map.find(value);
  274. if (iterator == map.end())
  275. map.insert(std::pair<std::string, int>(value, 1));
  276. else
  277. map[value] += 1;
  278. }
  279. auto iterator = map.begin();
  280. for (std::map<std::string, int>::iterator it = map.begin(); it != map.end(); ++it)
  281. {
  282. if (it->second > iterator->second)
  283. iterator = it;
  284. }
  285. queryPart.str("");
  286. queryPart << "('" << deviceId << "', '" << dataId << "', FROM_UNIXTIME(" << currentPeriodStamp << "), '', '" << iterator->first << "', ''),";
  287. int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
  288. if (packetSize > maxMySQLPacketSize)
  289. {
  290. ExecuteInsertQuery(query);
  291. InitializeInsertQuery(query, destinationTable);
  292. std::cout << " Write Count: " << writeCount << std::endl;
  293. writeCount = 0;
  294. }
  295. query << queryPart.str();
  296. ++writeCount;
  297. }
  298. if (currentPeriodStamp > newestSourcePeriodStamp)
  299. break;
  300. samples = 0;
  301. values.clear();
  302. periodStamp = currentPeriodStamp;
  303. }
  304. std::string meanValue = resultSet.String("mean-value");
  305. values.push_back(meanValue);
  306. ++samples;
  307. }
  308. if (writeCount > 0)
  309. ExecuteInsertQuery(query);
  310. std::cout << " Write Count: " << writeCount << std::endl << std::endl;
  311. }
  312. void DataCondenser::CleanData()
  313. {
  314. std::cout << "Cleaning: " << std::endl;
  315. CleanData(RRA::Daily);
  316. CleanData(RRA::Weekly);
  317. CleanData(RRA::Monthly);
  318. CleanData(RRA::Yearly);
  319. }
  320. void DataCondenser::CleanData(const RRA::type& rra)
  321. {
  322. std::string table = GetDestinationTable(rra);
  323. int periodInSeconds = static_cast<int>(rra) * 400;
  324. int timestamp = Util::GetTimestamp() + Util::GetUTCOffset() - periodInSeconds;
  325. std::cout << " Cleaning " << table << " (<" << timestamp << ")" << std::endl;
  326. std::stringstream query;
  327. query << "DELETE FROM `" << table << "` WHERE `timestamp` < FROM_UNIXTIME(" << timestamp << ");";
  328. m_pMySQLClient->Execute(query.str());
  329. }
  330. } // namespace DataInterface
  331. } // namespace DataStorageInterface