ListenableFuture
ListenableFuture
并发是一个困难的问题,但是通过使用功能强大且简单的抽象可以大大简化并发。为了简化问题,
传统的
创建
对应于
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Explosion> explosion = service.submit(
new Callable<Explosion>() {
public Explosion call() {
return pushBigRedButton();
}
});
Futures.addCallback(
explosion,
new FutureCallback<Explosion>() {
// we want this handler to run immediately after we push the big red button!
public void onSuccess(Explosion explosion) {
walkAwayFrom(explosion);
}
public void onFailure(Throwable thrown) {
battleArchNemesis(); // escaped the explosion!
}
},
service);
另外,如果你要从基于
如果必须将另一个
添加监听
- onSuccess(V),如果
Future 成功,则根据其结果执行的操作 - onFailure(Throwable),如果
Future 失败,则根据失败执行的操作
@RestController
@RequestMapping(value = "testThread")
public class TestThread {
/**线程池*/
static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4, 10, 60,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(200),
new ThreadPoolExecutor.CallerRunsPolicy()
);
/**
* 数据处理
* @return
* @throws Exception
*/
@RequestMapping(value = "parse", method = RequestMethod.GET)
public String parse() throws Exception{
List<String> result = new ArrayList<>();
List<String> list = new ArrayList<>();
//模拟原始数据
for(int i = 0; i < 1211;i ++){
list.add(i+"-");
System.out.println("添加原始数据:"+i);
}
int size = 50;//切分粒度,每size条数据,切分一块,交由一条线程处理
int countNum = 0;//当前处理到的位置
int count = list.size()/size;//切分块数
int threadNum = 0;//使用线程数
if(count*size != list.size()){
count ++;
}
final CountDownLatch countDownLatch = new CountDownLatch(count);
//使用 Guava 的 ListeningExecutorService 装饰线程池
ListeningExecutorService executorService = MoreExecutors.listeningDecorator(threadPoolExecutor);
while (countNum < count*size){
//切割不同的数据块,分段处理
threadNum ++;
countNum += size;
MyCallable myCallable = new MyCallable();
myCallable.setList(ImmutableList.copyOf(
list.subList(countNum-size,list.size() > countNum ? countNum : list.size())));
ListenableFuture listenableFuture = executorService.submit(myCallable);
//回调函数
Futures.addCallback(listenableFuture, new FutureCallback<List<String>>() {
//任务处理成功时执行
@Override
public void onSuccess(List<String> list) {
countDownLatch.countDown();
System.out.println("第h次处理完成");
result.addAll(list);
}
//任务处理失败时执行
@Override
public void onFailure(Throwable throwable) {
countDownLatch.countDown();
System.out.println("处理失败:"+throwable);
}
});
}
//设置时间,超时了直接向下执行,不再阻塞
countDownLatch.await(3,TimeUnit.SECONDS);
result.stream().forEach(s -> System.out.println(s));
System.out.println("------------结果处理完毕,返回完毕,使用线程数量:"+threadNum);
return "处理完了";
}
}
public class MyCallable implements Callable{
private List<String> list ;
@Override
public Object call() throws Exception {
List<String> listReturn = new ArrayList<>();
//模拟对数据处理,然后返回
for(int i = 0;i < list.size();i++){
listReturn.add(list.get(i)+":处理时间:"+System.currentTimeMillis()+"---:处理线程:"+Thread.currentThread());
}
return listReturn;
}
public void setList(List<String> list) {
this.list = list;
}
}
异步操作链
使用
ListenableFuture<RowKey> rowKeyFuture = indexService.lookUp(query);
AsyncFunction<RowKey, QueryResult> queryFunction =
new AsyncFunction<RowKey, QueryResult>() {
public ListenableFuture<QueryResult> apply(RowKey rowKey) {
return dataService.read(rowKey);
}
};
ListenableFuture<QueryResult> queryFuture =
Futures.transformAsync(rowKeyFuture, queryFunction, queryExecutor);
方法 | 描述 |
---|---|
transform | 加一个回调函数 |
allAsList | 返回一个 |
successAsList | 返回一个 |
public class ListeningFutureDemo {
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(1);
final ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<String> explosion = service.submit(new Callable<String>() {
public String call() throws Exception {
System.out.println("任务线程正在执行...");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return "任务线程的结果";
}
});
ListenableFuture<String> first = Futures.transform(explosion, new AsyncFunction<String, String>() {
public ListenableFuture<String> apply(final String input) throws Exception {
ListenableFuture<String> temp = service.submit(new Callable<String>() {
public String call() throws Exception {
System.out.println("第1个回调线程正在执行...");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return input + " & 第1个回调线程的结果 ";
}
});
return temp;
}
}, service);
ListenableFuture<String> second = Futures.transform(first, new AsyncFunction<String, String>() {
public ListenableFuture<String> apply(final String input) throws Exception {
ListenableFuture<String> temp = service.submit(new Callable<String>() {
public String call() throws Exception {
System.out.println("第2个回调线程正在执行...");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return input + " & 第2个回调线程的结果 ";
}
});
return temp;
}
}, service);
ListenableFuture<String> third = Futures.transform(second, new AsyncFunction<String, String>() {
public ListenableFuture<String> apply(final String input) throws Exception {
ListenableFuture<String> temp = service.submit(new Callable<String>() {
public String call() throws Exception {
System.out.println("第3个回调线程正在执行...");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return input + " & 第3个回调线程的结果 ";
}
});
return temp;
}
}, service);
ListenableFuture<String> forth = Futures.transform(third, new AsyncFunction<String, String>() {
public ListenableFuture<String> apply(final String input) throws Exception {
ListenableFuture<String> temp = service.submit(new Callable<String>() {
public String call() throws Exception {
System.out.println("第4个回调线程正在执行...");
try {
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
return input + " & 第4个回调线程的结果 ";
}
});
return temp;
}
}, service);
Futures.addCallback(forth, new FutureCallback<String>() {
public void onSuccess(String result) {
latch.countDown();
System.out.println("结果: " + result);
}
public void onFailure(Throwable t) {
System.out.println(t.getMessage());
}
});
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
service.shutdown();
}
}
与CompletableFuture 之间互相转化
public class ListenableFutureAdapter<T> {
private final ListenableFuture<T> listenableFuture;
private final CompletableFuture<T> completableFuture;
public ListenableFutureAdapter(ListenableFuture<T> listenableFuture) {
this.listenableFuture = listenableFuture;
this.completableFuture = new CompletableFuture<T>() {
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
boolean cancelled = listenableFuture.cancel(mayInterruptIfRunning);
super.cancel(cancelled);
return cancelled;
}
};
Futures.addCallback(this.listenableFuture, new FutureCallback<T>() {
@Override
public void onSuccess(T result) {
completableFuture.complete(result);
}
@Override
public void onFailure(Throwable ex) {
completableFuture.completeExceptionally(ex);
}
});
}
public CompletableFuture<T> getCompletableFuture() {
return completableFuture;
}
public static final <T> CompletableFuture<T> toCompletable(ListenableFuture<T> listenableFuture) {
ListenableFutureAdapter<T> listenableFutureAdapter = new ListenableFutureAdapter<>(listenableFuture);
return listenableFutureAdapter.getCompletableFuture();
}
}
实践案例
异步阻塞
主线程分配一个任务给子线程后,然后继续运行,子线程运算出结果后把结果返回给主线程,在这段代码里主线程依旧是阻塞的。
@Slf4j
public class Test2 {
public static void main(String[] args) throws ExecutionException, InterruptedException {
final Date date = new Date(1976);
// 这里可以用lambda表达式,但是贴代码的时候会很不直观,不知道是Runnable还是Callable
ListenableFutureTask<Object> futureTask = ListenableFutureTask.create(new Runnable() {
@Override
public void run() {
log.info(Thread.currentThread().getName() + " Runnable任务启动....");
date.setTime(new Date().getTime());
}
}, date);
new Thread(futureTask).start();
// 睡眠一会 等待子线程执行完
Thread.sleep(1000L);
log.info(Thread.currentThread().getName() + "当前时间" + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date));
ListenableFutureTask<Object> futureTask2 = ListenableFutureTask.create(new Callable<Object>() {
@Override
public Object call() throws Exception {
log.info(Thread.currentThread().getName() + " Callable任务启动....");
Thread.sleep(5000L);
return "当前时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
});
new Thread(futureTask2).start();
log.info(Thread.currentThread().getName() + futureTask2.get());
log.info(Thread.currentThread().getName() + "主线程继续执行");
}
}
16:25:46.665 [Thread-1] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - Thread-1 Runnable任务启动....
16:25:47.663 [main] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - main当前时间2020-05-30 16:25:46
16:25:47.699 [Thread-2] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - Thread-2 Callable任务启动....
16:25:52.792 [main] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - main当前时间:2020-05-30 16:25:52
16:25:52.792 [main] INFO com.pipiha.Collections.Concurrency.ListenableFutuTest.Test2 - main主线程继续执行
注意观察运行结果的线程名和日志输出时间。需要注意的是由于没有执行成功的异步回调,实际上我们的主线程依旧是阻塞的,必须等子线程运行完,才能拿到结果。
异步非阻塞
@Slf4j
public class Test3 {
public static void main(String[] args) {
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFutureTask<Object> futureTask = ListenableFutureTask.create(new Callable<Object>() {
@Override
public Object call() throws Exception {
log.info(Thread.currentThread().getName() + " Callable任务启动....");
Thread.sleep(5000L);
return "当前时间:" + LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
}
});
Futures.addCallback(futureTask, new FutureCallback<Object>() {
public void onSuccess(Object calCultorResult) {
log.info(Thread.currentThread().getName() + "子线程执行成功,计算结果{}", calCultorResult);
}
public void onFailure(Throwable thrown) {
}
}, service);
new Thread(futureTask).start();
log.info(Thread.currentThread().getName() + "主线程继续执行");
}
}
分析打印的日志,可以看到主线程在提交任务过后就紧着执行,没有被阻塞而停下来。还可以发现,计算的执行过程是由
多任务协作
如果我们有有一个大任务比较耗时,拆分成子任务
@Slf4j
public class FutureTest {
public static void main(String[] args) {
ListeningExecutorService service = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(10));
ListenableFuture<Integer> task1Future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.info("任务1开始执行...");
int washTime = new Random().nextInt(10) + 1;
Thread.sleep(washTime);
if (washTime > 7) {
throw new RuntimeException("任务1开始因执行时间过长而失败");
}
return washTime;
}
});
AsyncFunction<Integer, Boolean> asyncFunction = new AsyncFunction<Integer, Boolean>() {
public ListenableFuture<Boolean> apply(Integer rowKey) {
log.info("任务1执行成功,计算结果{}", rowKey);
ListenableFuture<Boolean> hot = service.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
log.info("任务2开始执行,返回固定结果true");
return true;
}
});
return hot;
}
};
ListenableFuture<Boolean> queryFuture = Futures.transformAsync(task1Future, asyncFunction, service);
Futures.addCallback(queryFuture, new FutureCallback<Boolean>() {
public void onSuccess(Boolean explosion) {
log.info("任务1,任务2均执行成功");
}
public void onFailure(Throwable thrown) {
log.error("", thrown);
}
}, service);
}
}