6.1 在线程中执行任务
在多线程开发中,要执行多线程任务,第一步是“找出任务边界”。
一个好的并发服务器是:大吞吐量,低响应时间。当系统过载的时候,只是性能降低而不会崩溃。
网络服务器为多线程提供了自然的任务边界:每一个客户端的请求。例如Web服务器、Mail服务器、文件服务器、EJB容器、数据库服务器都接受这种请求。各个客户端之间的请求应该是互补影响的。
单线程服务器
class SingleThreadWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { Socket connection = socket.accept(); handleRequest(connection); } } }
处理一个客户的连接时,其他连接只能等待。在成熟的商业化产品中,显然性能太差。实际上,在处理过程中,经常会发生I/O阻塞(网络read、write等),由于是单线程,只要有阻塞,服务器就没有响应了。
多线程(每次创建新线程)
因此多线程是一个很自然的想法,一种简单的方法是,每一个客户端请求时,新建一个线程进行处理。原线程继续等待。
class ThreadPerTaskWebServer { public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(connection); } }; new Thread(task).start(); } } }
需要注意的是:task中的处理代码必须保证线程安全。
当用户请求不密集的时候,这种模型能很好的工作。但当大量用户并发连接(线程增多)的时候,性能会急剧下降甚至导致程序的崩溃:
1、线程的创建、销毁需要开销。特别是线程数量很多的时候,累积起来的开销很大。
2、资源消耗。过多线程会消耗大量内存,增加GC的负担。
3、健壮性。每个操作系统、JVM都有线程创建的极限,当超过这个极限的时候,一般就会OutofMemoryException。
总结:多线程可以通过并发计算的形式提升性能,但毫无限制的创建线程是不行的,最终会导致系统的崩溃。
6.2 Executor框架
单线程模型的并发性太差、响应时间太长。每次请求创建一个新线程的开销太大,很难进行资源管理。
为此JDK提供了Executor框架,以Task(Runnable)为基本单位,将任务的提交与任务的执行解耦。
Executor使用的是“生产者-消费者”模型。Task的提交是生产者,执行时消费者。
这种框架让并发控制与业务逻辑解耦,使用起来非常简单。
一个使用了固定100线程Executor的Web服务器。
class TaskExecutionWebServer { private static final int NTHREADS = 100; private static final Executor exec = Executors.newFixedThreadPool(NTHREADS); public static void main(String[] args) throws IOException { ServerSocket socket = new ServerSocket(80); while (true) { final Socket connection = socket.accept(); Runnable task = new Runnable() { public void run() { handleRequest(connection); } }; exec.execute(task); } } }
6.2.2 Executor的策略
Executor的策略包括:
1、在哪个线程执行
2、什么顺序?(FIFO、LIFO、优先队列)
3、多少个任务可并发执行
4、多少个任务可以被放入队列等待执行
5、如果因为系统过载导致Task被拒绝,应该选择哪个Task,怎么通知用户?
6、执行完一个Task后应该做什么。
围绕这些问题,Executors提供了一系列子类,方便用户选择。
6.2.3 线程池
线程池,顾名思义拥有N个工作线程,以及一个工作队列。每一个线程的工作很简单:从队列中取出一个任务、执行、然后再等待下一个任务。
与每次都新建线程相比,线程池拥有诸多优势,最主要的是:可以避免反复创建、销毁线程的开销。通过合理的调整线程池大小,可以在性能和内存消耗之间达到一个平衡。
Execotors提供了很多线程池:
newFixedThreadPool,初始创建N个线程,如果中途有线程挂掉了,创建新的以维持N个的数量。
newCachedThreadPool,更自由的控制线程数量,当任务多的时候增多线程,少的时候减少线程。没有上下限。
newSingleThreadExecutor,纯单线程,保证任务是顺序执行的。
尽管FixedThreadPool限定了线程上限,但是若Task过多的话,还是有可能OutofMemoryException的。
6.2.4 Executor的生命周期
如果Executor没有被正确shutdown的话,可能会致使JVM无法退出。先来看一下Executor的生命周期吧。
为了加强生命周期管理(如程序关闭、异常结束),JDK拓展出了ExecutorService接口:
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; // ... additional convenience methods for task submission }
ExecutorService有三种状态:running、shutdown、terminated。初始状态下,是Running的
shutdown() 优雅的关闭,不接受新Task,已经提交的Task将被执行完毕后,再关闭。
shutdownNow() 立即关闭,不再执行任何Task,并将Queue的没有执行的返回。
awaitTermination() 等待Executor到达terminated状态
isTerminated() 检查是否到达terminated状态
一个结合了shutdown的服务器实例如下:
class LifecycleWebServer { private final ExecutorService exec = ...; public void start() throws IOException { ServerSocket socket = new ServerSocket(80); while (!exec.isShutdown()) { try { final Socket conn = socket.accept(); exec.execute(new Runnable() { public void run() { handleRequest(conn); } }); } catch (RejectedExecutionException e) { if (!exec.isShutdown()) log("task submission rejected", e); } } } public void stop() { exec.shutdown(); } void handleRequest(Socket connection) { Request req = readRequest(connection); if (isShutdownRequest(req)) stop(); else dispatchRequest(req); } }
6.2.5 延时和周期任务
在JDK5之前,通过Timer执行周期任务,或者延时任务。
JDK5之后ScheduledThreadPoolExecutor可担任Timer提供了更灵活的方法。
Timer的问题是:只能开一一个线程,如果频率是10ms一次,但是某Task是40ms(可以选择fixed rate或者fixeddelay,如果是前者的话)就可能导致任务丢失,后者的话就是任务少执行了。
ScheduledThreadPoolExecutor可以创建多个线程来解决这个问题。
另外一个问题是,如果Task抛出异常,Timer的线程就挂掉了,不会重启,相当于Timer被取消了。ScheduledThradPool可以解决这个问题。
因此JDK5后,应该首选ScheduledThreadPoolExecutor。
类似的还有DelayQueue,也可以使用。
6.3 找出可利用的并行
为了很好地利用Executor框架,需要把Task封装为Runnable。
对于C/S程序而言,任务边界很简单,就是一次Request/Response。更多的时候没有明确的定义。
以一个HTML网页渲染为例。
最基本的策略:单线程、先渲染文本,再下载、渲染图片:
public class SingleThreadRenderer { void renderPage(CharSequence source) { renderText(source); List<ImageData> imageData = new ArrayList<ImageData>(); for (ImageInfo imageInfo : scanForImageInfo(source)) imageData.add(imageInfo.downloadImage()); for (ImageData data : imageData) renderImage(data); } }
由于图片的下载速度受到网络影响,经常会I/O阻塞。因此CPU计算资源将被大量的闲置。
为了支持异步任务的返回,Executor还支持Future(一种Callable)。可以通过get异步获取结果。除了支持submit一个Future(Runnable之外),默认提交Runnable也会返回一个Callale(get也会阻塞,只不过最终返回null)。
这种submit导致的发布时安全的。
使用Callable对程序进行改进:
public class FutureRenderer { private final ExecutorService executor = ...; void renderPage(CharSequence source) { final List<ImageInfo> imageInfos = scanForImageInfo(source); Callable<List<ImageData>> task = new Callable<List<ImageData>>() { public List<ImageData> call() { List<ImageData> result = new ArrayList<ImageData>(); for (ImageInfo imageInfo : imageInfos) result.add(imageInfo.downloadImage()); return result; } }; Future<List<ImageData>> future = executor.submit(task); renderText(source); try { List<ImageData> imageData = future.get(); for (ImageData data : imageData) renderImage(data); } catch (InterruptedException e) { // Re-assert the thread's interrupted status Thread.currentThread().interrupt(); // We don't need the result, so cancel the task too future.cancel(true); } catch (ExecutionException e) { throw launderThrowable(e.getCause()); } } }
先开启线程提交图片下载,再渲染文字,最后等待异步的图片下载完成后,再渲染图片。在一定程度上减少了程序的响应时间。
BlockingQueue是一个非常有利的工具:
1、当多个Class共享一个线程池的时候,可以在Task执行完毕后将结果写入BlockingQueue。主线程从BQ中get,即使暂时没有也是异步的、线程安全的。
2、可以控制多个扔进Executors的异步任务的执行结果。
另外Future提供了一个带timeout的接口。用于有时间限制的完成指定任务。
V get(long timeout,TimeUnit unit)
Page renderPageWithAd() throws InterruptedException { long endNanos = System.nanoTime() + TIME_BUDGET; Future<Ad> f = exec.submit(new FetchAdTask()); // Render the page while waiting for the ad Page page = renderPageBody(); Ad ad; try { // Only wait for the remaining time budget long timeLeft = endNanos - System.nanoTime(); ad = f.get(timeLeft, NANOSECONDS); } catch (ExecutionException e) { ad = DEFAULT_AD; } catch (TimeoutException e) { ad = DEFAULT_AD; f.cancel(true); } page.setAd(ad); return page; }
Executors也提供了带时间版本的invokeAll,用于对一组提交的Future,并在指定时间内返回。