# Java并发容器使用

## ArrayBlockingQueue使用

```java
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(8);

// add(): never blocks; throws IllegalStateException when the queue is full
queue.add("1");
queue.add("2");
queue.add("3");
System.out.println(queue);

// put(): blocking insert; waits until space becomes available when the queue is full
queue.put("4");
queue.put("5");
System.out.println(queue);

// offer(): never blocks; returns false immediately when the queue is full
queue.offer("6");
queue.offer("7");
System.out.println(queue);

// offer(e, timeout, unit): blocks for at most the given time when the queue is full
queue.offer("8", 3, TimeUnit.SECONDS);
// The queue already holds 8 elements, so this call waits up to 3 seconds and then returns false
queue.offer("9", 3, TimeUnit.SECONDS);
System.out.println(queue);

// Retrieves and removes the head; blocks while the queue is empty until an element is added
String take = queue.take();
System.out.println(take);

// Retrieves and removes the head. If the queue is empty, waits up to the given time for an
// element to be added or for the current thread to be interrupted; returns null on timeout
String poll = queue.poll(3, TimeUnit.SECONDS);
System.out.println(poll);

// Retrieves and removes the head; never blocks, returns null when the queue is empty
String poll1 = queue.poll();
System.out.println(poll1);

// Retrieves the head without removing it; never blocks, returns null when the queue is empty
String peek = queue.peek();
System.out.println(peek);
```

## PriorityBlockingQueue使用

```java
// Unbounded queue: the constructor argument is only the initial capacity, not a maximum
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
System.out.println(queue);

// All insertion methods of this queue behave like offer()
queue.offer(1);
queue.offer(10);
queue.offer(3);
System.out.println(queue);
```

## LinkedBlockingQueue使用

```java
// Optionally bounded queue
LinkedBlockingQueue<Integer> integers = new LinkedBlockingQueue<>(10);
boolean b = integers.remainingCapacity() == 10;
System.out.println(b); // true: the remaining capacity equals the configured capacity

// Without an explicit capacity the bound defaults to Integer.MAX_VALUE
LinkedBlockingQueue<Integer> integers1 = new LinkedBlockingQueue<>();
boolean b1 = integers1.remainingCapacity() == Integer.MAX_VALUE;
System.out.println(b1);
```

## DelayQueue使用

```java
public class AtomicExample08 {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedEntry> queue = new DelayQueue<>();
        queue.add(new DelayedEntry("A", 1000L));
        queue.add(new DelayedEntry("B", 1000L));
        queue.add(new DelayedEntry("C", 1000L));
        queue.add(new DelayedEntry("D", 1000L));
        queue.add(new DelayedEntry("E", 1000L));
        queue.add(new DelayedEntry("F", 1000L));
        queue.add(new DelayedEntry("G", 1000L));
        queue.add(new DelayedEntry("H", 1000L));
        queue.add(new DelayedEntry("I", 1000L));

        // Non-blocking read: returns the head immediately without removing it (even if its
        // delay has not expired yet); returns null when the queue is empty
        assert queue.peek() != null;
        System.out.println(queue.peek().value);

        // Non-blocking read: returns null when the queue is empty or the head has not expired
        // yet, so the result must be null-checked instead of being dereferenced directly
        DelayedEntry expired = queue.poll();
        System.out.println(expired == null ? null : expired.value);

        // Blocks for at most the given time; returns null if the queue is still empty or the
        // head is still unexpired when the time is up, otherwise removes and returns the head.
        // The 1 second delay is shorter than the 3 second timeout, so an element is returned here
        System.out.println(Objects.requireNonNull(queue.poll(3, TimeUnit.SECONDS)).value);

        // Blocks until the queue has a head element whose delay has expired, then removes
        // and returns it
        System.out.println(queue.take().value);
    }

    // Elements of a DelayQueue must implement the Delayed interface
    @Getter
    static class DelayedEntry implements Delayed {

        private final String value;

        // Absolute expiry time in epoch milliseconds
        private final long time;

        /**
         * @param value       payload carried by this entry
         * @param delayMillis delay in milliseconds, counted from the moment of construction
         */
        private DelayedEntry(String value, long delayMillis) {
            // Store the absolute deadline; getDelay() subtracts the current time from it.
            // Storing the raw delay here would make every entry look already expired.
            this.time = System.currentTimeMillis() + delayMillis;
            this.value = value;
        }

        /** Returns the remaining delay in the requested unit; zero or negative means expired. */
        @Override
        public long getDelay(TimeUnit unit) {
            long delta = time - System.currentTimeMillis();
            return unit.convert(delta, TimeUnit.MILLISECONDS);
        }

        /** Orders entries by expiry time so the entry that expires first becomes the head. */
        @Override
        public int compareTo(Delayed o) {
            // This queue only ever contains DelayedEntry instances, so the cast is safe here
            return Long.compare(this.time, ((DelayedEntry) o).time);
        }
    }
}
```

