Java-ConcurrentProgramming-CheatSheet
Java Concurrent Programming CheatSheet
当出现线程活跃性问题时,我们可以借助一些工具进行诊断
-
Jstack。通过
jstack 命令,获取线程执行信息,找出其中的线程阻塞和死锁问题。 -
Heap dump。通过
jmap 命令dump 出当前的jvm 堆栈信息,然后使用内存分析工具识别线程阻塞和死锁。 -
Arthas。作为阿里开源的
Java 诊断利器,arthas 也提供了线程分析诊断功能, 可以通过arthas 的thread 命令,查找出当前阻塞的线程。
Concurrent Primitive | 并发单元
常见的
-
Runnable 应该是我们最熟悉的接口,它只有一个run() 函数,用于将耗时操作写在其中,该函数没有返回值。然后使用某个线程去执行该runnable 即可实现多线程,Thread 类在调用start() 函数后就是执行的是Runnable 的run() 函数。 -
Callable 与Runnable 的功能大致相似,Callable 中有一个call() 函数,但是call() 函数有返回值,而Runnable 的run() 函数不能将结果返回给客户程序。 -
Executor 就是Runnable 和Callable 的调度容器,Future 就是对于具体的Runnable 或者Callable 任务的执行结果进行取消、查询是否完成、获取结果、设置结果操作。get 方法会阻塞,直到任务返回结果。 -
FutureTask 则是一个RunnableFuture ,而 RunnableFuture 实现了Runnbale 又实现了Futrue 这两个接口。
Threads & Runnables
Thread Pool | 线程池
Executors
在并发程序中,线程的创建和管理是一个重要的命题。实际的生产代码中,不能为每一个任务就创建一个线程,也就是不能出现
Callable<Integer> task = () -> {
try {
TimeUnit.SECONDS.sleep(1);
return 123;
}
catch (InterruptedException e) {
throw new IllegalStateException("task interrupted", e);
}
};
ExecutorService executor = Executors.newFixedThreadPool(1);
Future<Integer> future = executor.submit(task);
System.out.println("future done? " + future.isDone());
Integer result = future.get();
System.out.println("future done? " + future.isDone());
System.out.print("result: " + result);
在使用
Fork/Join
线程协作
Atomic Variables | 原子性与原子变量
从
@ThreadSafe
public class SafeSequence {
private AtomicInteger value;
/**
* Returns a unique value.
*/
public int getNext() {
return value.getAndIncrement();
}
}
相对于内置的监视器锁,
volatile | 可见性保障
CountDownLatch
public class TestHarness {
public long timeTasks(int nThreads, final Runnable task)
throws InterruptedException {
final CountDownLatch startGate = new CountDownLatch(1);
final CountDownLatch endGate = new CountDownLatch(nThreads);
for (int i = 0; i < nThreads; i++) {
Thread t = new Thread() {
public void run() {
try {
startGate.await();
try {
task.run();
} finally {
endGate.countDown();
}
} catch (InterruptedException ignored) {
}
}
};
t.start();
}
long start = System.nanoTime();
startGate.countDown();
endGate.await();
long end = System.nanoTime();
return end - start;
}
锁与互斥
Semaphore
信号量
public class BoundedHashSet<T> {
private final Set<T> set;
private final Semaphore sem;
public BoundedHashSet(int bound) {
this.set = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(bound);
}
public boolean add(T o) throws InterruptedException {
sem.acquire();
boolean wasAdded = false;
try {
wasAdded = set.add(o);
return wasAdded;
} finally {
if (!wasAdded) sem.release();
}
}
public boolean remove(Object o) {
boolean wasRemoved = set.remove(o);
if (wasRemoved) sem.release();
return wasRemoved;
}
}
Lock
public class TimedLocking {
private Lock lock = new ReentrantLock();
public boolean trySendOnSharedLine(
String message,
long timeout,
TimeUnit unit
)
throws InterruptedException {
long nanosToLock = unit.toNanos(timeout) - estimatedNanosToSend(message);
if (!lock.tryLock(nanosToLock, NANOSECONDS)) return false;
try {
return sendOnSharedLine(message);
} finally {
lock.unlock();
}
}
}
Async Programming | 异步编程
Callable & Future
CompletableFuture
RxJava
Built-in ThreadSafe DataStructure | 内置的线程安全模型
ConcurrentHashMap
为了解决线程活跃性问题,提高并发执行效率,一种可行的方案是降低锁的粒度。
