2025-01-20 21:28:35 +08:00
|
|
|
|
# Java并发容器使用
|
|
|
|
|
|
|
|
|
|
## ArrayBlockingQueue使用
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
|
|
|
|
|
|
|
|
|
|
// 非阻塞添加,队列已满时抛出 IllegalStateException
|
|
|
|
|
queue.add("1");
|
|
|
|
|
queue.add("2");
|
|
|
|
|
queue.add("3");
|
|
|
|
|
System.out.println(queue);
|
|
|
|
|
|
|
|
|
|
// 阻塞添加,队列已满时一直阻塞直到有空位
|
|
|
|
|
queue.put("4");
|
|
|
|
|
queue.put("5");
|
|
|
|
|
System.out.println(queue);
|
|
|
|
|
|
|
|
|
|
// 非阻塞添加,队列已满时不会阻塞,直接返回false
|
|
|
|
|
queue.offer("6");
|
|
|
|
|
queue.offer("7");
|
|
|
|
|
System.out.println(queue);
|
|
|
|
|
|
|
|
|
|
// 限时阻塞添加:队列已满时最多等待指定时间,超时仍无空位则返回false
|
|
|
|
|
queue.offer("8", 3, TimeUnit.SECONDS);
|
|
|
|
|
queue.offer("9", 3, TimeUnit.SECONDS);// 队列容量为8且已满,会阻塞等待3秒,超时后返回false
|
|
|
|
|
System.out.println(queue);
|
|
|
|
|
|
|
|
|
|
// 从头部获取数据并移出,如果队列为空进入阻塞直到有数据添加
|
|
|
|
|
String take = queue.take();
|
|
|
|
|
System.out.println(take);
|
|
|
|
|
|
|
|
|
|
// 获取数据并移出队列第一个元素,如果队列为空将会阻塞指定的时间,直到在此期间有新数据写入,或者当前线程被其他线程中断,当线程超时会返回null
|
|
|
|
|
String poll = queue.poll(3, TimeUnit.SECONDS);
|
|
|
|
|
System.out.println(poll);
|
|
|
|
|
|
|
|
|
|
// 从头部获取数据并移出,如果队列为空不会阻塞
|
|
|
|
|
String poll1 = queue.poll();
|
|
|
|
|
System.out.println(poll1);
|
|
|
|
|
|
|
|
|
|
// 从头部获取数据不会移除,队列为空不会阻塞直接返回null
|
|
|
|
|
String peek = queue.peek();
|
|
|
|
|
System.out.println(peek);
|
2025-01-20 21:33:37 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## PriorityBlockingQueue使用
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
// 无界队列,构造参数只是初始容量,并不是最大容量
|
|
|
|
|
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
|
|
|
|
|
System.out.println(queue);
|
|
|
|
|
|
|
|
|
|
// add、put 等添加方法最终都调用 offer;由于队列无界,添加操作不会阻塞
|
|
|
|
|
queue.offer(1);
|
|
|
|
|
queue.offer(10);
|
|
|
|
|
queue.offer(3);
|
|
|
|
|
System.out.println(queue);
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## LinkedBlockingQueue使用
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
// 可选边界:构造时指定容量则为有界队列
|
|
|
|
|
LinkedBlockingQueue<Integer> integers = new LinkedBlockingQueue<>(10);
|
|
|
|
|
boolean b = integers.remainingCapacity() == 10;
|
|
|
|
|
System.out.println(b);
|
|
|
|
|
|
|
|
|
|
// 不指定容量时,默认容量为 Integer.MAX_VALUE
|
|
|
|
|
LinkedBlockingQueue<Integer> integers1 = new LinkedBlockingQueue<>();
|
|
|
|
|
boolean b1 = integers1.remainingCapacity() == Integer.MAX_VALUE;
|
|
|
|
|
System.out.println(b1);
|
2025-01-21 13:44:35 +08:00
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## DelayQueue使用
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
/**
 * Demonstrates the read operations of {@link DelayQueue}: {@code peek},
 * {@code poll}, timed {@code poll} and {@code take}.
 *
 * <p>Every entry is created with the deadline {@code 1000L}. Because
 * {@link DelayedEntry#getDelay(TimeUnit)} subtracts
 * {@link System#currentTimeMillis()} from that value, the deadline is an
 * absolute epoch-millisecond timestamp that lies in the past, so all entries
 * are already expired and none of the calls below actually has to wait.
 */
