Explorar o código

Add multi-threaded behavior

JDierkse hai 10 meses
pai
achega
e3873bf6ec
Modificáronse 6 ficheiros con 14 adicións e 6 borrados
  1. 1 0
      Libraries/CTPL
  2. 2 2
      MQTT/MQTT.cpp
  3. 6 2
      MQTT/MQTTImpl.cpp
  4. 3 1
      MQTT/MQTTImpl.h
  5. 1 0
      Makefile.conf
  6. 1 1
      include/MQTT.h

+ 1 - 0
Libraries/CTPL

@@ -0,0 +1 @@
+../../CTPL

+ 2 - 2
MQTT/MQTT.cpp

@@ -4,8 +4,8 @@
 
 namespace MQTT {
 
-MQTT::MQTT(const std::string& hostname, int port) :
-	m_pMQTTImpl(new MQTTImpl(hostname, port))
+MQTT::MQTT(const std::string& hostname, int port, int threads) :
+	m_pMQTTImpl(new MQTTImpl(hostname, port, threads))
 {
 }
 

+ 6 - 2
MQTT/MQTTImpl.cpp

@@ -7,7 +7,8 @@
 
 namespace MQTT {
 
-MQTTImpl::MQTTImpl(const std::string& hostname, int port) :
+MQTTImpl::MQTTImpl(const std::string& hostname, int port, int threads) :
+	m_threadPool(threads),
 	m_hostname(hostname),
 	m_port(port),
 	m_connected(false),
@@ -26,6 +27,7 @@ MQTTImpl::MQTTImpl(const std::string& hostname, int port) :
 
 MQTTImpl::~MQTTImpl()
 {
+	m_threadPool.stop();
 	StopLoop();
 	Disconnect();
 	mosquitto_destroy(m_pMosquitto);
@@ -160,7 +162,9 @@ void MQTTImpl::MessageCallback(const mosquitto_message* pMessage)
 	message.topic = std::string(pMessage->topic);
 	message.payload = std::string(static_cast<char*>(pMessage->payload), pMessage->payloadlen);
 
-	m_signal.emit(message);
+	m_threadPool.push([this, message](int) {
+		m_signal.emit(message);
+	});
 }
 
 bool MQTTImpl::InternalSubscribe(const std::string& topic)

+ 3 - 1
MQTT/MQTTImpl.h

@@ -3,6 +3,7 @@
 
 #include "MQTT.h"
 #include "MQTTMessage.h"
+#include <ctpl_stl.h>
 #include <SimpleSignal.h>
 #include <mutex>
 #include <string>
@@ -18,7 +19,7 @@ namespace MQTT {
 class MQTTImpl
 {
 public:
-	MQTTImpl(const std::string& hostname, int port);
+	MQTTImpl(const std::string& hostname, int port, int threads);
 	~MQTTImpl();
 
 	bool Send(const MQTTMessage& message);
@@ -52,6 +53,7 @@ private:
 private:
 	std::thread m_thread;
 	mutable std::mutex m_mutex;
+	ctpl::thread_pool m_threadPool;
 
 	std::string m_hostname;
 	int m_port;

+ 1 - 0
Makefile.conf

@@ -4,6 +4,7 @@
 
 LIBRARIES += Logging
 
+CFLAGS += -I$(ROOTPATH)/Libraries/CTPL
 CFLAGS += -I$(ROOTPATH)/Libraries/SimpleSignal
 
 DEBUGDIR := .debug

+ 1 - 1
include/MQTT.h

@@ -14,7 +14,7 @@ class MQTTImpl;
 class MQTT
 {
 public:
-	MQTT(const std::string& hostname, int port);
+	MQTT(const std::string& hostname, int port, int threads = 3);
 	~MQTT();
 
 	MQTT(const MQTT&) = delete;