轻量级消息发布订阅:Redis的适用场景

前言

在实际项目中,某些业务场景需要使用消息的发布订阅功能来实现特殊需求。虽然常见的消息中间件如 RabbitMQ、Kafka 和 ActiveMQ 等提供了强大的消息处理能力,但它们通常被认为是较为“重量级”的解决方案,使用成本较高。在一些业务场景中,我们只需要实现消息的发布订阅功能,并不需要保证消息的绝对可靠性和高性能要求。此时,使用 Redis 作为消息中间件无疑是更好的选择。

项目如何搭建略过,可以使用 Spring Initializr 或者其他 IDEA 创建一个新的 Spring Boot 项目,并添加相关依赖即可

创建 Redis 消息发布者

创建一个服务类用于发布消息:

复制
@Service public class RedisPublisherService { @Autowired private RedisTemplate redisTemplate; public void publishMessage(String channel, String message) { redisTemplate.convertAndSend(channel, message); } }1.2.3.4.5.6.7.8.9.10.

创建 Redis 消息订阅者

创建一个服务类用于监听消息:

复制
/** * Redis 消息订阅-消息监听器,当收到阅订的消息时,会将消息交给这个类处理 * 可以直接实现 MessageListener 接口,也可以继承它的实现类 MessageListenerAdapter * 自动多线程处理 */ @Service public class RedisSubscriberService implements MessageListener { @Override public void onMessage(Message message, byte[] pattern) { String channel = message.getChannel().toString(); String data = new String(message.getBody()); System.out.println("Received message from channel " + channel + ": " + data); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.

消息监听器绑定监听指定通道

复制
/** * 自定义 RedisTemplate 序列化方式 * 配置主题订阅 - Redis消息监听器绑定监听指定通道 */ @Configuration public class RedisConfig { // 自定义的消息订阅监听器 @Resource private RedisSubscriberService redisSubscriberService; /** * 自定义 RedisTemplate 序列化方式 * @param redisConnectionFactory * @return */ @Bean public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) { RedisTemplate<Object, Object> redisTemplate = new RedisTemplate<>(); //绑定 RedisConnectionFactory redisTemplate.setConnectionFactory(redisConnectionFactory); //创建 Jackson2JsonRedisSerializer 序列方式,对象类型使用 Object 类型, Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class); ObjectMapper objectMapper = new ObjectMapper(); objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY); objectMapper.activateDefaultTyping(new LaissezFaireSubTypeValidator(), ObjectMapper.DefaultTyping.NON_FINAL); jackson2JsonRedisSerializer.setObjectMapper(objectMapper); // 设置 RedisTemplate 序列化规则,因为 key 通常是普通的字符串,所以使用StringRedisSerializer即可,而 value 是对象时,才需要使用序列化与反序列化。 redisTemplate.setKeySerializer(new StringRedisSerializer()); redisTemplate.setValueSerializer(jackson2JsonRedisSerializer); // hash key 序列化规则 redisTemplate.setHashKeySerializer(new StringRedisSerializer()); redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer); //属性设置后操作 redisTemplate.afterPropertiesSet(); return redisTemplate; } /** * 配置主题订阅 * 可以添加多个监听器,监听多个通道,只需要将消息监听器与订阅的通道/主题绑定即可。 * addMessageListener(MessageListener listener, Collection<? extends Topic> topics):将消息监听器与多个订阅的通道/主题绑定 * addMessageListener(MessageListener listener, Topic topic):将消息监听器与订阅的通道/主题绑定 * @param connectionFactory * @return */ @Bean public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 设置连接工厂,RedisConnectionFactory 可以直接从容器中取,也可以从 RedisTemplate 中取 container.setConnectionFactory(connectionFactory); // 订阅名称叫test-channel的通道, 类似 Redis 中的subscribe命令 container.addMessageListener(redisSubscriberService, new ChannelTopic("test-channel")); // 订阅名称以 user- 开头的全部通道, 类似 Redis 的 pSubscribe 命令 container.addMessageListener(redisSubscriberService, new PatternTopic("user-*")); return container; } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.33.34.35.36.37.38.39.40.41.42.43.44.45.46.47.48.49.50.51.52.53.54.55.

测试发布与订阅

创建一个测试类来测试发布和订阅功能:

复制
@Component public class RedisTestRunner implements CommandLineRunner { @Autowired private RedisPublisherService publisherService; @Autowired private RedisSubscriberService subscriberService; @Override public void run(String... args) throws Exception { // 发布消息 publisherService.publishMessage("test-channel", "Hello, yian!"); publisherService.publishMessage("user-channel", "Hello, weilai!"); } }1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.

THE END
本站服务器由亿华云赞助提供-企业级高防云服务器