DataImporter.cpp 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. #include "DataImporter.h"
  2. #include <DataId.h>
  3. #include <Logging.h>
  4. #include <MySQLClient.h>
  5. #include <SQLiteClient.h>
  6. #include <StringAlgorithm.h>
  7. #include <sstream>
  8. namespace DataStorageInterface {
  9. namespace DataInterface {
  10. DataImporter::DataImporter(MySQL::MySQLClient* pMySQLClient, SQLite::SQLiteClient* pSQLiteClient) :
  11. m_pMySQLClient(pMySQLClient),
  12. m_pSQLiteClient(pSQLiteClient)
  13. {
  14. }
  15. void DataImporter::ImportData()
  16. {
  17. auto tables = GetTables();
  18. for (auto& table : tables)
  19. {
  20. auto devices = GetDevices(table);
  21. for (auto& device : devices)
  22. ImportData(table, device);
  23. }
  24. }
  25. std::vector<std::string> DataImporter::GetTables()
  26. {
  27. std::stringstream query;
  28. query << "SELECT name FROM sqlite_master WHERE type ='table' AND name NOT LIKE 'sqlite_%';";
  29. auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
  30. std::vector<std::string> tables;
  31. while (resultSet.Next())
  32. tables.push_back(resultSet.String("name"));
  33. return tables;
  34. }
  35. std::vector<int> DataImporter::GetDevices(const std::string& table)
  36. {
  37. std::stringstream query;
  38. query << "SELECT DISTINCT device_id FROM " << table << ";";
  39. auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
  40. std::vector<int> devices;
  41. while (resultSet.Next())
  42. devices.push_back(resultSet.Int("device_id"));
  43. return devices;
  44. }
  45. void DataImporter::ImportData(const std::string& table, int device)
  46. {
  47. std::stringstream ss;
  48. ss << "Table: " << table << " #" << device;
  49. Logging::Log(Logging::Severity::Info, ss.str());
  50. int maxMySQLPacketSize = GetMaximumMySQLPacketSize() - 2;
  51. int maxTimestamp = GetMaximumTimestamp(table, device);
  52. int dataId = DataStorage::DataId(table).Id();
  53. std::stringstream queryPart;
  54. std::stringstream query;
  55. query << "SELECT timestamp, value FROM " << table << " WHERE device_id = '" << device << "' AND timestamp > '" << maxTimestamp << "' ORDER BY timestamp ASC;";
  56. auto resultSet = m_pSQLiteClient->ExecuteQuery(query.str());
  57. int writeCount = 0;
  58. InitializeInsertQuery(query);
  59. while (resultSet.Next())
  60. {
  61. queryPart.str("");
  62. queryPart << "('" << device << "', '" << dataId << "', FROM_UNIXTIME(" << resultSet.Int("timestamp") << "), '" << resultSet.String("value") << "'),";
  63. ++writeCount;
  64. int packetSize = StringStreamSize(query) + StringStreamSize(queryPart);
  65. if (packetSize > maxMySQLPacketSize)
  66. {
  67. ExecuteInsertQuery(query);
  68. writeCount = 0;
  69. InitializeInsertQuery(query);
  70. }
  71. query << queryPart.str();
  72. }
  73. if (writeCount > 0)
  74. ExecuteInsertQuery(query);
  75. }
  76. int DataImporter::GetMaximumMySQLPacketSize()
  77. {
  78. std::stringstream query;
  79. query << "show variables like 'max_allowed_packet';";
  80. MySQL::MySQLResultSet resultSet = m_pMySQLClient->ExecuteQuery(query.str());
  81. if (resultSet.RowsCount() != 1)
  82. throw std::runtime_error("Unexpected reply when retrieving max_allowed_packet.");
  83. resultSet.Next();
  84. return resultSet.Int("Value");
  85. }
  86. int DataImporter::StringStreamSize(std::stringstream& ss)
  87. {
  88. ss.seekp(0, std::ios_base::end);
  89. return ss.tellp();
  90. }
  91. void DataImporter::InitializeInsertQuery(std::stringstream& query)
  92. {
  93. query.str("");
  94. query << "INSERT INTO `datalog` (`device_id`, `data_id`, `timestamp`, `value`) VALUES ";
  95. }
  96. void DataImporter::ExecuteInsertQuery(std::stringstream& query)
  97. {
  98. query.seekp(-1, std::ios_base::end);
  99. query << ";";
  100. m_pMySQLClient->Execute(query.str());
  101. }
  102. int DataImporter::GetMaximumTimestamp(const std::string& table, int device)
  103. {
  104. std::stringstream query;
  105. query << "SELECT MAX(UNIX_TIMESTAMP(`timestamp`)) AS timestamp FROM `datalog` WHERE `device_id` = '" << device <<"' AND `data_id` = '" << DataStorage::DataId(table).Id() << "';";
  106. auto resultSet = m_pMySQLClient->ExecuteQuery(query.str());
  107. if (resultSet.Next())
  108. return resultSet.Int("timestamp");
  109. return 0;
  110. }
  111. } // namespace DataInterface
  112. } // namespace DataStorageInterface