## ConcurrentLinkedQueue(并发队列)性能

在多线程并发读写的场景下,ConcurrentLinkedQueue 的性能要比加锁的 LinkedList 优秀很多。

```java
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;

import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;

@Measurement(iterations = 10)
@Warmup(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
// NOTE(review): with Scope.Thread every benchmark thread gets its own state instance, so the
// add/get threads of a group never share a queue. Scope.Group is presumably what was
// intended - confirm and re-run before relying on the numbers below.
@State(Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class AtomicExample09 {

    private final static String DATA = "TEST";
    private final static Object LOCK = new Object();

    private SynchronizedLinkedList synchronizedLinkedList;
    private ConcurrentLinkedQueue<String> concurrentLinkedQueue;

    // Used incorrectly, it can actually reduce performance
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .include(AtomicExample09.class.getSimpleName())
                .forks(1)
                .build();
        new Runner(options).run();
    }

    // Start every iteration with fresh, empty containers
    @Setup(Level.Iteration)
    public void setUp() {
        synchronizedLinkedList = new SynchronizedLinkedList();
        concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
    }

    // Benchmark SynchronizedLinkedList
    @Group("sync")
    @Benchmark
    @GroupThreads(5)
    public void synchronizedListAdd() {
        synchronizedLinkedList.addLast(DATA);
    }

    @Group("sync")
    @Benchmark
    @GroupThreads(5)
    public String synchronizedListGet() {
        return synchronizedLinkedList.removeFirst();
    }

    // Benchmark ConcurrentLinkedQueue
    @Group("concurrent")
    @Benchmark
    @GroupThreads(5)
    public void concurrentLinkedQueueAdd() {
        concurrentLinkedQueue.offer(DATA);
    }

    @Group("concurrent")
    @Benchmark
    @GroupThreads(5)
    public String concurrentLinkedQueueGet() {
        return concurrentLinkedQueue.poll();
    }

    // FIFO queue built on a LinkedList whose every access is guarded by the shared LOCK
    private static class SynchronizedLinkedList {

        private final LinkedList<String> list = new LinkedList<>();

        void addLast(String element) {
            synchronized (LOCK) {
                list.addLast(element);
            }
        }

        // Removes and returns the head (the oldest element), or null when the list is empty
        String removeFirst() {
            synchronized (LOCK) {
                // pollFirst() takes from the head and already returns null for an empty list
                return list.pollFirst();
            }
        }
    }
}

// Benchmark results
// Benchmark                                            Mode  Cnt  Score   Error  Units
// AtomicExample09.concurrent                           avgt   10  0.221 ± 0.649  us/op
// AtomicExample09.concurrent:concurrentLinkedQueueAdd  avgt   10  0.407 ± 1.296  us/op
// AtomicExample09.concurrent:concurrentLinkedQueueGet  avgt   10  0.034 ± 0.048  us/op
// AtomicExample09.sync                                 avgt   10  0.240 ± 0.039  us/op
// AtomicExample09.sync:synchronizedListAdd             avgt   10  0.232 ± 0.034  us/op
// AtomicExample09.sync:synchronizedListGet             avgt   10  0.248 ± 0.044  us/op
```

> 运行上述测试需要在 pom.xml 中引入以下依赖
>
> ```xml
> <dependencies>
>     <dependency>
>         <groupId>org.projectlombok</groupId>
>         <artifactId>lombok</artifactId>
>         <version>1.18.36</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.slf4j</groupId>
>         <artifactId>slf4j-api</artifactId>
>         <version>2.0.16</version>
>     </dependency>
>     <dependency>
>         <groupId>org.slf4j</groupId>
>         <artifactId>slf4j-simple</artifactId>
>         <version>2.0.16</version>
>     </dependency>
>
>     <dependency>
>         <groupId>org.openjdk.jmh</groupId>
>         <artifactId>jmh-core</artifactId>
>         <version>1.19</version>
>     </dependency>
>     <dependency>
>         <groupId>org.openjdk.jmh</groupId>
>         <artifactId>jmh-generator-annprocess</artifactId>
>         <version>1.19</version>
>     </dependency>
> </dependencies>
> ```