public class AtomicExample08 {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedEntry> queue = new DelayQueue<>();
        for (String name : new String[] {"A", "B", "C", "D", "E", "F", "G", "H", "I"}) {
            queue.add(new DelayedEntry(name, 1000L));
        }

        // Non-blocking read: returns the head immediately without removing it,
        // or null when the queue is empty.
        assert queue.peek() != null;
        System.out.println(queue.peek().value);

        // Non-blocking removal: returns null when the queue is empty or the
        // head has not reached its deadline yet.
        System.out.println(Objects.requireNonNull(queue.poll()).value);

        // Waits at most the given time; returns null if the queue is still
        // empty or the head is still not expired when the timeout elapses,
        // otherwise removes and returns the head.
        System.out.println(Objects.requireNonNull(queue.poll(3, TimeUnit.SECONDS)).value);

        // Blocks until the queue has a head element whose deadline has been
        // reached, then removes and returns it.
        System.out.println(queue.take().value);
    }

    /**
     * Queue element; anything stored in a {@link DelayQueue} must implement
     * {@link Delayed}.
     */
    static class DelayedEntry implements Delayed {

        private final String value;

        /** Deadline as an absolute timestamp in epoch milliseconds. */
        private final long time;

        private DelayedEntry(String value, long time) {
            this.time = time;
            this.value = value;
        }

        /**
         * @return the payload carried by this entry
         */
        public String getValue() {
            return value;
        }

        /**
         * @return the deadline in epoch milliseconds
         */
        public long getTime() {
            return time;
        }

        /**
         * Returns the time left until the deadline, converted to {@code unit}.
         * The result is zero or negative once the deadline has passed.
         */
        @Override
        public long getDelay(TimeUnit unit) {
            long delta = time - System.currentTimeMillis();
            return unit.convert(delta, TimeUnit.MILLISECONDS);
        }

        /**
         * Orders entries by deadline, earliest first. For a {@link Delayed}
         * of another type the remaining delays are compared instead, so the
         * method never fails with a {@link ClassCastException}.
         */
        @Override
        public int compareTo(Delayed o) {
            if (o instanceof DelayedEntry) {
                return Long.compare(this.time, ((DelayedEntry) o).time);
            }
            return Long.compare(
                    getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
## ConcurrentLinkedQueue(并发队列)性能
|
|
|
|
|
|
|
|
|
|
与使用 synchronized 加锁的 LinkedList 相比,ConcurrentLinkedQueue 在并发读写场景下的性能通常要优秀很多
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
import org.openjdk.jmh.annotations.*;
|
|
|
|
|
import org.openjdk.jmh.runner.Runner;
|
|
|
|
|
import org.openjdk.jmh.runner.RunnerException;
|
|
|
|
|
import org.openjdk.jmh.runner.options.Options;
|
|
|
|
|
import org.openjdk.jmh.runner.options.OptionsBuilder;
|
|
|
|
|
|
|
|
|
|
import java.util.LinkedList;
|
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
|
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
|
|
|
|
|
|
|
@Measurement(iterations = 10)
|
|
|
|
|
@Warmup(iterations = 10)
|
|
|
|
|
@BenchmarkMode(Mode.AverageTime)
|
|
|
|
|
@State(Scope.Thread)
|
|
|
|
|
@OutputTimeUnit(TimeUnit.MICROSECONDS)
|
|
|
|
|
public class AtomicExample09 {
|
|
|
|
|
private final static String DATA = "TEST";
|
|
|
|
|
private final static Object LOCK = new Object();
|
|
|
|
|
private SynchronizedLinkedList synchronizedLinkedList;
|
|
|
|
|
private ConcurrentLinkedQueue<String> concurrentLinkedQueue;
|
|
|
|
|
|
|
|
|
|
// 如果使用不当反而会降低性能
|
|
|
|
|
public static void main(String[] args) throws RunnerException {
|
|
|
|
|
Options options = new OptionsBuilder()
|
|
|
|
|
.include(AtomicExample09.class.getSimpleName())
|
|
|
|
|
.forks(1)
|
|
|
|
|
.build();
|
|
|
|
|
new Runner(options).run();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Setup(Level.Iteration)
|
|
|
|
|
public void setUp() {
|
|
|
|
|
synchronizedLinkedList = new SynchronizedLinkedList();
|
|
|
|
|
concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 测试 SynchronizedLinkedList
|
|
|
|
|
@Group("sync")
|
|
|
|
|
@Benchmark
|
|
|
|
|
@GroupThreads(5)
|
|
|
|
|
public void synchronizedListAdd() {
|
|
|
|
|
synchronizedLinkedList.addLast(DATA);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Group("sync")
|
|
|
|
|
@Benchmark
|
|
|
|
|
@GroupThreads(5)
|
|
|
|
|
public String synchronizedListGet() {
|
|
|
|
|
return synchronizedLinkedList.removeFirst();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// 测试 ConcurrentLinkedQueue
|
|
|
|
|
@Group("concurrent")
|
|
|
|
|
@Benchmark
|
|
|
|
|
@GroupThreads(5)
|
|
|
|
|
public void concurrentLinkedQueueAdd() {
|
|
|
|
|
concurrentLinkedQueue.offer(DATA);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@Group("concurrent")
|
|
|
|
|
@Benchmark
|
|
|
|
|
@GroupThreads(5)
|
|
|
|
|
public String concurrentLinkedQueueGet() {
|
|
|
|
|
return concurrentLinkedQueue.poll();
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private static class SynchronizedLinkedList {
|
|
|
|
|
private final LinkedList<String> list = new LinkedList<>();
|
|
|
|
|
|
|
|
|
|
void addLast(String element) {
|
|
|
|
|
synchronized (LOCK) {
|
|
|
|
|
list.addLast(element);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
String removeFirst() {
|
|
|
|
|
synchronized (LOCK) {
|
|
|
|
|
if (list.isEmpty()) {
|
|
|
|
|
return null;
|
|
|
|
|
}
|
|
|
|
|
return list.removeLast();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// 测试结果
|
|
|
|
|
// Benchmark Mode Cnt Score Error Units
|
|
|
|
|
// AtomicExample09.concurrent avgt 10 0.221 ± 0.649 us/op
|
|
|
|
|
// AtomicExample09.concurrent:concurrentLinkedQueueAdd avgt 10 0.407 ± 1.296 us/op
|
|
|
|
|
// AtomicExample09.concurrent:concurrentLinkedQueueGet avgt 10 0.034 ± 0.048 us/op
|
|
|
|
|
// AtomicExample09.sync avgt 10 0.240 ± 0.039 us/op
|
|
|
|
|
// AtomicExample09.sync:synchronizedListAdd avgt 10 0.232 ± 0.034 us/op
|
|
|
|
|
// AtomicExample09.sync:synchronizedListGet avgt 10 0.248 ± 0.044 us/op
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
> 运行测试需要在 pom.xml 中引入以下依赖
|
|
|
|
|
>
|
|
|
|
|
> ```xml
|
|
|
|
|
> <dependencies>
|
|
|
|
|
> <dependency>
|
|
|
|
|
> <groupId>org.projectlombok</groupId>
|
|
|
|
|
> <artifactId>lombok</artifactId>
|
|
|
|
|
> <version>1.18.36</version>
|
|
|
|
|
> </dependency>
|
|
|
|
|
> <!-- slf4j -->
|
|
|
|
|
> <dependency>
|
|
|
|
|
> <groupId>org.slf4j</groupId>
|
|
|
|
|
> <artifactId>slf4j-api</artifactId>
|
|
|
|
|
> <version>2.0.16</version>
|
|
|
|
|
> </dependency>
|
|
|
|
|
> <dependency>
|
|
|
|
|
> <groupId>org.slf4j</groupId>
|
|
|
|
|
> <artifactId>slf4j-simple</artifactId>
|
|
|
|
|
> <version>2.0.16</version>
|
|
|
|
|
> </dependency>
|
|
|
|
|
> <!-- jmh -->
|
|
|
|
|
> <dependency>
|
|
|
|
|
> <groupId>org.openjdk.jmh</groupId>
|
|
|
|
|
> <artifactId>jmh-core</artifactId>
|
|
|
|
|
> <version>1.19</version>
|
|
|
|
|
> </dependency>
|
|
|
|
|
> <dependency>
|
|
|
|
|
> <groupId>org.openjdk.jmh</groupId>
|
|
|
|
|
> <artifactId>jmh-generator-annprocess</artifactId>
|
|
|
|
|
> <version>1.19</version>
|
|
|
|
|
> </dependency>
|
|
|
|
|
> </dependencies>
|
|
|
|
|
> ```
|