Spring Boot + MQTT:消息交互轻松实现
介绍
MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,专为资源受限的设备和低带宽、高延迟的网络环境设计。在物联网(IoT)领域,MQTT因其低开销和高效能而被广泛应用。
图片
使用
安装说明
图片
默认账号:admin/public,登录后进入首页:
图片
复制
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-mqtt</artifactId>
</dependency>1.2.3.4.5.6.7.8.
复制
# MQTT配置
mqtt:
broker-url: tcp://localhost:1883 # MQTT代理服务器地址
client-id: yian-mqtt-client # 客户端ID,必须唯一
username: admin # 用户名(如果需要)
password: 1qazxsw2 # 密码(如果需要)
topics:
topic1: test/topic_1
topic2: test/topic_2
topic3: test/topic_3
qos: 1 # 消息质量等级(0、1或2)
automatic-reconnect: true# 自动重连
clean-session: true# 是否清除会话
connection-timeout: 5000 # 连接超时时间(秒)
keep-alive-interval: 30 # 保持连接时间(秒)
pool-config:
core-size: 8
max-size: 32
queue-capacity: 1024
thread-name-prefix: mqtt-worker-1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.
复制
@Data
@Component
@ConfigurationProperties(prefix = "mqtt")
public class MqttProperties {
/**
* MQTT Broker地址
*/
private String brokerUrl;
/**
* 客户端ID
*/
private String clientId;
/**
* 用户名
*/
private String username;
/**
* 密码
*/
private String password;
/**
* QoS级别 (0/1/2)
*/
private int qos = 1;
/**
* 自动重连
*/
private boolean automaticReconnect = false;
/**
* 清理会话
*/
private boolean cleanSession = false;
/**
* 连接超时时间(ms)
*/
private int connectionTimeout = 5000;
/**
* 保持连接间隔(秒)
*/
private int keepAliveInterval = 30;
/**
* 主题配置
*/
private Topics topics = new Topics();
/**
* 线程相关参数
*/
private PoolConfig poolConfig = new PoolConfig();
@Data
public static class PoolConfig {
private int coreSize = 8;
private int maxSize = 16;
private int queueCapacity = 1000;
private String threadPrefix = "mqtt-worker-";
}
@Data
public static class Topics {
/**
* topic1
*/
private String topic1;
/**
* topic2
*/
private String topic2;
/**
* topic3
*/
private String topic3;
}
// 需要显式声明空构造器
public MqttProperties() {
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.
复制
@Slf4j
@Configuration
public class MqttConfig {
private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");
private final MqttProperties mqttProperties;
private final MqttCallback mqttCallback;
public MqttConfig(MqttProperties mqttProperties, MqttCallback mqttCallback) {
this.mqttProperties = mqttProperties;
this.mqttCallback = mqttCallback;
}
@Bean
public MqttClient mqttClient() throws MqttException {
MqttClient client = createMqttClient();
MqttConnectOptions options = buildMqttConnectOptions();
try {
client.connect(options);
log.info("MQTT连接成功,Broker地址: {}", mqttProperties.getBrokerUrl());
subscribeTopics(client);
} catch (MqttException e) {
log.error("MQTT连接异常: {},错误码: {}", e.getMessage(), e.getReasonCode(), e);
throw new RuntimeException("MQTT连接失败", e);
}
client.setCallback(mqttCallback);
return client;
}
private MqttClient createMqttClient() throws MqttException {
return new MqttClient(
mqttProperties.getBrokerUrl(),
generateClientId(),
new MemoryPersistence()
);
}
private String generateClientId() {
return Optional.ofNullable(mqttProperties.getClientId())
.filter(StringUtils::hasText)
.orElseGet(() -> "CLIENT_" + System.currentTimeMillis());
}
private MqttConnectOptions buildMqttConnectOptions() {
MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect());
options.setCleanSession(mqttProperties.isCleanSession());
Optional.ofNullable(mqttProperties.getUsername())
.filter(StringUtils::hasText)
.ifPresent(options::setUserName);
Optional.ofNullable(mqttProperties.getPassword())
.filter(StringUtils::hasText)
.map(String::toCharArray)
.ifPresent(options::setPassword);
options.setConnectionTimeout(mqttProperties.getConnectionTimeout());
options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval());
return options;
}
private void subscribeTopics(MqttClient client) throws MqttException {
List<String> topics = getTopicsToSubscribe();
for (String topic : topics) {
try {
client.subscribe(topic, mqttProperties.getQos());
log.info("成功订阅主题: {}", topic);
} catch (MqttException e) {
log.error("订阅主题[{}]失败,错误码: {}", topic, e.getReasonCode(), e);
throw e;
}
}
}
private List<String> getTopicsToSubscribe() {
return Optional.ofNullable(mqttProperties.getTopics())
.map(t -> Arrays.asList(t.getTopic1()))
.filter(list -> !list.contains(null))
.orElse(DEFAULT_TOPICS);
}
@Bean("mqttExecutor")
public Executor mqttExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(mqttProperties.getPoolConfig().getCoreSize());
executor.setMaxPoolSize(mqttProperties.getPoolConfig().getMaxSize());
executor.setQueueCapacity(mqttProperties.getPoolConfig().getQueueCapacity());
executor.setThreadNamePrefix(mqttProperties.getPoolConfig().getThreadPrefix());
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor;
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.
复制
@Slf4j
@Component
public class MqttMessageListener implements MqttCallback {
private static final int MAX_RETRY_ATTEMPTS = 10;
private static final long INITIAL_RETRY_DELAY = 1_000L;
private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic");
private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor();
private final AtomicInteger retryCounter = new AtomicInteger(0);
private final Map<String, MessageHandler> topicHandlers = new ConcurrentHashMap<>();
private final TestService testService;
private final MqttProperties mqttProperties;
@Lazy
@Autowired
private MqttClient mqttClient;
public MqttMessageListener(TestService testService, MqttProperties mqttProperties) {
this.testService = testService;
this.mqttProperties = mqttProperties;
initializeHandlers();
}
private void initializeHandlers() {
topicHandlers.put(mqttProperties.getTopics().getTopic1(), this::handleMessage2);
}
@Override
public void connectionLost(Throwable cause) {
log.error("MQTT连接中断,原因: {}", cause.getMessage());
scheduleReconnect();
}
private synchronized void scheduleReconnect() {
int attempt = retryCounter.incrementAndGet();
if (attempt > MAX_RETRY_ATTEMPTS) {
log.error("达到最大重连次数[{}],停止重连尝试", MAX_RETRY_ATTEMPTS);
return;
}
long delay = INITIAL_RETRY_DELAY * (long) Math.pow(2, attempt - 1);
log.info("将在{}ms后尝试第{}次重连...", delay, attempt);
reconnectScheduler.schedule(() -> {
try {
if (!mqttClient.isConnected()) {
mqttClient.reconnect();
}
subscribeTopics(mqttClient);
mqttClient.setCallback(this);
retryCounter.set(0);
log.info("MQTT连接恢复成功");
} catch (MqttException e) {
log.error("第{}次重连失败: {}", attempt, e.getMessage(),e);
scheduleReconnect();
}
}, delay, TimeUnit.MILLISECONDS);
}
private void subscribeTopics(MqttClient client) throws MqttException {
List<String> topics = getTopicsToSubscribe();
for (String topic : topics) {
try {
client.subscribe(topic, mqttProperties.getQos());
log.info("成功订阅主题: {}", topic);
} catch (MqttException e) {
log.error("订阅主题[{}]失败,错误码: {}", topic, e.getReasonCode(), e);
throw e;
}
}
}
private List<String> getTopicsToSubscribe() {
return Optional.ofNullable(mqttProperties.getTopics())
.map(t -> Arrays.asList(t.getTopic1()))
.filter(list -> !list.contains(null))
.orElse(DEFAULT_TOPICS);
}
@Override
public void messageArrived(String topic, MqttMessage message) {
try {
String payload = validatePayload(message.getPayload());
log.info("收到消息 [Topic:{}][QoS:{}] {}", topic, message.getQos(), payload);
MessageHandler handler = Optional.ofNullable(topicHandlers.get(topic))
.orElseThrow(() -> new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION));
asyncProcessMessage(() -> {
try {
handler.handle(payload);
} catch (Exception e) {
throw new RuntimeException(e);
}
});
} catch (JSONException e) {
log.error("消息格式错误 [Topic:{}]: {}", topic, e.getMessage(), e);
} catch (MqttException e) {
log.error("消息处理失败 [Topic:{}]: {}", topic, e.getMessage(), e);
} catch (Exception e) {
log.error("未知处理异常 [Topic:{}]: {}", topic, e.getMessage(), e);
}
}
private String validatePayload(byte[] payload) throws JSONException {
String json = new String(payload);
try {
JSON.parse(json); // 完整解析 JSON,捕获格式异常
} catch (JSONException e) {
log.error("非法JSON格式: {}", json);
throw new JSONException("非法JSON格式", e);
}
return json;
}
@Async("mqttExecutor")
public void asyncProcessMessage(Runnable task) {
task.run();
}
private void handleMessage2(String payload) {
testService.handleMessage2(payload);
}
private void handleMessage3(String payload) {
testService.handleMessage3(payload);
}
@Override
public void deliveryComplete(IMqttDeliveryToken token) {
try {
if (token.getException() != null) {
log.error("消息投递失败 [MessageId:{}]", token.getMessageId(), token.getException());
} else {
log.info("消息投递成功 [MessageId:{}]", token.getMessageId());
}
} catch (Exception e) {
log.error("获取投递状态失败", e);
}
}
@FunctionalInterface
private interface MessageHandler {
void handle(String payload) throws Exception;
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.110.111.112.113.114.115.116.117.118.119.120.121.122.123.124.125.126.127.128.129.130.131.132.133.134.135.136.137.138.139.140.141.142.143.144.145.146.147.148.
复制
@Slf4j
@Component
public class MqttPublisher {
@Autowired
@Lazy
private MqttClient mqttClient;
@Value("${mqtt.qos:1}")
private int qosLevel;
private final Object publishLock = new Object();
/**
* 发布消息
*/
public void publishMessage(String msg, String topic) {
try {
synchronized (publishLock) { // 线程安全锁
MqttMessage message = new MqttMessage();
message.setPayload(msg.getBytes());
message.setQos(qosLevel);
message.setRetained(false); // 不保留消息
mqttClient.publish(topic, message);
log.info("Topic:{} 响应已发送: {}", topic, msg);
}
} catch (MqttException e) {
log.error("MQTT发布失败: {}", e.getMessage());
handlePublishFailure(msg, topic, e);
}
}
/**
* 消息失败重试机制
*/
private void handlePublishFailure(String msg, String topic, MqttException e) {
try {
if (mqttClient.isConnected()) {
mqttClient.disconnect();
}
mqttClient.connect();
publishMessage(msg, topic); // 重试发布
} catch (MqttException ex) {
log.error("消息重试失败: {}", ex.getMessage(),ex);
}
}
/**
* 消息持久化存储(QoS 1保障)
*/
private void saveFailedMessage(JSONObject msg) {
// 实现数据库存储逻辑(此处需补充DAO操作)
log.warn("持久化失败消息: {}", msg);
}
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.
启动程序:
图片
查看主题:
图片
发布消息:
图片
THE END