博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
java并发之DelayQueue实际运用示例
阅读量:4292 次
发布时间:2019-05-27

本文共 10968 字,大约阅读时间需要 36 分钟。

在学习Java 多线程并发开发过程中,了解到DelayQueue类的主要作用:是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。注意:不能将null元素放置到这种队列中。

Delayed,一种混合风格的接口,用来标记那些应该在给定延迟时间之后执行的对象。此接口的实现必须定义一个 compareTo 方法,该方法提供与此接口的 getDelay 方法一致的排序。

简单的延时队列要有三部分:第一实现了Delayed接口的消息体第二消费消息的消费者第三存放消息的延时队列那下面就来看看延时队列demo。

一、消息体

package com.delqueue;    import java.util.concurrent.Delayed;  import java.util.concurrent.TimeUnit;    /**  * 消息体定义 实现Delayed接口就是实现两个方法即compareTo 和 getDelay最重要的就是getDelay方法,这个方法用来判断是否到期……  *   * @author whd  * @date 2017年9月24日 下午8:57:14  */  public class Message implements Delayed {      private int id;      private String body; // 消息内容      private long excuteTime;// 延迟时长,这个是必须的属性因为要按照这个判断延时时长。        public int getId() {          return id;      }        public String getBody() {          return body;      }        public long getExcuteTime() {          return excuteTime;      }        public Message(int id, String body, long delayTime) {          this.id = id;          this.body = body;          this.excuteTime = TimeUnit.NANOSECONDS.convert(delayTime, TimeUnit.MILLISECONDS) + System.nanoTime();      }        // 自定义实现比较方法返回 1 0 -1 三个参数      @Override      public int compareTo(Delayed delayed) {          Message msg = (Message) delayed;          return Integer.valueOf(this.id) > Integer.valueOf(msg.id) ? 1                  : (Integer.valueOf(this.id) < Integer.valueOf(msg.id) ? -1 : 0);      }        // 延迟任务是否到时就是按照这个方法判断如果返回的是负数则说明到期否则还没到期      @Override      public long getDelay(TimeUnit unit) {          return unit.convert(this.excuteTime - System.nanoTime(), TimeUnit.NANOSECONDS);      }  }

二、消息消费者

package com.delqueue;    import java.util.concurrent.DelayQueue;    public class Consumer implements Runnable {      // 延时队列 ,消费者从其中获取消息进行消费      private DelayQueue
queue; public Consumer(DelayQueue
queue) { this.queue = queue; } @Override public void run() { while (true) { try { Message take = queue.take(); System.out.println("消费消息id:" + take.getId() + " 消息体:" + take.getBody()); } catch (InterruptedException e) { e.printStackTrace(); } } } }

三、延时队列

package com.delqueue;    import java.util.concurrent.DelayQueue;  import java.util.concurrent.ExecutorService;  import java.util.concurrent.Executors;    public class DelayQueueTest {       public static void main(String[] args) {                // 创建延时队列                DelayQueue
queue = new DelayQueue
(); // 添加延时消息,m1 延时3s Message m1 = new Message(1, "world", 3000); // 添加延时消息,m2 延时10s Message m2 = new Message(2, "hello", 10000); //将延时消息放到延时队列中 queue.offer(m2); queue.offer(m1); // 启动消费线程 消费添加到延时队列中的消息,前提是任务到了延期时间 ExecutorService exec = Executors.newFixedThreadPool(1); exec.execute(new Consumer(queue)); exec.shutdown(); } }

将消息体放入延迟队列中,在启动消费者线程去消费延迟队列中的消息,如果延迟队列中的消息到了延迟时间则可以从中取出消息否则无法取出消息也就无法消费。

这就是延迟队列demo,下面我们来说说在真实环境下的使用。

在网上也看到两个示例,但这两个示例个人在实际运行时均没有达到满足业务场景的效果,因而对其进行了修改,供大家参考讨论。

业务场景一:多考生考试

该场景来自于,模拟一个考试的日子,考试时间为120分钟,30分钟后才可交卷,当时间到了,或学生都交完卷了考试结束。

这个场景中几个点需要注意:

  1. 考试时间为120分钟,30分钟后才可交卷,初始化考生完成试卷时间最小应为30分钟
  2. 对于能够在120分钟内交卷的考生,如何实现这些考生交卷
  3. 对于120分钟内没有完成考试的考生,在120分钟考试时间到后需要让他们强制交卷
  4. 在所有的考生都交完卷后,需要将控制线程关闭

实现思想:用DelayQueue存储考生(Student类),每一个考生都有自己的名字和完成试卷的时间,Teacher线程对DelayQueue进行监控,收取完成试卷小于120分钟的学生的试卷。当考试时间120分钟到时,先关闭Teacher线程,然后强制DelayQueue中还存在的考生交卷。每一个考生交卷都会进行一次countDownLatch.countDown(),当countDownLatch.await()不再阻塞说明所有考生都交完卷了,而后结束考试。

package com.my.base.concurrent.delayQueue;import java.util.Iterator;import java.util.Random;import java.util.concurrent.CountDownLatch;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/** *this project is created for my partactice. *In the  project I will write the mybatis by myself * *2014-1-10  下午9:43:48 *@author 孙振超   mychaoyue2011@163.com */public class Exam {    /**     *     *2014-1-10 下午9:43:48 by 孙振超     *     *@param args     *void     * @throws InterruptedException      */    public static void main(String[] args) throws InterruptedException {        // TODO Auto-generated method stub        int studentNumber = 20;        CountDownLatch countDownLatch = new CountDownLatch(studentNumber+1);        DelayQueue< Student> students = new DelayQueue
(); Random random = new Random(); for (int i = 0; i < studentNumber; i++) { students.put(new Student("student"+(i+1), 30+random.nextInt(120),countDownLatch)); } Thread teacherThread =new Thread(new Teacher(students)); students.put(new EndExam(students, 120,countDownLatch,teacherThread)); teacherThread.start(); countDownLatch.await(); System.out.println(" 考试时间到,全部交卷!"); }}class Student implements Runnable,Delayed{ private String name; private long workTime; private long submitTime; private boolean isForce = false; private CountDownLatch countDownLatch; public Student(){} public Student(String name,long workTime,CountDownLatch countDownLatch){ this.name = name; this.workTime = workTime; this.submitTime = TimeUnit.NANOSECONDS.convert(workTime, TimeUnit.NANOSECONDS)+System.nanoTime(); this.countDownLatch = countDownLatch; } @Override public int compareTo(Delayed o) { // TODO Auto-generated method stub if(o == null || ! (o instanceof Student)) return 1; if(o == this) return 0; Student s = (Student)o; if (this.workTime > s.workTime) { return 1; }else if (this.workTime == s.workTime) { return 0; }else { return -1; } } @Override public long getDelay(TimeUnit unit) { // TODO Auto-generated method stub return unit.convert(submitTime - System.nanoTime(), TimeUnit.NANOSECONDS); } @Override public void run() { // TODO Auto-generated method stub if (isForce) { System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 120分钟" ); }else { System.out.println(name + " 交卷, 希望用时" + workTime + "分钟"+" ,实际用时 "+workTime +" 分钟"); } countDownLatch.countDown(); } public boolean isForce() { return isForce; } public void setForce(boolean isForce) { this.isForce = isForce; } }class EndExam extends Student{ private DelayQueue
students; private CountDownLatch countDownLatch; private Thread teacherThread; public EndExam(DelayQueue
students, long workTime, CountDownLatch countDownLatch,Thread teacherThread) { super("强制收卷", workTime,countDownLatch); this.students = students; this.countDownLatch = countDownLatch; this.teacherThread = teacherThread; } @Override public void run() { // TODO Auto-generated method stub teacherThread.interrupt(); Student tmpStudent; for (Iterator
iterator2 = students.iterator(); iterator2.hasNext();) { tmpStudent = iterator2.next(); tmpStudent.setForce(true); tmpStudent.run(); } countDownLatch.countDown(); } }class Teacher implements Runnable{ private DelayQueue
students; public Teacher(DelayQueue
students){ this.students = students; } @Override public void run() { // TODO Auto-generated method stub try { System.out.println(" test start"); while(!Thread.interrupted()){ students.take().run(); } } catch (Exception e) { // TODO: handle exception e.printStackTrace(); } } }

业务场景二:具有过期时间的缓存

该场景来自于,向缓存添加内容时,给每一个key设定过期时间,系统自动将超过过期时间的key清除。

这个场景中几个点需要注意:

  1. 当向缓存中添加key-value对时,如果这个key在缓存中存在并且还没有过期,需要用这个key对应的新过期时间
  2. 为了能够让DelayQueue将其已保存的key删除,需要重写实现Delayed接口添加到DelayQueue的DelayedItem的hashCode函数和equals函数
  3. 当缓存关闭,监控程序也应关闭,因而监控线程应当用守护线程

具体实现如下:

package com.my.base.concurrent.delayQueue;import java.util.Random;import java.util.concurrent.ConcurrentHashMap;import java.util.concurrent.DelayQueue;import java.util.concurrent.Delayed;import java.util.concurrent.TimeUnit;/** *Cache.java * * Created on 2014-1-11 上午11:30:36 by sunzhenchao mychaoyue2011@163.com */public class Cache
{ public ConcurrentHashMap
map = new ConcurrentHashMap
(); public DelayQueue
> queue = new DelayQueue
>(); public void put(K k,V v,long liveTime){ V v2 = map.put(k, v); DelayedItem
tmpItem = new DelayedItem
(k, liveTime); if (v2 != null) { queue.remove(tmpItem); } queue.put(tmpItem); } public Cache(){ Thread t = new Thread(){ @Override public void run(){ dameonCheckOverdueKey(); } }; t.setDaemon(true); t.start(); } public void dameonCheckOverdueKey(){ while (true) { DelayedItem
delayedItem = queue.poll(); if (delayedItem != null) { map.remove(delayedItem.getT()); System.out.println(System.nanoTime()+" remove "+delayedItem.getT() +" from cache"); } try { Thread.sleep(300); } catch (Exception e) { // TODO: handle exception } } } /** * TODO * @param args * 2014-1-11 上午11:30:36 * @author:孙振超 * @throws InterruptedException */ public static void main(String[] args) throws InterruptedException { Random random = new Random(); int cacheNumber = 10; int liveTime = 0; Cache
cache = new Cache
(); for (int i = 0; i < cacheNumber; i++) { liveTime = random.nextInt(3000); System.out.println(i+" "+liveTime); cache.put(i+"", i, random.nextInt(liveTime)); if (random.nextInt(cacheNumber) > 7) { liveTime = random.nextInt(3000); System.out.println(i+" "+liveTime); cache.put(i+"", i, random.nextInt(liveTime)); } } Thread.sleep(3000); System.out.println(); }}class DelayedItem
implements Delayed{ private T t; private long liveTime ; private long removeTime; public DelayedItem(T t,long liveTime){ this.setT(t); this.liveTime = liveTime; this.removeTime = TimeUnit.NANOSECONDS.convert(liveTime, TimeUnit.NANOSECONDS) + System.nanoTime(); } @Override public int compareTo(Delayed o) { if (o == null) return 1; if (o == this) return 0; if (o instanceof DelayedItem){ DelayedItem
tmpDelayedItem = (DelayedItem
)o; if (liveTime > tmpDelayedItem.liveTime ) { return 1; }else if (liveTime == tmpDelayedItem.liveTime) { return 0; }else { return -1; } } long diff = getDelay(TimeUnit.NANOSECONDS) - o.getDelay(TimeUnit.NANOSECONDS); return diff > 0 ? 1:diff == 0? 0:-1; } @Override public long getDelay(TimeUnit unit) { return unit.convert(removeTime - System.nanoTime(), unit); } public T getT() { return t; } public void setT(T t) { this.t = t; } @Override public int hashCode(){ return t.hashCode(); } @Override public boolean equals(Object object){ if (object instanceof DelayedItem) { return object.hashCode() == hashCode() ?true:false; } return false; }}

转载地址:http://uozws.baihongyu.com/

你可能感兴趣的文章
vnpy学习_04回测评价指标的缺陷
查看>>
ubuntu终端一次多条命令方法和区别
查看>>
python之偏函数
查看>>
vnpy学习_06回测结果可视化改进
查看>>
git文件gitignore修改后不生效
查看>>
python之yield
查看>>
读书笔记_量化交易如何建立自己的算法交易01
查看>>
python中==和is与isinstance()和type()区别
查看>>
读书笔记_量化交易如何建立自己的算法交易02
查看>>
读书笔记_量化交易如何建立自己的算法交易03
查看>>
读书笔记_量化交易如何建立自己的算法交易04
查看>>
设计模式01_原因
查看>>
设计模式03_工厂
查看>>
设计模式04_抽象工厂
查看>>
设计模式05_单例
查看>>
设计模式06_原型
查看>>
设计模式07_建造者
查看>>
设计模式08_适配器
查看>>
设计模式09_代理模式
查看>>
设计模式10_桥接
查看>>