Spring Boot + MQTT:消息交互轻松实现

介绍

MQTT(Message Queuing Telemetry Transport)是一种轻量级的消息传输协议,专为资源受限的设备和低带宽、高延迟的网络环境设计。在物联网(IoT)领域,MQTT因其低开销和高效能而被广泛应用。

图片

使用

安装说明

https://docs.emqx.com/zh/emqx/v4.3/getting-started/install.html#zip-%E5%8E%8B%E7%BC%A9%E5%8C%85%E5%AE%89%E8%A3%85-linux%E3%80%81macos%E3%80%81windows

图片

默认账号:admin/public,登录后进入首页:

图片

依赖引入
复制
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-mqtt</artifactId>
</dependency>
配置
复制
# MQTT配置
mqtt:
  broker-url: tcp://localhost:1883 # MQTT代理服务器地址
  client-id: yian-mqtt-client # 客户端ID,必须唯一
  username: admin # 用户名(如果需要)
  password: 1qazxsw2 # 密码(如果需要)
  topics:
    topic1: test/topic_1
    topic2: test/topic_2
    topic3: test/topic_3
  qos: 1 # 消息质量等级(0、1或2)
  automatic-reconnect: true # 自动重连
  clean-session: true # 是否清除会话
  connection-timeout: 30 # 连接超时时间(秒)
  keep-alive-interval: 30 # 保持连接时间(秒)
  pool-config:
    core-size: 8
    max-size: 32
    queue-capacity: 1024
    thread-prefix: mqtt-worker-
