|
@@ -0,0 +1,164 @@
|
|
|
|
|
+#include "MQTTImpl.h"
|
|
|
|
|
+#include <Logging.h>
|
|
|
|
|
+#include <mosquitto.h>
|
|
|
|
|
+#include <sstream>
|
|
|
|
|
+#include <unistd.h>
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+namespace MQTT {
|
|
|
|
|
+
|
|
|
|
|
+MQTTImpl::MQTTImpl(const std::string& hostname, int port) :
|
|
|
|
|
+ m_hostname(hostname),
|
|
|
|
|
+ m_port(port),
|
|
|
|
|
+ m_connected(false),
|
|
|
|
|
+ m_loop(false)
|
|
|
|
|
+{
|
|
|
|
|
+ std::stringstream clientId;
|
|
|
|
|
+ clientId << "MQTT_" << getpid();
|
|
|
|
|
+ m_clientId = clientId.str();
|
|
|
|
|
+
|
|
|
|
|
+ mosquitto_lib_init();
|
|
|
|
|
+ m_pMosquitto = mosquitto_new(m_clientId.c_str(), true, this);
|
|
|
|
|
+
|
|
|
|
|
+ m_connected = Connect();
|
|
|
|
|
+ StartLoop();
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+MQTTImpl::~MQTTImpl()
|
|
|
|
|
+{
|
|
|
|
|
+ StopLoop();
|
|
|
|
|
+ Disconnect();
|
|
|
|
|
+ mosquitto_destroy(m_pMosquitto);
|
|
|
|
|
+ mosquitto_lib_cleanup();
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+bool MQTTImpl::Send(const Message& message)
|
|
|
|
|
+{
|
|
|
|
|
+ if (m_connected)
|
|
|
|
|
+ return (MOSQ_ERR_SUCCESS == mosquitto_publish(m_pMosquitto, NULL, message.topic.c_str(), message.payload.length(), message.payload.c_str(), 0, 0));
|
|
|
|
|
+ return false;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+size_t MQTTImpl::Connect(MQTT::CallbackMethod function)
|
|
|
|
|
+{
|
|
|
|
|
+ return m_signal.connect(function);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::Disconnect(size_t connection)
|
|
|
|
|
+{
|
|
|
|
|
+ m_signal.disconnect(connection);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+bool MQTTImpl::Subscribe(const std::string& topic)
|
|
|
|
|
+{
|
|
|
|
|
+ if (m_pMosquitto && m_connected)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (MOSQ_ERR_SUCCESS != mosquitto_subscribe(m_pMosquitto, NULL, topic.c_str(), 0))
|
|
|
|
|
+ return false;
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return false;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::Unsubscribe(const std::string& topic)
|
|
|
|
|
+{
|
|
|
|
|
+ if (m_pMosquitto && m_connected)
|
|
|
|
|
+ mosquitto_unsubscribe(m_pMosquitto, NULL, topic.c_str());
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::ConnectCallback(mosquitto* pMosquitto, void* pObject, int result)
|
|
|
|
|
+{
|
|
|
|
|
+ auto pMQTTImpl = static_cast<MQTTImpl*>(pObject);
|
|
|
|
|
+ pMQTTImpl->ConnectCallback(result);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::MessageCallback(mosquitto* pMosquitto, void* pObject, const mosquitto_message* pMessage)
|
|
|
|
|
+{
|
|
|
|
|
+ auto pMQTTImpl = static_cast<MQTTImpl*>(pObject);
|
|
|
|
|
+ pMQTTImpl->MessageCallback(pMessage);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+bool MQTTImpl::Connect()
|
|
|
|
|
+{
|
|
|
|
|
+ if (m_pMosquitto)
|
|
|
|
|
+ {
|
|
|
|
|
+ mosquitto_connect_callback_set(m_pMosquitto, MQTTImpl::ConnectCallback);
|
|
|
|
|
+ mosquitto_message_callback_set(m_pMosquitto, MQTTImpl::MessageCallback);
|
|
|
|
|
+
|
|
|
|
|
+ if (MOSQ_ERR_SUCCESS != mosquitto_connect(m_pMosquitto, m_hostname.c_str(), m_port, 60))
|
|
|
|
|
+ return false;
|
|
|
|
|
+
|
|
|
|
|
+ Logging::Log(Logging::Severity::Info, "MQTT::Connect() - Connected");
|
|
|
|
|
+
|
|
|
|
|
+ return true;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ return false;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::Disconnect()
|
|
|
|
|
+{
|
|
|
|
|
+ if (m_pMosquitto && m_connected)
|
|
|
|
|
+ {
|
|
|
|
|
+ if (MOSQ_ERR_SUCCESS != mosquitto_disconnect(m_pMosquitto))
|
|
|
|
|
+ return;
|
|
|
|
|
+ m_connected = false;
|
|
|
|
|
+ }
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::StartLoop()
|
|
|
|
|
+{
|
|
|
|
|
+ std::unique_lock<std::mutex> lock(m_mutex);
|
|
|
|
|
+ m_loop = true;
|
|
|
|
|
+ m_thread = std::thread([&] { Loop(); });
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+bool MQTTImpl::RunLoop() const
|
|
|
|
|
+{
|
|
|
|
|
+ std::unique_lock<std::mutex> lock(m_mutex);
|
|
|
|
|
+ return m_loop;
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::Loop()
|
|
|
|
|
+{
|
|
|
|
|
+ while (RunLoop())
|
|
|
|
|
+ {
|
|
|
|
|
+ bool success = (MOSQ_ERR_SUCCESS == mosquitto_loop(m_pMosquitto, -1, 1));
|
|
|
|
|
+
|
|
|
|
|
+ if (m_loop && !success)
|
|
|
|
|
+ {
|
|
|
|
|
+ Logging::Log(Logging::Severity::Info, "MQTT::Loop() - Disconnected");
|
|
|
|
|
+ Disconnect();
|
|
|
|
|
+ sleep(3);
|
|
|
|
|
+ while (RunLoop() && !success)
|
|
|
|
|
+ success = Connect();
|
|
|
|
|
+ }
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ Logging::Log(Logging::Severity::Info, "MQTT::Loop() - End");
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::StopLoop()
|
|
|
|
|
+{
|
|
|
|
|
+ {
|
|
|
|
|
+ std::unique_lock<std::mutex> lock(m_mutex);
|
|
|
|
|
+ m_loop = false;
|
|
|
|
|
+ }
|
|
|
|
|
+ m_thread.join();
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::ConnectCallback(int result)
|
|
|
|
|
+{
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+void MQTTImpl::MessageCallback(const mosquitto_message* pMessage)
|
|
|
|
|
+{
|
|
|
|
|
+ Message message;
|
|
|
|
|
+ message.topic = std::string(pMessage->topic);
|
|
|
|
|
+ message.payload = std::string(static_cast<char*>(pMessage->payload), pMessage->payloadlen);
|
|
|
|
|
+
|
|
|
|
|
+ m_signal.emit(message);
|
|
|
|
|
+}
|
|
|
|
|
+
|
|
|
|
|
+} // namespace MQTT
|