MultiThread/README/Java并发容器使用.md

267 lines
8.1 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# Java并发容器使用
## ArrayBlockingQueue使用
```java
// Bounded queue with a fixed capacity of 8 elements.
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<>(8);
// add(): non-blocking insert; throws IllegalStateException if the queue is full.
queue.add("1");
queue.add("2");
queue.add("3");
System.out.println(queue);
// put(): blocking insert; waits until space becomes available if the queue is full.
queue.put("4");
queue.put("5");
System.out.println(queue);
// offer(): non-blocking insert; returns false immediately (no waiting, no exception) if the queue is full.
queue.offer("6");
queue.offer("7");
System.out.println(queue);
// offer(e, timeout, unit): waits up to the given time for free space, then gives up and returns false.
queue.offer("8", 3, TimeUnit.SECONDS);
queue.offer("9", 3, TimeUnit.SECONDS);// the queue already holds 8 elements (its capacity), so this call waits up to 3 seconds and then returns false
System.out.println(queue);
// take(): retrieves and removes the head; blocks while the queue is empty until an element is added.
String take = queue.take();
System.out.println(take);
// poll(timeout, unit): retrieves and removes the head; if the queue is empty it waits up to the given
// time for an element to arrive (or until the current thread is interrupted) and returns null on timeout.
String poll = queue.poll(3, TimeUnit.SECONDS);
System.out.println(poll);
// poll(): retrieves and removes the head; never blocks and returns null if the queue is empty.
String poll1 = queue.poll();
System.out.println(poll1);
// peek(): returns the head without removing it; never blocks and returns null if the queue is empty.
String peek = queue.peek();
System.out.println(peek);
```
## PriorityBlockingQueue使用
```java
// Unbounded queue: the constructor argument is only the initial capacity, not a maximum size.
PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>(2);
System.out.println(queue);
// Every insertion method of this queue (add/put/offer) behaves like offer(); because the queue is
// unbounded, inserting never blocks. Three elements are accepted here despite the initial capacity of 2.
queue.offer(1);
queue.offer(10);
queue.offer(3);
System.out.println(queue);
```
## LinkedBlockingQueue使用
```java
// Optionally bounded: passing a capacity to the constructor makes the queue bounded.
LinkedBlockingQueue<Integer> integers = new LinkedBlockingQueue<>(10);
// The remaining capacity of the still-empty queue equals the capacity given above.
boolean b = integers.remainingCapacity() == 10;
System.out.println(b);
// Without a capacity argument the capacity defaults to Integer.MAX_VALUE (effectively unbounded).
LinkedBlockingQueue<Integer> integers1 = new LinkedBlockingQueue<>();
boolean b1 = integers1.remainingCapacity() == Integer.MAX_VALUE;
System.out.println(b1);
```
## DelayQueue使用
```java
/**
 * Demonstrates the read operations of {@link DelayQueue}: {@code peek}, {@code poll},
 * timed {@code poll} and {@code take}.
 *
 * <p>An element can only be removed from a {@code DelayQueue} once its delay has expired,
 * i.e. once {@link Delayed#getDelay(TimeUnit)} returns a value less than or equal to zero.
 */
public class AtomicExample08 {

    /**
     * Fills a queue with entries that expire one second after creation and shows how each
     * read operation behaves before and after the expiry.
     *
     * @param args unused
     * @throws InterruptedException if the thread is interrupted while waiting in
     *                              {@code poll(timeout)} or {@code take()}
     */
    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedEntry> queue = new DelayQueue<>();
        for (String name : new String[] {"A", "B", "C", "D", "E", "F", "G", "H", "I"}) {
            queue.add(new DelayedEntry(name, 1000L));
        }
        // peek(): non-blocking; returns the head without removing it, even if the head has
        // not expired yet. Returns null only when the queue is empty.
        System.out.println(describe(queue.peek()));
        // poll(): non-blocking; returns null when the queue is empty or the head has not
        // expired yet. The entries were just created, so null is expected here.
        System.out.println(describe(queue.poll()));
        // poll(timeout, unit): waits at most the given time for the head to expire; returns
        // null if the queue is still empty or the head is still unexpired when the time is up,
        // otherwise removes and returns the head.
        System.out.println(describe(queue.poll(3, TimeUnit.SECONDS)));
        // take(): blocks until the queue has an element whose delay has expired, then removes
        // and returns it.
        System.out.println(queue.take().value);
    }

    /** Returns the entry's value, or the text {@code "null"} when no entry was available. */
    private static String describe(DelayedEntry entry) {
        return entry == null ? "null" : entry.value;
    }

    /**
     * Queue element that becomes available a fixed number of milliseconds after creation.
     * Elements stored in a {@link DelayQueue} must implement {@link Delayed}.
     */
    static class DelayedEntry implements Delayed {
        private final String value;
        /** Absolute expiry time in epoch milliseconds. */
        private final long time;

        /**
         * @param value payload carried by this entry
         * @param time  delay in milliseconds, counted from now, after which the entry expires
         */
        private DelayedEntry(String value, long time) {
            // Store the absolute expiry instant; getDelay() subtracts the current time from it.
            this.time = System.currentTimeMillis() + time;
            this.value = value;
        }

        /** @return the payload carried by this entry */
        public String getValue() {
            return value;
        }

        /** @return the absolute expiry time in epoch milliseconds */
        public long getTime() {
            return time;
        }

        /**
         * @param unit the unit in which the remaining delay is reported
         * @return the remaining delay; zero or negative once the entry has expired
         */
        @Override
        public long getDelay(TimeUnit unit) {
            long delta = time - System.currentTimeMillis();
            return unit.convert(delta, TimeUnit.MILLISECONDS);
        }

        /** Orders entries by expiry time so that the entry expiring first is the queue head. */
        @Override
        public int compareTo(Delayed o) {
            if (o instanceof DelayedEntry) {
                return Long.compare(this.time, ((DelayedEntry) o).time);
            }
            // Foreign Delayed implementations can only be compared by remaining delay.
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), o.getDelay(TimeUnit.MILLISECONDS));
        }
    }
}
```
## ConcurrentLinkedQueue并发队列性能
相比使用 synchronized 加锁的 LinkedList,无锁的 ConcurrentLinkedQueue 在多线程并发读写场景下性能通常更好(见下方 JMH 基准测试)。
```java
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.LinkedList;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
/**
 * JMH benchmark comparing a {@code synchronized}-guarded {@link LinkedList} with
 * {@link ConcurrentLinkedQueue} under concurrent producers and consumers.
 *
 * <p>Each benchmark group runs 5 adding threads and 5 removing threads against one shared
 * container.
 */
@Measurement(iterations = 10)
@Warmup(iterations = 10)
@BenchmarkMode(Mode.AverageTime)
// Scope.Group shares one state instance among all threads of a @Group; with Scope.Thread every
// thread would get its own queues and producers/consumers would never touch the same container.
@State(Scope.Group)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
public class AtomicExample09 {
    private final static String DATA = "TEST";
    private SynchronizedLinkedList synchronizedLinkedList;
    private ConcurrentLinkedQueue<String> concurrentLinkedQueue;

    /**
     * Runs every benchmark of this class in a single forked JVM.
     * Note: a concurrent container used inappropriately can reduce performance instead of
     * improving it.
     *
     * @param args unused
     * @throws RunnerException if the JMH runner fails
     */
    public static void main(String[] args) throws RunnerException {
        Options options = new OptionsBuilder()
                .include(AtomicExample09.class.getSimpleName())
                .forks(1)
                .build();
        new Runner(options).run();
    }

    /** Creates fresh, empty containers before every iteration. */
    @Setup(Level.Iteration)
    public void setUp() {
        synchronizedLinkedList = new SynchronizedLinkedList();
        concurrentLinkedQueue = new ConcurrentLinkedQueue<>();
    }

    // Benchmarks for SynchronizedLinkedList
    @Group("sync")
    @Benchmark
    @GroupThreads(5)
    public void synchronizedListAdd() {
        synchronizedLinkedList.addLast(DATA);
    }

    @Group("sync")
    @Benchmark
    @GroupThreads(5)
    public String synchronizedListGet() {
        return synchronizedLinkedList.removeFirst();
    }

    // Benchmarks for ConcurrentLinkedQueue
    @Group("concurrent")
    @Benchmark
    @GroupThreads(5)
    public void concurrentLinkedQueueAdd() {
        concurrentLinkedQueue.offer(DATA);
    }

    @Group("concurrent")
    @Benchmark
    @GroupThreads(5)
    public String concurrentLinkedQueueGet() {
        return concurrentLinkedQueue.poll();
    }

    /** FIFO queue built on a {@link LinkedList} whose operations are guarded by one lock. */
    private static class SynchronizedLinkedList {
        private final Object lock = new Object();
        private final LinkedList<String> list = new LinkedList<>();

        /** Appends {@code element} at the tail. */
        void addLast(String element) {
            synchronized (lock) {
                list.addLast(element);
            }
        }

        /**
         * Removes and returns the head element.
         *
         * @return the head element, or {@code null} if the list is empty
         */
        String removeFirst() {
            synchronized (lock) {
                // pollFirst() removes the head and returns null on an empty list, mirroring
                // ConcurrentLinkedQueue.poll().
                return list.pollFirst();
            }
        }
    }
}
// Sample results, recorded with the earlier configuration (per-thread state, removal from the
// tail); re-run the benchmark to obtain figures for the current code.
// Benchmark Mode Cnt Score Error Units
// AtomicExample09.concurrent avgt 10 0.221 ± 0.649 us/op
// AtomicExample09.concurrent:concurrentLinkedQueueAdd avgt 10 0.407 ± 1.296 us/op
// AtomicExample09.concurrent:concurrentLinkedQueueGet avgt 10 0.034 ± 0.048 us/op
// AtomicExample09.sync avgt 10 0.240 ± 0.039 us/op
// AtomicExample09.sync:synchronizedListAdd avgt 10 0.232 ± 0.034 us/op
// AtomicExample09.sync:synchronizedListGet avgt 10 0.248 ± 0.044 us/op
```
> 运行以上示例需要在 `pom.xml` 中添加如下依赖:
>
> ```xml
> <dependencies>
> <dependency>
> <groupId>org.projectlombok</groupId>
> <artifactId>lombok</artifactId>
> <version>1.18.36</version>
> </dependency>
> <!-- slf4j -->
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-api</artifactId>
> <version>2.0.16</version>
> </dependency>
> <dependency>
> <groupId>org.slf4j</groupId>
> <artifactId>slf4j-simple</artifactId>
> <version>2.0.16</version>
> </dependency>
> <!-- jmh -->
> <dependency>
> <groupId>org.openjdk.jmh</groupId>
> <artifactId>jmh-core</artifactId>
> <version>1.19</version>
> </dependency>
> <dependency>
> <groupId>org.openjdk.jmh</groupId>
> <artifactId>jmh-generator-annprocess</artifactId>
> <version>1.19</version>
> </dependency>
> </dependencies>
> ```