复制
@Data @Component @ConfigurationProperties(prefix = "mqtt") public class MqttProperties { /** * MQTT Broker地址 */ private String brokerUrl; /** * 客户端ID */ private String clientId; /** * 用户名 */ private String username; /** * 密码 */ private String password; /** * QoS级别 (0/1/2) */ private int qos = 1; /** * 自动重连 */ private boolean automaticReconnect = false; /** * 清理会话 */ private boolean cleanSession = false; /** * 连接超时时间(ms) */ private int connectionTimeout = 5000; /** * 保持连接间隔(秒) */ private int keepAliveInterval = 30; /** * 主题配置 */ private Topics topics = new Topics(); /** * 线程相关参数 */ private PoolConfig poolConfig = new PoolConfig(); @Data public static class PoolConfig { private int coreSize = 8; private int maxSize = 16; private int queueCapacity = 1000; private String threadPrefix = "mqtt-worker-"; } @Data public static class Topics { /** * topic1 */ private String topic1; /** * topic2 */ private String topic2; /** * topic3 */ private String topic3; } // 需要显式声明空构造器 public MqttProperties() { } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.
MQTT配置类
复制
@Slf4j @Configuration public class MqttConfig { private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic"); private final MqttProperties mqttProperties; private final MqttCallback mqttCallback; public MqttConfig(MqttProperties mqttProperties, MqttCallback mqttCallback) { this.mqttProperties = mqttProperties; this.mqttCallback = mqttCallback; } @Bean public MqttClient mqttClient() throws MqttException { MqttClient client = createMqttClient(); MqttConnectOptions options = buildMqttConnectOptions(); try { client.connect(options); log.info("MQTT连接成功,Broker地址: {}", mqttProperties.getBrokerUrl()); subscribeTopics(client); } catch (MqttException e) { log.error("MQTT连接异常: {},错误码: {}", e.getMessage(), e.getReasonCode(), e); throw new RuntimeException("MQTT连接失败", e); } client.setCallback(mqttCallback); return client; } private MqttClient createMqttClient() throws MqttException { return new MqttClient( mqttProperties.getBrokerUrl(), generateClientId(), new MemoryPersistence() ); } private String generateClientId() { return Optional.ofNullable(mqttProperties.getClientId()) .filter(StringUtils::hasText) .orElseGet(() -> "CLIENT_" + System.currentTimeMillis()); } private MqttConnectOptions buildMqttConnectOptions() { MqttConnectOptions options = new MqttConnectOptions(); options.setAutomaticReconnect(mqttProperties.isAutomaticReconnect()); options.setCleanSession(mqttProperties.isCleanSession()); Optional.ofNullable(mqttProperties.getUsername()) .filter(StringUtils::hasText) .ifPresent(options::setUserName); Optional.ofNullable(mqttProperties.getPassword()) .filter(StringUtils::hasText) .map(String::toCharArray) .ifPresent(options::setPassword); options.setConnectionTimeout(mqttProperties.getConnectionTimeout()); options.setKeepAliveInterval(mqttProperties.getKeepAliveInterval()); return options; } private void subscribeTopics(MqttClient client) throws MqttException { List<String> topics = getTopicsToSubscribe(); for (String topic : topics) { 
try { client.subscribe(topic, mqttProperties.getQos()); log.info("成功订阅主题: {}", topic); } catch (MqttException e) { log.error("订阅主题[{}]失败,错误码: {}", topic, e.getReasonCode(), e); throw e; } } } private List<String> getTopicsToSubscribe() { return Optional.ofNullable(mqttProperties.getTopics()) .map(t -> Arrays.asList(t.getTopic1())) .filter(list -> !list.contains(null)) .orElse(DEFAULT_TOPICS); } @Bean("mqttExecutor") public Executor mqttExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(mqttProperties.getPoolConfig().getCoreSize()); executor.setMaxPoolSize(mqttProperties.getPoolConfig().getMaxSize()); executor.setQueueCapacity(mqttProperties.getPoolConfig().getQueueCapacity()); executor.setThreadNamePrefix(mqttProperties.getPoolConfig().getThreadPrefix()); executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.
消息回调,处理接收的消息
复制
@Slf4j @Component public class MqttMessageListener implements MqttCallback { private static final int MAX_RETRY_ATTEMPTS = 10; private static final long INITIAL_RETRY_DELAY = 1_000L; private static final List<String> DEFAULT_TOPICS = Collections.singletonList("defaultTopic"); private final ScheduledExecutorService reconnectScheduler = Executors.newSingleThreadScheduledExecutor(); private final AtomicInteger retryCounter = new AtomicInteger(0); private final Map<String, MessageHandler> topicHandlers = new ConcurrentHashMap<>(); private final TestService testService; private final MqttProperties mqttProperties; @Lazy @Autowired private MqttClient mqttClient; public MqttMessageListener(TestService testService, MqttProperties mqttProperties) { this.testService = testService; this.mqttProperties = mqttProperties; initializeHandlers(); } private void initializeHandlers() { topicHandlers.put(mqttProperties.getTopics().getTopic1(), this::handleMessage2); } @Override public void connectionLost(Throwable cause) { log.error("MQTT连接中断,原因: {}", cause.getMessage()); scheduleReconnect(); } private synchronized void scheduleReconnect() { int attempt = retryCounter.incrementAndGet(); if (attempt > MAX_RETRY_ATTEMPTS) { log.error("达到最大重连次数[{}],停止重连尝试", MAX_RETRY_ATTEMPTS); return; } long delay = INITIAL_RETRY_DELAY * (long) Math.pow(2, attempt - 1); log.info("将在{}ms后尝试第{}次重连...", delay, attempt); reconnectScheduler.schedule(() -> { try { if (!mqttClient.isConnected()) { mqttClient.reconnect(); } subscribeTopics(mqttClient); mqttClient.setCallback(this); retryCounter.set(0); log.info("MQTT连接恢复成功"); } catch (MqttException e) { log.error("第{}次重连失败: {}", attempt, e.getMessage(),e); scheduleReconnect(); } }, delay, TimeUnit.MILLISECONDS); } private void subscribeTopics(MqttClient client) throws MqttException { List<String> topics = getTopicsToSubscribe(); for (String topic : topics) { try { client.subscribe(topic, mqttProperties.getQos()); log.info("成功订阅主题: {}", topic); } catch 
(MqttException e) { log.error("订阅主题[{}]失败,错误码: {}", topic, e.getReasonCode(), e); throw e; } } } private List<String> getTopicsToSubscribe() { return Optional.ofNullable(mqttProperties.getTopics()) .map(t -> Arrays.asList(t.getTopic1())) .filter(list -> !list.contains(null)) .orElse(DEFAULT_TOPICS); } @Override public void messageArrived(String topic, MqttMessage message) { try { String payload = validatePayload(message.getPayload()); log.info("收到消息 [Topic:{}][QoS:{}] {}", topic, message.getQos(), payload); MessageHandler handler = Optional.ofNullable(topicHandlers.get(topic)) .orElseThrow(() -> new MqttException(MqttException.REASON_CODE_CLIENT_EXCEPTION)); asyncProcessMessage(() -> { try { handler.handle(payload); } catch (Exception e) { throw new RuntimeException(e); } }); } catch (JSONException e) { log.error("消息格式错误 [Topic:{}]: {}", topic, e.getMessage(), e); } catch (MqttException e) { log.error("消息处理失败 [Topic:{}]: {}", topic, e.getMessage(), e); } catch (Exception e) { log.error("未知处理异常 [Topic:{}]: {}", topic, e.getMessage(), e); } } private String validatePayload(byte[] payload) throws JSONException { String json = new String(payload); try { JSON.parse(json); // 完整解析 JSON,捕获格式异常 } catch (JSONException e) { log.error("非法JSON格式: {}", json); throw new JSONException("非法JSON格式", e); } return json; } @Async("mqttExecutor") public void asyncProcessMessage(Runnable task) { task.run(); } private void handleMessage2(String payload) { testService.handleMessage2(payload); } private void handleMessage3(String payload) { testService.handleMessage3(payload); } @Override public void deliveryComplete(IMqttDeliveryToken token) { try { if (token.getException() != null) { log.error("消息投递失败 [MessageId:{}]", token.getMessageId(), token.getException()); } else { log.info("消息投递成功 [MessageId:{}]", token.getMessageId()); } } catch (Exception e) { log.error("获取投递状态失败", e); } } @FunctionalInterface private interface MessageHandler { void handle(String payload) throws Exception; } 
}1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.57.58.59.60.61.62.63.64.65.66.67.68.69.70.71.72.73.74.75.76.77.78.79.80.81.82.83.84.85.86.87.88.89.90.91.92.93.94.95.96.97.98.99.100.101.102.103.104.105.106.107.108.109.110.111.112.113.114.115.116.117.118.119.120.121.122.123.124.125.126.127.128.129.130.131.132.133.134.135.136.137.138.139.140.141.142.143.144.145.146.147.148.
消息发布
复制
@Slf4j @Component public class MqttPublisher { @Autowired @Lazy private MqttClient mqttClient; @Value("${mqtt.qos:1}") private int qosLevel; private final Object publishLock = new Object(); /** * 发布消息 */ public void publishMessage(String msg, String topic) { try { synchronized (publishLock) { // 线程安全锁 MqttMessage message = new MqttMessage(); message.setPayload(msg.getBytes()); message.setQos(qosLevel); message.setRetained(false); // 不保留消息 mqttClient.publish(topic, message); log.info("Topic:{} 响应已发送: {}", topic, msg); } } catch (MqttException e) { log.error("MQTT发布失败: {}", e.getMessage()); handlePublishFailure(msg, topic, e); } } /** * 消息失败重试机制 */ private void handlePublishFailure(String msg, String topic, MqttException e) { try { if (mqttClient.isConnected()) { mqttClient.disconnect(); } mqttClient.connect(); publishMessage(msg, topic); // 重试发布 } catch (MqttException ex) { log.error("消息重试失败: {}", ex.getMessage(),ex); } } /** * 消息持久化存储(QoS 1保障) */ private void saveFailedMessage(JSONObject msg) { // 实现数据库存储逻辑(此处需补充DAO操作) log.warn("持久化失败消息: {}", msg); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.56.
测试

启动程序:

图片

查看主题:

图片

发布消息:

图片

THE END