2023-02-28 14:16:49 来源:腾讯云
延迟队列存储的对象是对应的延迟消息,所谓的延迟消息是指当消息被发送以后,并不想让消费者立刻拿到消息,而是等待特定时间后,消费者才能拿到这个消息进行消费
利用RabbitMq的TTL和死信队列 来实现延时消费。
(相关资料图)
如果设置的是队列统一过期时间放到死信队列,没有什么问题。
如果是延时时间设置到每条消息上的。而不是给队列的。
实现方式为消息存活时间为动态用户页面可配置的。
这就导致了一个问题:
先用一条消息的存活时间是1天。后面又进了一条消息存活时间是1小时。
结果一小时到了,发现这条消息并没有被转发到消费延时过期消息的队列。
原因是尽管ttl是设给每条消息的。但是本质上,所有延时消息都还在一个队列里,对它过期时间的检测也是从头部开始的。
它不会检测每一条消息是否过期。而是顺序检测。
如果first in的消息过期时间很长,会导致它阻塞后进的消息。
不仅无法实现真正的过期时间。还会导致,一个大的过期时间的先进的消息,会堆积一堆后进的过期时间短的消息。
问题解决
这个时候可以使用rabbitMq的一个插件:rabbitmq_delayed_message_exchange
一段时间以来,人们一直在寻找用RabbitMQ实现延迟消息的传递方法,到目前为止,公认的解决方案是混合使用TTL和DLX。而rabbitmq_delayed_message_exchange插件就是基于此来实现的,RabbitMQ延迟消息插件新增了一种新的交换器类型,消息通过这种交换器路由就可以实现延迟发送
插件安装
需要根据自己的rabbitMq选择对应的版本。我rabbitMq的版本是RabbitMQ 3.11.0,对应的插件版本就是:3.11.1
基于Linux
--1、cd到rabbitmq默认安装位置cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.7.18/plugins--2、通过ftp工具将插件上传到此目录下--3、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--4、重启MQ服务systemctl restart rabbitmq-server基于Docker
--1、通过ftp工具将插件上传到Linux服务器的根目录下--2、拷贝到docker中rabbitmq插件目录下,rabbitmq_delayed_message_exchange-3.9.0.ez(下载包的全名)docker cp /rabbitmq_delayed_message_exchange-3.9.0.ez 容器ID:/plugins--3、进入容器docker exec -it 容器id /bin/bash--4、查看插件是否存在(确保2中的操作已经将插件拷贝过来了)cd pluginsls |grep delay--5、开启插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange--6、退出容器exit--7、重启MQ服务docker restart 容器ID安装成功
web界面新建交换机选择类型出现红框标注即表示成功
代码实现
1:springBoot配置
@Configurationpublic class DelayRabbitmqConfig { /** * 声明延迟队列 * @return */ @Bean public Queue delayQueue(){ return new Queue(QueueConstant.DelayQueue, true,false,false); } /** * 声明延迟自定义交换机类型 * @return */ @Bean public CustomExchange delayCustomExchange(){ HashMap args = new HashMap<>();// 设置 x-delayed-type 为 direct,当然也可以是 topic 等 发送消息时设置消息头 headers 的 x-delay 属性,即延迟时间,如果不设置消息将会立即投递 args.put("x-delayed-type","direct"); return new CustomExchange(ExchangeConstant.DelayCustomerExchange, "x-delayed-message",true,false,args); } /** * 绑定延迟交换机和队列 * @return */ @Bean public Binding delayQueueAndCustomExchange(){ return BindingBuilder.bind(delayQueue()) .to(delayCustomExchange()).with(RoutingKeyConstant.DelayCustomerRoutingKey).noargs(); }} springMvc配置
引入依赖: xmlns:util="http://www.springframework.org/schema/util" http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util-4.0.xsd 代码实现
//消息发送final MessagePostProcessor messagePostProcessor = new MyMessagePostProcessor(Integer.valueOf(ttl.toString()));DisTimingPushDto disTimingPushDto = new DisTimingPushDto();disTimingPushDto.setOrderId(dispense.getOrderId());disTimingPushDto.setPushTime(disDispense.getPushTime());rabbitTemplate.convertAndSend(MsgQueueEnum.TIMING_PUSH.getExchangeName(), MsgQueueEnum.TIMING_PUSH.getQueueName(), disTimingPushDto, messagePostProcessor);//每条消息时间配置import org.springframework.amqp.AmqpException;import org.springframework.amqp.core.Message;import org.springframework.amqp.core.MessagePostProcessor;/** * 延迟消息处理器 Processor * @author king * @date 2022年12月28日 11:14 */public class MyMessagePostProcessor implements MessagePostProcessor { /** * 消息延迟时间,单位:毫秒 */ private final Integer TTL; public MyMessagePostProcessor(final Integer ttl) { this.TTL = ttl; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(TTL); return message; }}
标签: RabbitMQ
- 加快虚拟仿真实训基地建设 启动职业学校信息化建设试点很必要
- “双减”后如何在满足学生多样需求方面做“加法”?
- 处于生理活跃期且心理发展不成熟 高校开设公共卫生必修课很必要
- 价格低于相应蔬菜零售价 西安投放约1万吨政府储备蔬菜
- 深受年轻消费群体所青睐 国潮风商品成为年货新选择
知识
- 他把银行卡卖给骗子,“黑吃黑”“截胡”十万元
- “老司机”4S店试驾豪车 结果油门当刹车撞了
- 新开工改造城镇老旧小区5.34万个
- 发动巡河志愿者2万余名 “用心护好每一条河”
- 假客服的套路:伪装成大平台客服,层层布局引人上钩
人物
- 当前热门:小鲨易贷网贷逾期7个月征信有什么影响
- 通讯!业界:搭平台促交流 助力中国白酒走向世界
- 视讯!LPR连续四月不变 专家预计短期内仍将持稳
- 环球热资讯!57.5% 网友赞成,马斯克或将卸任 Twitter CEO
- 提钱花网贷逾期半年会不会上征信_速看料
- 育碧正在为其《星球大战》游戏寻找测试人员
- 世界杯的阿尔法之战
- 别受伤!浓眉篮下被小卡晃飞 空中失去重心后背着地 当前速讯
- 世界新资讯:中油测井多相位定向光纤避射技术填补空白
- 天天快资讯:中材科技: 董事会决议公告
- 每日热门:锦程消费金融的一股东想彻底退出
- 深南电A董秘回复:关于您提出的问题,现回复如下:公司目前的主营业务为天然气发电,未涉及到核聚变反应相关业务
- 热议:伯特利: 芜湖伯特利汽车安全系统股份有限公司章程(2022年12月修订)(更正后)
- 宝钢包装(601968)12月15日主力资金净买入24.60万元 今日播报
- 鸥玛软件董秘回复:截至11月30日,公司股票持有数量15,896户
- 铂力特: 西安铂力特增材技术股份有限公司监事会关于2020年限制性股票激励计划首次授予部分第二个归属及预留授予第一个归属期归属名单的核查意见 今日报
- 济源市人民法院:加强失信曝光、法律文书电子送达 全球新要闻
- 【机构调研记录】中海基金调研楚江新材、伟星新材等4只个股(附名单)
- 世界通讯!一边要路权,一边要停车,占路20余年车棚这样拆违腾路
- 环球热消息:独家资金:早盘主力买入前10股
- 免费可商用!荣耀HONOR Sans字体来了 附下载
- 在南洋与中文相遇(阅读时光)_短讯
- 源杰科技(688498)新股概览,12月12日开始网上申购-视焦点讯
- 中欣氟材董秘回复:谢谢您对本公司的关注,目前公司钠电池电解液添加液产品正在设计及设备订购等前期项目准备中
- 头条:英集芯(688209)12月7日主力资金净卖出490.52万元
- 股票行情快报:石化机械(000852)12月6日主力资金净卖出3954.17万元
- 浙江两轮核酸检测结果均为阴性 无新增本土阳性感染者
- 新疆阿克苏地区库车市发生4.1级地震 震源深度18千米
- 抵返哈尔滨人员须持48小时内核酸检测阴性证明
- 浙大紫金港校区已解封 有7337人有序离开该校区
- 2021年广东省第七届风筝锦标赛落幕
- 黑龙江讷河市启动全员核酸检测 目前讷河市全员核酸检测结果均为阴性
- 【同心粤港澳 携手大湾区】南头古城,搭建深港澳三地文化创意活动交流平台
- 重庆入河排污口整治工作推进至全市26个区县
- 四川省第二批政法队伍教育整顿:立案审查调查省级政法机关干警58人
- 长三角区域生态环境部门“云签约”长江大保护倡议书
- 古老长城重焕新生机
- 藏不住了!你同事里有许多“武林高手”……
- 浙江杭州2例无症状感染者系感染德尔塔变异株
- 喜马拉雅的深情和誓言
- 浪漫之城打造山海城一体新地标
- 让老年人更适应数字生活
- 内蒙古通辽市新增1例本土确诊病例、1例无症状感染者
- 徐州无新增确诊病例 核酸检测55515人结果均为阴性
- 甘肃培树“农家巧娘”增技能:返乡创业掌勺又“掌柜”
- 内蒙古通辽市科尔沁区一地调整为中风险地区
- 上海本轮疫情涉及闭环管理的医疗机构全面恢复门急诊
- 青年学生成艾滋病感染高发人群 “社会疫苗”如何打?
- 内蒙古满洲里新增本土确诊病例1例 当地开展第二轮大规模核酸检测
- 江西无新增本土确诊病例 上饶全面恢复正常生产生活秩序
精彩阅读
- 中老铁路上会四国语言的列车长:用心维护中老友谊的桥梁
- 海南首次发现有环志的世界极危鸟种勺嘴鹬
- 一场“网络劝生者”和“网络劝死者”的战役
- 内蒙古通辽新增本土确诊和无症状感染者各1例 轨迹公布
- 江西中烟工业有限责任公司原总经理姚庆艳接受审查调查
- 宁夏45例新冠肺炎确诊病例均已治愈出院
- 内蒙古通辽市科尔沁区发现2名初筛阳性人员
- 生活在闹钟里的丈夫:自己迟一秒,渐冻症妻子就会多一分疼
- 辽宁新冠肺炎确诊病例零新增
- 11月28日16-24时,内蒙古新增本土确诊病例1例
- 奥密克戎毒株为何“需要关注”?现有防疫工具还有效吗?
- 黑龙江新增本土无症状感染者1例
- 这辈子一定要去趟这个公园 在这里“有种爱叫放手”
- 那年今日 | 一张漫画涨知识之11月29日
- 寒潮预警!我国中东部迎大范围降温 黑龙江等地降幅可达12℃
- 冷空气继续影响我国中东部 华北黄淮等地有雾和霾天气
