博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
聊聊redisson的DelayedQueue
阅读量:6322 次
发布时间:2019-06-22

本文共 10192 字,大约阅读时间需要 33 分钟。

  hot3.png

本文主要研究一下redisson的DelayedQueue

maven

org.redisson
redisson
3.8.1

实例

@Test    public void testDelayedQueue() throws InterruptedException {        Config config = new Config();        config.useSingleServer()                .setAddress("redis://192.168.99.100:6379");        RedissonClient redisson = Redisson.create(config);        RBlockingQueue
blockingQueue = redisson.getBlockingQueue("dest_queue1"); RDelayedQueue
delayedQueue = redisson.getDelayedQueue(blockingQueue); delayedQueue.offer("demo", 10, TimeUnit.SECONDS); Assert.assertFalse(blockingQueue.contains("demo")); TimeUnit.SECONDS.sleep(15); Assert.assertTrue(blockingQueue.contains("demo")); }
  • 这里使用了两个queue,对delayedQueue的offer操作是直接进入delayedQueue,但是delay是作用在目标队列上,这里就是RBlockingQueue

源码解析

RDelayedQueue.offer

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

public class RedissonDelayedQueue
extends RedissonExpirable implements RDelayedQueue
{ private final QueueTransferService queueTransferService; private final String channelName; private final String queueName; private final String timeoutSetName; protected RedissonDelayedQueue(QueueTransferService queueTransferService, Codec codec, final CommandAsyncExecutor commandExecutor, String name) { super(codec, commandExecutor, name); channelName = prefixName("redisson_delay_queue_channel", getName()); queueName = prefixName("redisson_delay_queue", getName()); timeoutSetName = prefixName("redisson_delay_queue_timeout", getName()); //QueueTransferTask task = ...... queueTransferService.schedule(queueName, task); this.queueTransferService = queueTransferService; } public void offer(V e, long delay, TimeUnit timeUnit) { get(offerAsync(e, delay, timeUnit)); } public RFuture
offerAsync(V e, long delay, TimeUnit timeUnit) { long delayInMs = timeUnit.toMillis(delay); long timeout = System.currentTimeMillis() + delayInMs; long randomId = PlatformDependent.threadLocalRandom().nextLong(); return commandExecutor.evalWriteAsync(getName(), codec, RedisCommands.EVAL_VOID, "local value = struct.pack('dLc0', tonumber(ARGV[2]), string.len(ARGV[3]), ARGV[3]);" + "redis.call('zadd', KEYS[2], ARGV[1], value);" + "redis.call('rpush', KEYS[3], value);" // if new object added to queue head when publish its startTime // to all scheduler workers + "local v = redis.call('zrange', KEYS[2], 0, 0); " + "if v[1] == value then " + "redis.call('publish', KEYS[4], ARGV[1]); " + "end;" , Arrays.
asList(getName(), timeoutSetName, queueName, channelName), timeout, randomId, encode(e)); } public ByteBuf encode(Object value) { if (commandExecutor.isRedissonReferenceSupportEnabled()) { RedissonReference reference = RedissonObjectFactory.toReference(commandExecutor.getConnectionManager().getCfg(), value); if (reference != null) { value = reference; } } try { return codec.getValueEncoder().encode(value); } catch (IOException e) { throw new IllegalArgumentException(e); } } public static String prefixName(String prefix, String name) { if (name.contains("{")) { return prefix + ":" + name; } return prefix + ":{" + name + "}"; } //......}
  • 这里使用的是一段lua脚本,其中keys参数数组有四个值,KEYS[1]为getName(), KEYS[2]为timeoutSetName, KEYS[3]为queueName, KEYS[4]为channelName
  • 变量有三个,ARGV[1]为timeout,ARGV[2]为randomId,ARGV[3]为encode(e)
  • 这段lua脚本对timeoutSetName的zset添加一个结构体,其score为timeout值;对queueName的list的表尾添加结构体;然后判断timeoutSetName的zset的第一个元素是否是当前的结构体,如果是则对channel发布timeout消息

queueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/RedissonDelayedQueue.java

QueueTransferTask task = new QueueTransferTask(commandExecutor.getConnectionManager()) {                        @Override            protected RFuture
pushTaskAsync() { return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_LONG, "local expiredValues = redis.call('zrangebyscore', KEYS[2], 0, ARGV[1], 'limit', 0, ARGV[2]); " + "if #expiredValues > 0 then " + "for i, v in ipairs(expiredValues) do " + "local randomId, value = struct.unpack('dLc0', v);" + "redis.call('rpush', KEYS[1], value);" + "redis.call('lrem', KEYS[3], 1, v);" + "end; " + "redis.call('zrem', KEYS[2], unpack(expiredValues));" + "end; " // get startTime from scheduler queue head task + "local v = redis.call('zrange', KEYS[2], 0, 0, 'WITHSCORES'); " + "if v[1] ~= nil then " + "return v[2]; " + "end " + "return nil;", Arrays.
asList(getName(), timeoutSetName, queueName), System.currentTimeMillis(), 100); } @Override protected RTopic
getTopic() { return new RedissonTopic
(LongCodec.INSTANCE, commandExecutor, channelName); } }; queueTransferService.schedule(queueName, task);
  • RedissonDelayedQueue构造器里头对QueueTransferTask进行调度
  • 调度执行的是pushTaskAsync方法,主要就是将到期的元素从元素队列移到目标队列
  • 这里使用一段lua脚本,KEYS[1]为getName(),KEYS[2]为timeoutSetName,KEYS[3]为queueName;ARGV[1]为当前时间戳,ARGV[2]为100
  • 这里调用zrangebyscore,对timeoutSetName的zset使用timeout参数进行排序,取得分介于0和当前时间戳的元素,取前200条
  • 如果有值表示该元素需要移交到目标队列,然后调用rpush移交到目标队列,再调用lrem从元素队列移除,最后在从timeoutSetName的zset中删除掉已经处理的这些元素
  • 处理完过元素转移之后,再取timeoutSetName的zset的第一个元素的得分返回,如果没有返回nil

QueueTransferService.schedule

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferService.java

public class QueueTransferService {    private final ConcurrentMap
tasks = PlatformDependent.newConcurrentHashMap(); public synchronized void schedule(String name, QueueTransferTask task) { QueueTransferTask oldTask = tasks.putIfAbsent(name, task); if (oldTask == null) { task.start(); } else { oldTask.incUsage(); } } public synchronized void remove(String name) { QueueTransferTask task = tasks.get(name); if (task != null) { if (task.decUsage() == 0) { tasks.remove(name, task); task.stop(); } } }}
  • 这里的schedule方法首先添加到ConcurrentMap中,如果该任务已经存在,则调用oldTask.incUsage(),不存在则启动该任务

QueueTransferTask.start

redisson-3.8.1-sources.jar!/org/redisson/QueueTransferTask.java

public void start() {        RTopic
schedulerTopic = getTopic(); statusListenerId = schedulerTopic.addListener(new BaseStatusListener() { @Override public void onSubscribe(String channel) { pushTask(); } }); messageListenerId = schedulerTopic.addListener(new MessageListener
() { @Override public void onMessage(CharSequence channel, Long startTime) { scheduleTask(startTime); } }); } private void scheduleTask(final Long startTime) { TimeoutTask oldTimeout = lastTimeout.get(); if (startTime == null) { return; } if (oldTimeout != null) { oldTimeout.getTask().cancel(); } long delay = startTime - System.currentTimeMillis(); if (delay > 10) { Timeout timeout = connectionManager.newTimeout(new TimerTask() { @Override public void run(Timeout timeout) throws Exception { pushTask(); TimeoutTask currentTimeout = lastTimeout.get(); if (currentTimeout.getTask() == timeout) { lastTimeout.compareAndSet(currentTimeout, null); } } }, delay, TimeUnit.MILLISECONDS); if (!lastTimeout.compareAndSet(oldTimeout, new TimeoutTask(startTime, timeout))) { timeout.cancel(); } } else { pushTask(); } } private void pushTask() { RFuture
startTimeFuture = pushTaskAsync(); startTimeFuture.addListener(new FutureListener
() { @Override public void operationComplete(io.netty.util.concurrent.Future
future) throws Exception { if (!future.isSuccess()) { if (future.cause() instanceof RedissonShutdownException) { return; } log.error(future.cause().getMessage(), future.cause()); scheduleTask(System.currentTimeMillis() + 5 * 1000L); return; } if (future.getNow() != null) { scheduleTask(future.getNow()); } } }); }
  • 这里用到了RTopic,添加了StatusListener以及MessageListener
  • StatusListener在订阅的时候触发pushTask,MessageListener主要是调用scheduleTask
  • pushTaskAsync在RedissonDelayedQueue的实现就是上面讲的实现元素在原始队列及目标队列的转移
  • scheduleTask方法会重新计算delay,对于大于10的延时触发pushTask,小于等于10的则立刻触发pushTask
  • pushTask会对pushTaskAsync操作进行回调,如果执行不成功则重新触发scheduleTask,如果执行成功但是返回值(timeoutSetName的zset的第一个元素的得分)不为null的话,则以该值触发scheduleTask

小结

  • redisson的DelayedQueue使用上是将元素及延时信息入队,之后定时任务将到期的元素转移到目标队列
  • 这里使用了三个结构来存储,一个是目标队列list;一个是原生队列list,添加的是带有延时信息的结构体;一个是timeoutSetName的zset,元素是结构体,其score为timeout值
  • redisson使用了很多异步回调来操作,整体代码阅读上会相对费劲些

doc

转载于:https://my.oschina.net/go4it/blog/2206612

你可能感兴趣的文章
Jenkins
查看>>
segment
查看>>
面试/编程
查看>>
打造一个上传图片到图床利器的插件(Mac版 开源)
查看>>
thinkphp判断更新是否成功
查看>>
高效使用jquery之一:请使用'On'函数
查看>>
sessionKey
查看>>
iOS8 Push Notifications
查看>>
各大名企笔试及面经大全(程序猿必读)
查看>>
轨磁条简介
查看>>
如何设计高扩展的在线网页制作平台
查看>>
云服务正在吞噬世界!
查看>>
最近话题火爆的四件事你知道不?
查看>>
SpringBoot整合MyBatis
查看>>
Android 类库书签更新(一)
查看>>
Unity3D Input按键系统
查看>>
简单的一条SQL,不简单的做事思维 NOT IN 、NOT EXISTS、LEFT JOIN用法差别 ...
查看>>
DataWorks:任务未运行自助排查
查看>>
「镁客早报」特斯拉裁员,马斯克解释没有办法;微软推出Azure DevOps赏金计划...
查看>>
centos 7.4 使用 pgxc_ctl 安装与使用
查看>>