Program Listing for File mqtt_subscriber.cpp¶
↰ Return to documentation for file (src/bitrl/network/mqtt_subscriber.cpp)
//
// Created by alex on 11/23/25.
//
#include "bitrl/network/mqtt_subscriber.h"
#include <optional>
#include <mqtt/message.h>
namespace bitrl {
namespace network
{
MqttSubscriber::MqttSubscriber(const std::string& server, const std::string& topic)
:
server_(server),
topic_(topic),
cli_(server, "")
{}
MqttSubscriber::~MqttSubscriber() {
try
{
cli_.stop_consuming();
cli_.disconnect()->wait();
}
catch (...)
{
}
}
void
MqttSubscriber::connect()
{
conn_opts_.set_clean_session(true);
cli_.connect(conn_opts_)->wait();
cli_.set_callback(*this);
cli_.start_consuming();
cli_.subscribe(topic_, 1)->wait();
}
void
MqttSubscriber::message_arrived(mqtt::const_message_ptr msg)
{
std::lock_guard<std::mutex> lock(mutex_);
queue_.push(msg->to_string());
cv_.notify_one(); // Notify any thread waiting in poll()
}
std::optional<std::string>
MqttSubscriber::poll(std::chrono::milliseconds timeout) {
std::unique_lock<std::mutex> lock(mutex_);
if (!cv_.wait_for(lock, timeout, [this]{ return !queue_.empty(); })) {
// timeout, no message arrived
return std::nullopt;
}
auto data = queue_.front();
queue_.pop();
return data;
}
std::optional<std::string>
MqttSubscriber::read(std::chrono::milliseconds timeout)
{
std::unique_lock<std::mutex> lock(mutex_);
if (timeout == std::chrono::milliseconds::zero()) {
// Block indefinitely
cv_.wait(lock, [&]{ return !queue_.empty(); });
std::string msg = std::move(queue_.front());
queue_.pop();
return msg;
}
else {
// Block with timeout
if (!cv_.wait_for(lock, timeout, [&]{ return !queue_.empty(); }))
return std::nullopt;
std::string msg = std::move(queue_.front());
queue_.pop();
return msg;
}
}
}
}