本文共 46203 字,大约阅读时间需要 154 分钟。
本文介绍了Java原生的多线程技术(1.2),通过详细介绍wait和notify相关的机制、基础的多线程技术以及基于这些技术的等待超时、线程间的通信技术和线程池高阶技术,最后通过一个基于线程池的简单文本web服务器—MollyServer,来阐明多线程带来好处。通过介绍这些技术,展示了在没有使用Java并发包的时代(1.5-)是如何完成Java的多线程编程,为理解Java5提供了良好帮助。 线程简介1 Java从诞生开始就明智的选择内置对多线程的支持,这将Java语言同其他同一时期的语言相比,具有明显优势。线程作为操作系统最小的调度单元,多个线程同时执行,将会改善我们的代码,在多核环境中具有更加明显的好处,但是过多的创建线程和对线程的不当管理也容易造成问题。 启动线程 构造线程 Java中启动线程必须要先行的构造一个Thread对象,然后调用这个对象的start方法。 01 this.group = g; 02 this.daemon = parent.isDaemon(); 03 this.priority = parent.getPriority(); 04 this.name = name.toCharArray(); 05 if (security == null || isCCLOverridden(parent.getClass())) 06 this.contextClassLoader = parent.getContextClassLoader(); 07 else 08 this.contextClassLoader = parent.contextClassLoader; 09 this.inheritedAccessControlContext = AccessController.getContext(); 10 this.target = target; 11 setPriority(priority); 12 if (parent.inheritableThreadLocals != null) 13 this.inheritableThreadLocals = 14 ThreadLocal.createInheritedMap(parent.inheritableThreadLocals); 15 /* Stash the specified stack size in case the VM cares */ 16 this.stackSize = stackSize; 17 18 /* Set thread ID */ 19 tid = nextThreadID(); 线程的构造,最主要或者说也就是线程对象的初始化过程,在上述过程中,一个新构造的线程对象是由其parent线程来进行分配空间的,而child线程继承了parent的是否Daemon,优先级和加载资源的classloader,栈空间的大小并且还会分配一个唯一的ID来标识这个child线程,至此一个能够运行的线程对象就初始化好了,在堆内存中等待着运行。 启动线程 调用Thread对象的start方法,就可启动一个新的线程,parent线程同步告知Java VM,只要线程规划器空闲,应立即启动这个线程。 2 而启动线程,也是交给操作系统来完成,这里就是一个本地方法了。 启动一个线程时,最好设置名称,这样在jstack分析时,就会好很多,自定义的线程最好能够起个名字。 01 /** 02 * @author weipeng 03 * 04 */ 05 public class ThreadName { 06 07 /** 08 * @param args 09 */ 10 public static void main(String[] args) { 11 Thread t = new Thread(new Job()); 12 t.setName("ThreadNameJob"); 13 t.start(); 14 } 15 16 static class Job implements Runnable { 17 18 @Override 19 public void run() { 20 try { 21 Thread.sleep(10000); 22 } catch (InterruptedException e) { 23 e.printStackTrace(); 24 } 25 } 26 27 } 28 29 } 上述代码直接运行,可以通过jstack pid来观察栈信息,结果如下: 01 2012-05-05 23:50:07 02 Full thread dump Java HotSpot(TM) 64-Bit Server VM (20.1-b02 mixed mode): 03 04 "Attach Listener" daemon prio=10 tid=0x00007f4c38001000 nid=0x30b5 waiting on condition [0x0000000000000000] 05 java.lang.Thread.State: RUNNABLE 06 07 "DestroyJavaVM" prio=10 tid=0x00007f4c60007800 nid=0x3086 waiting on condition [0x0000000000000000] 08 java.lang.Thread.State: RUNNABLE 09 10 "ThreadNameJob" prio=10 tid=0x00007f4c600a2800 nid=0x3097 waiting on condition [0x00007f4c37cfb000] 11 java.lang.Thread.State: TIMED_WAITING (sleeping) 12 at java.lang.Thread.sleep(Native Method) 13 at com.murdock.books.multithread.example.ThreadName$Job.run(ThreadName.java:26) 14 at java.lang.Thread.run(Thread.java:662) 15 16 "Low Memory Detector" daemon prio=10 tid=0x00007f4c60091800 nid=0x3095 runnable [0x0000000000000000] 17 java.lang.Thread.State: RUNNABLE 18 19 "C2 CompilerThread1" daemon prio=10 tid=0x00007f4c6008f000 nid=0x3094 waiting on condition [0x0000000000000000] 20 java.lang.Thread.State: RUNNABLE 21 22 "C2 CompilerThread0" daemon prio=10 tid=0x00007f4c6008c000 nid=0x3093 waiting on condition [0x0000000000000000] 23 java.lang.Thread.State: RUNNABLE 24 25 "Signal Dispatcher" daemon prio=10 tid=0x00007f4c6008a000 nid=0x3092 runnable [0x0000000000000000] 26 java.lang.Thread.State: RUNNABLE 27 28 "Finalizer" daemon prio=10 tid=0x00007f4c6006e000 nid=0x3091 in Object.wait() [0x00007f4c5c860000] 29 java.lang.Thread.State: WAITING (on object monitor) 30 at java.lang.Object.wait(Native Method) 31 - waiting on <0x00000000ec6b1300> (a java.lang.ref.ReferenceQueue$Lock) 32 at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:118) 33 - locked <0x00000000ec6b1300> (a java.lang.ref.ReferenceQueue$Lock) 34 at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:134) 35 at java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:159) 36 37 "Reference Handler" daemon prio=10 tid=0x00007f4c6006c000 nid=0x3090 in Object.wait() [0x00007f4c5c961000] 38 java.lang.Thread.State: WAITING (on object monitor) 39 at java.lang.Object.wait(Native Method) 40 - waiting on <0x00000000ec6b11d8> (a java.lang.ref.Reference$Lock) 41 at java.lang.Object.wait(Object.java:485) 42 at java.lang.ref.Reference$ReferenceHandler.run(Reference.java:116) 43 - locked <0x00000000ec6b11d8> (a java.lang.ref.Reference$Lock) 44 45 "VM Thread" prio=10 tid=0x00007f4c60065800 nid=0x308f runnable 46 47 "GC task thread#0 (ParallelGC)" prio=10 tid=0x00007f4c6001a800 nid=0x3087 runnable 48 49 "GC task thread#1 (ParallelGC)" prio=10 tid=0x00007f4c6001c800 nid=0x3088 runnable 50 51 "GC task thread#2 (ParallelGC)" prio=10 tid=0x00007f4c6001e800 nid=0x3089 runnable 52 53 "GC task thread#3 (ParallelGC)" prio=10 tid=0x00007f4c60020000 nid=0x308a runnable 54 55 "VM Periodic Task Thread" prio=10 tid=0x00007f4c6009c000 nid=0x3096 waiting on condition 56 57 JNI global references: 882 可以看到一个Java程序在运行时,后台创建了很多的线程,所以一个Java程序,纵使只有main,它也是多线程的,其中可以看到ThreadNameJob这个线程,也可以看到本地以吞吐量优先的ParallelGC的线程,它的数量默认是和CPU相同的,其中有4个对新生代进行GC的线程。 终止线程 3 线程从执行Runnalbe开始到结束。 理解中断 中断是一种状态,它使一个运行中的线程能够感知到其他线程对自身作出了中断操作,也就是影响到了自己。线程工作检查自身是否被中断来作出响应的行为。而该状态并没有维护在Thread中,是通过native方法获得。 可以通过当前线程对象的isInterrupted来判断是否被中断了。 01 /** 02 * @author weipeng 03 * 04 */ 05 public class Interrupted { 06 07 /** 08 * @param args 09 */ 10 public static void main(String[] args) throws Exception { 11 InterruptedJob ij = new InterruptedJob(); 12 ij.setName("InterruptedJobThread "); 13 ij.start(); 14 15 Thread.sleep(2000); 16 17 // 中断 18 ij.interrupt(); 19 System.out.println("INTERRUPTED IJ"); 20 21 Thread.sleep(2000); 22 } 23 24 static class InterruptedJob extends Thread { 25 @Override 26 public void run() { 27 try { 28 while (true) { 29 Thread.sleep(1000); 30 } 31 } catch (InterruptedException e) { 32 System.out.println("CURRENT INTERRUPT STATUS IS " 33 + Thread.currentThread().getName() 34 + Thread.currentThread().isInterrupted()); 35 // 再次进行中断 36 Thread.currentThread().interrupt(); 37 38 System.out.println("CURRENT INTERRUPT STATUS IS " 39 + Thread.currentThread().getName() 40 + Thread.currentThread().isInterrupted()); 41 } 42 } 43 } 44 45 } 上述程序输出: INTERRUPTED IJ CURRENT INTERRUPT STATUS IS InterruptedJobThread false CURRENT INTERRUPT STATUS IS InterruptedJobThread true 可以看出一旦抛出InterruptedException,当前线程的中断状态就被清除,但是也可以调用Thread.interrupted()来清除当前的中断状态。 线程属性 4 Java中创建的线程均会映射为操作系统层面的线程,在Java线程对象中有部分属性可以提供访问。线程状态是理解线程运行的关键。 线程优先级 01 public 02 class Thread implements Runnable { 03 /* Make sure registerNatives is the first thing <clinit> does. */ 04 private static native void registerNatives(); 05 static { 06 registerNatives(); 07 } 08 09 private char name[]; 10 private int priority; 可以看到priority,这个代表着优先级,优先级的范围从1到10,优先级高的线程占有CPU时间长一些,这当然是在长时间运行时体现出来的,但是不能做为程序执行的依据。 对priority可以通过对线程对象进行设置,使用setPriority来完成对线程优先级的设定。 下面的例子中,构建了三个不同的线程,它们的优先级不一样,从1到10,然后运行,优先级高的线程对times++执行的会多一些。 01 /** 02 * @author weipeng 03 * 04 */ 05 public class Priority { 06 private static CountDownLatch countDownLatch = new CountDownLatch(10000000); 07 08 private static CountDownLatch start = new CountDownLatch(1); 09 10 public static void main(String[] args) { 11 CountJob job1 = new CountJob(); 12 Thread lingdao = new Thread(job1); 13 lingdao.setPriority(10); 14 lingdao.start(); 15 16 CountJob job2 = new CountJob(); 17 Thread pming = new Thread(job2); 18 pming.setPriority(1); 19 pming.start(); 20 21 CountJob job3 = new CountJob(); 22 Thread zhongchan = new Thread(job3); 23 zhongchan.setPriority(5); 24 zhongchan.start(); 25 26 start.countDown(); 27 28 try { 29 countDownLatch.await(); 30 } catch (InterruptedException e) { 31 e.printStackTrace(); 32 } 33 34 System.out.println("lingdao : have " + job1.getTimes()); 35 System.out.println("pming : have" + job2.getTimes()); 36 System.out.println("zhongchan : have" + job3.getTimes()); 37 38 } 39 40 static class CountJob implements Runnable { 41 42 private int times = 0; 43 44 @Override 45 public void run() { 46 // 等待开始 47 try { 48 start.await(); 49 } catch (InterruptedException e) { 50 e.printStackTrace(); 51 } 52 53 while (countDownLatch.getCount() > 0) { 54 synchronized (CountJob.class) { 55 if (countDownLatch.getCount() > 0) { 56 countDownLatch.countDown(); 57 times++; 58 } 59 } 60 } 61 } 62 63 public int getTimes() { 64 return times; 65 } 66 } 67 } 执行结果如下: lingdao : have 4347635 pming : have2661562 zhongchan : have2990803 每次执行的可能都不一样,但是总的趋势是高优先级的线程对CPU的占用时间会多一些。 线程状态 线程在运行的生命周期中可能处于下面的6种不同的状态,在一个时刻,线程可能处于CPU上处于运行,或者暂时的没有分配到CPU资源而处于就绪(准备运行),或者处于阻塞的状态。具体内容如下面的表格所示: 状态名称 阻塞 可以中断 说明 运行中 N N 正在CPU上进行执行 准备运行(就绪) N N 暂时的失去CPU资源处于就绪队列中,可能随时被线程调度器调度执行 休眠 Y Y 让出CPU资源的就绪队列,等待一段时间后再次被放入队列,可以被中断提前进入就绪队列 等待 Y Y 接受到通知或者等待超时会进入到就绪队列,可以被中断 阻塞于I/O Y N I/O条件满足后,例如读入了一些字符,准备运行 阻塞于同步 Y N 当获得同步锁后准备运行 可以使用如下状态迁移来描述线程的状态: 5 线程在一个时刻将会处于上述的三种状态之一,这个模型将有效的理解Java线程对象,但是其中处于等待状态的线程可能会在等待I/O和等待同步时无法被中断,虽然运行的线程已经被中断标识,但是不会像休眠和等待一样通过InterruptedException来直接返回。 01 /** 02 * <pre> 03 * 处于同步读取的线程被中断,不会抛出异常 04 * 05 * </pre> 06 * 07 * @author weipeng 08 * 09 */ 10 public class ReadInterrupted { 11 12 /** 13 * @param args 14 */ 15 public static void main(String[] args) { 16 // 使用父线程,也就是main-thread 17 Thread thread = new Thread(new InterruptedJob(Thread.currentThread())); 18 thread.start(); 19 20 InputStream is = System.in; 21 try { 22 is.read(); 23 } catch (IOException e) { 24 e.printStackTrace(); 25 } 26 27 System.out.println("Main Thread is interrupted ? " + Thread.currentThread().isInterrupted()); 28 } 29 30 static class InterruptedJob implements Runnable { 31 32 Thread interruptedThread; 33 34 public InterruptedJob(Thread thread) { 35 this.interruptedThread = thread; 36 } 37 38 @Override 39 public void run() { 40 try { 41 Thread.sleep(2000); 42 } catch (InterruptedException e) { 43 e.printStackTrace(); 44 } 45 46 interruptedThread.interrupt(); 47 } 48 } 49 } 运行的结果是: 这时整个线程挂在is.read上,这时随意从控制台输入一个字符,主线程退出: 123 Main Thread is interrupted ? true 可以看出对阻塞于同步I/O的线程被中断后,中断标识被打上,但是不会抛出异常退出。 线程规划 对高I/O的线程尽量给予高优先级的设定,对于低I/O以CPU运算为主的线程尽量降低优先级,避免过多的占用CPU。因此,不能依据线程优先级的高低来运行程序,需要保证每个线程都有运行的机会。 并发访问对象 6 Java支持多个线程同时的访问一个对象,或者对象的变量,由于每个线程可以拥有这个变量的拷贝(这么做的目的是能够快速的执行,虽然变量分配的内存在共享内存中,但是每个执行的线程还是可以拥有一份拷贝,这样做的目的是加速程序的执行,这是现代多核处理器的一个显著特性)。因此,程序在执行过程中,可能一个线程看到的变量并不一定是最新的。 Volatile Volatile关键字,就是告知任何对该变量的访问均需要从共享内存中获取,而对它的改变必须同步刷新会共享内存。 比如,表示一个程序是否运行的变量,boolean on = true,那么可能是另一个线程来对它进行关闭动作,因此将其设置成为volatile boolean on,这样就会再其他线程对它进行改变时,能够让原有的线程立刻感知到。 但是过多的使用volatile是不必要的,相反它会降低程序执行的效率。 Synchronized 同步,在带来可见性的同时,它主要是对多个线程在同一个时刻,只能有一个处于方法或者块中。 可以通过将synchronized关键字加在方法前面或者采用同步快的方式来进行表现: 01 static synchronized void m() { 02 System.out.println("T"); 03 } 04 05 public static void main(String[] args) { 06 m(); 07 08 synchronized(Synchronized.class) { 09 m(); 10 } 11 } } Java同步是针对普通的Java对象而言的,每个Java对象均有一把“锁”,这个锁在一个线程进入时会排斥其他线程进入,是一个排他锁。通过javap来观察字节码,可以看到: 01 public static void main(java.lang.String[]); 02 Code: 03 Stack=2, Locals=2, Args_size=1 04 0: invokestatic #31; //Method m:()V 05 3: ldc #1; //class com/murdock/books/multithread/example/Synchronized 06 5: dup 07 6: astore_1 08 7: monitorenter 09 8: invokestatic #31; //Method m:()V 10 11: aload_1 11 12: monitorexit 12 13: goto 19 13 16: aload_1 14 17: monitorexit 15 18: athrow 16 19: return 当出现命令monitorenter时代获得了该对象的锁,当运行命令monitorexit时代表释放了该对象的锁。 同步化集合 同步化访问 在Java的集合api中有非常多的同步集合,比如:Vector和Hashtable,这些集合的所有方法都是synchronized,也就是说对这些集合的访问是同步的,但是如果每个接口都有一个专属的同步集合实现是非常不现实的,因此用过使用Collections.synchronizedXxx方法,可以包装一个同步的集合对象进行使用。 比如,摘自Collections 1 public static <T> List<T> synchronizedList(List<T> list) { 2 return (list instanceof RandomAccess ? 3 new SynchronizedRandomAccessList<T>(list) : 4 new SynchronizedList<T>(list)); 5 } 该方法返回的就是一个实现了List接口的同步数据结构,这个同步的数据结构每个方法均是同步的,但是如果需要对其进行额外的操作,需要将其加入到同步块中。 1 SynchronizedCollection(Collection<E> c) { 2 if (c==null) 3 throw new NullPointerException(); 4 this.c = c; 5 mutex = this; 6 } 上面可以看到同步集合均是对自身进行同步。 01 public class Synchronized { 02 static synchronized void m() { 03 System.out.println("T"); 04 } 05 06 public static void main(String[] args) throws Exception { 07 List<String> s = new ArrayList<String>(); 08 s.add("1"); 09 10 List<String> synchronizedList = Collections.synchronizedList(s); 11 12 Thread t = new Thread(new AccessSynchronizedCollections( 13 synchronizedList)); 14 t.start(); 15 16 synchronized (synchronizedList) { 17 Thread.sleep(5000); 18 System.out.println("Main-thread" + synchronizedList.size()); 19 } 20 21 } 22 23 /** 24 * 这个线程将会首先休息2000ms,然后唤醒后去请求锁,并执行操作 25 */ 26 static class AccessSynchronizedCollections implements Runnable { 27 List<String> list; 28 29 public AccessSynchronizedCollections(List<String> list) { 30 this.list = list; 31 } 32 33 @Override 34 public void run() { 35 try { 36 Thread.sleep(2000); 37 } catch (InterruptedException e) { 38 e.printStackTrace(); 39 } 40 System.out.println("AccessSynchronizedCollections" + list.size()); 41 list.add("2"); 42 } 43 } 44 } 上述执行的结果: Main-thread1 AccessSynchronizedCollections1 可以看到,在自定义对集合操作,比如缺少就添加,就需要将集合进行同步,然后在进行操作,否则很容易在判定过程中加入了其他线程对集合的操作。 安全复制集合 有时一个集合对象是进程内共享的,可能会发生一些变化,因此在作出一些操作的时候,希望能够拿到一份瞬时的拷贝,这个拷贝可能和执行中的这一时刻的集合有了变化,但是能够保证是稳定的。就像我们出门买了一份报纸,我们回家阅读报纸的时候,上面的新闻可能随时会发生变化,但是这并不妨碍我们去阅读它。 第一种复制的方式: 1 List<String> synchronizedList = Collections.synchronizedList(list); 2 3 long currentTime = System.currentTimeMillis(); 4 for (int i = 0; i < 10000; i++) { 5 String[] array = synchronizedList.toArray(new String[0]); 6 } 7 System.out.println(System.currentTimeMillis() - currentTime); 第二种复制的方式: 1 for (int i = 0; i < 10000; i++) { 2 synchronized (synchronizedList) { 3 int size = synchronizedList.size(); 4 String[] array = new String[size]; 5 synchronizedList.toArray(array); 6 } 7 } 第一种比较简单,第二种对于new String[0]没有做过多的浪费,但是时间测算,第二种没有第一种好,因为主要比拼的是toArray的实现,在给定的数组大于等于列表时,将会使用给定的数组,否则将会通过反射构造一个数组,而这个还是很高效的。 因此对于集合的数组复制,使用第一种方式是比较适合的。 死锁 两个线程或者多个线程在请求其永远无法获取资源的锁时,就是死锁状态。这里不演示死锁产生的范例。 避免死锁的主要原则: 首先,对于资源的加锁时间必须足够短,也就是必要时进行锁; 其次,访问资源过程中的锁需要按照一致的顺序进行获取,否则需要提升出一个更大的锁来确保资源的获取; 最后,尽量通过封装的形式,避免将锁暴露给外部,从而造成不必要的资源死锁。 线程间通信 7 线程开始运行,就如同一个脚本一样,有自己的栈空间,按照既定的代码一步一步的执行,直到最后的终结。但是每个运作中的线程,如果仅仅是孤立的运作,那么没有一点用处,或者说用处很少,但是多个运作的线程能够相互配合,各司其职将会带来巨大的好处。 线程间通信的必要性 一个运作的脚本(线程)修改了一个对象的值,另一个线程捕获到这个对象的变化,然后进行对应的操作,这个过程事件的触发启于一个线程,而最终的执行又是一个线程。因此前者好比生产者,后者就是消费者,这样的模式隔开了生产和消费,在功能上和架构上具有良好的伸缩性。但是在Java语言中怎样能够做到上述的过程呢? 当然,简单的办法是不断的循环去查看,比如: while (value != desire) { Thread.sleep(1000); } doXxx 这段伪码就是相当与如果值不是这个消费线程所要的,那么就睡眠一段时间,这样的方式看似能够解决这个问题,但是有两个矛盾的问题。 第一个,在睡眠时,基本不消耗CPU,但是如果睡得久,那么就不能及时的发现value已经变化,也就是及时性难以保证; 第二个,如果降低睡眠的时间,比如睡1毫秒,这样消费者能更加迅速的捕获出变化,但是它却占用了更多的CPU时间,造成了无端的浪费。 面对这个矛盾,Java通过固有的wait/notify机制能够很好的实现这个模式。 等待/通知机制 等待通知机制,是指一个线程调用了对象A上的wait方法,而另外的一个线程在进行了某些操作后,在对象A上的notify或者notifyAll方法,这样完成了两个线程之间的交互。而这个wait和notify之间的关系就像一个信号量一样来完成二者之间的交互工作。 一个标准的wait和notify的例子,这个例子有两个线程,第一个等待共享的一个值为false,当为false时它进行print,另外一个在睡眠了一段时间后,将这个值由原有的true改为false并notify。 01 /** 02 * @author weipeng 03 */ 04 public class WaitNotify { 05 static boolean flag = true; 06 static Object OBJ = new Object(); 07 public static void main(String[] args) { 08 Thread t1 = new Thread(new Waiter()); 09 t1.start(); 10 try { 11 Thread.sleep(1000); 12 } catch (InterruptedException e) { 13 e.printStackTrace(); 14 } 15 Thread t2 = new Thread(new Notifier()); 16 t2.start(); 17 } 18 19 /** 20 * 等待,如果flag为false则打印 21 */ 22 static class Waiter implements Runnable { 23 24 @Override 25 public void run() { 26 // 加锁,拥有OBJ的Monitor 27 synchronized (OBJ) { 28 // 当条件不满足时,继续wait,同时释放了OBJ的锁 29 while (flag) { 30 try { 31 System.out.println(Thread.currentThread() 32 + " still true. wait......"); 33 OBJ.wait(); 34 } catch (InterruptedException e) { 35 e.printStackTrace(); 36 } 37 } 38 // 条件满足时,完成工作 39 System.out 40 .println(Thread.currentThread() + " is false. doXXX."); 41 } 42 } 43 } 44 45 static class Notifier implements Runnable { 46 47 @Override 48 public void run() { 49 synchronized (OBJ) { 50 51 // 获取OBJ的锁,然后进行通知,通知时不会释放OBJ的锁 52 // 这也类似于过早通知 53 OBJ.notifyAll(); 54 try { 55 Thread.sleep(100); 56 } catch (InterruptedException e) { 57 e.printStackTrace(); 58 } 59 flag = false; 60 OBJ.notifyAll(); 61 } 62 } 63 } 64 } 从上面的例子中能够提炼出经典的等待和通知机制,对于等待的一方,遵循如下的原则: (1)获得对象的锁; (2)如果条件不满足,那么调用对象的wait,释放锁,被通知后继续检查(2) (3)条件已经满足,执行对应的逻辑。 synchronized(OBJ) { while(Condition not hold) { OBJ.wait(); } // Condition hold do XXX; } 通知的一方,遵循如下原则: (1)获得对象的锁; (2)更新变量或者条件,然后通知。 synchronized(OBJ) { value = newvalue; OBJ.notifyAll(); } 等待/通知的API 等待和通知机制被深深植入了Java语言中,在Object方法中有5个final的方法,也就是子类不能复写的方法。 方法名称 简介 notify() 随机通知调用notify对象上正在等待的线程,注意这个通知没有放弃对对象的锁,仅在通知notify完成之后直到释放了对象的锁才在对方线程的wait方法处返回; notifyAll() 这个方法会依次通知所有的正在等待在该对象上的线程,是一种比较保险的做法; wait() 该方法会让调用线程进入休眠状态,只有等待另外线程的notify或者被中断才会返回,注意的是,调用wait后,会释放对象的锁; wait(long) 等待,这里的参数时间是毫秒,也就是等待长达n毫秒,如果没有通知就超时返回,但是这里很难区分出是其他线程的notify还是超时返回; wait(long, int) 对于超时更细粒度的控制,达到纳秒,但是这个方法用的不多。 这里要说明notify方法不会释放对象的锁,而也只有释放了对象的锁,另一个线程才能从wait中竞争获得对象的锁并从wait方法中返回。 01 /** 02 * @author weipeng 03 */ 04 public class WaitNotify { 05 static boolean flag = true; 06 07 static Object OBJ = new Object(); 08 09 public static void main(String[] args) { 10 Thread t1 = new Thread(new Waiter()); 11 t1.start(); 12 13 try { 14 Thread.sleep(1000); 15 } catch (InterruptedException e) { 16 e.printStackTrace(); 17 } 18 19 Thread t2 = new Thread(new Notifier()); 20 t2.start(); 21 22 } 23 24 /** 25 * 等待,如果flag为false则打印 26 */ 27 static class Waiter implements Runnable { 28 29 @Override 30 public void run() { 31 // 加锁,拥有OBJ的Monitor 32 synchronized (OBJ) { 33 // 当条件不满足时,继续wait,同时释放了OBJ的锁 34 while (flag) { 35 try { 36 System.out.println(Thread.currentThread() 37 + " still true. wait......" + new Date()); 38 OBJ.wait(); 39 } catch (InterruptedException e) { 40 e.printStackTrace(); 41 } 42 } 43 // 条件满足时,完成工作 44 System.out 45 .println(Thread.currentThread() + " is false. doXXX." + new Date()); 46 } 47 } 48 } 49 50 static class Notifier implements Runnable { 51 52 @Override 53 public void run() { 54 synchronized (OBJ) { 55 56 // 获取OBJ的锁,然后进行通知,不会在notify调用中,释放OBJ的锁 57 // 这也类似于过早通知 58 // 直到当前线程释放了OBJ后,Waiter才能从wait方法中返回 59 OBJ.notifyAll(); 60 61 flag = false; 62 63 try { 64 Thread.sleep(10000); 65 } catch (InterruptedException e) { 66 e.printStackTrace(); 67 } 68 } 69 } 70 } 71 } 程序的输出: Thread[Thread-0,5,main] still true. wait……Sun Jun 24 20:53:03 CST 2012 Thread[Thread-0,5,main] is false. doXXX.Sun Jun 24 20:53:14 CST 2012 可以看到,二者之间相差了10秒,也就是Thread.sleep(10000)这段代码造成的,可以看出Notifier没有释放OBJ的锁,而Waiter在对方没有释放前是不会返回的。 PipedStream管道 Piped这个词就是管道,相当于从一端入一端出的输入输出流。只是不是从网络和文件上读入内容,而是在线程之间传递数据,而传输的媒介为内存。 管道主要包括了: PipedOutputStream、PipedInputStream、PipedReader和PipedWriter四个,面向的处理内容为字节和字符。 01 public class PipedTest { 02 03 static class Print implements Runnable { 04 private PipedInputStream in; 05 06 public Print(PipedInputStream in) { 07 this.in = in; 08 } 09 10 @Override 11 public void run() { 12 int receive = 0; 13 try { 14 while ((receive = in.read()) != -1) { 15 System.out.println(receive); 16 } 17 } catch (IOException ex) { 18 ex.printStackTrace(); 19 } 20 } 21 22 } 23 24 /** 25 * @param args 26 */ 27 public static void main(String[] args) throws Exception { 28 PipedOutputStream out = new PipedOutputStream(); 29 PipedInputStream in = new PipedInputStream(); 30 31 // Out ==> In 32 out.connect(in); 33 34 Thread t = new Thread(new Print(in)); 35 t.start(); 36 37 int receive = 0; 38 39 while ((receive = System.in.read()) != -1) { 40 out.write(receive); 41 } 42 } 43 44 } 上述程序,以main线程作为输入,而另外的Print作为输出。对于Piped类型的流,必须要进行connect,如果没有绑定,对于该流的访问会抛出异常。 ThreadLocal ThreadLocal线程变量,这是一个以ThreadLocal对象为Key,一个Object为value的存储结构。它被附带在线程上,也就是说一个线程可以根据一个ThreadLocal拥有一个变量。 在线程对象中,有一个成员变量,类型如下: 01 static class ThreadLocalMap { 02 03 /** 04 * The entries in this hash map extend WeakReference, using 05 * its main ref field as the key (which is always a 06 * ThreadLocal object). Note that null keys (i.e. entry.get() 07 * == null) mean that the key is no longer referenced, so the 08 * entry can be expunged from table. Such entries are referred to 09 * as "stale entries" in the code that follows. 10 */ 11 static class Entry extends WeakReference<ThreadLocal> { 12 /** The value associated with this ThreadLocal. */ 13 Object value; 14 15 Entry(ThreadLocal k, Object v) { 16 super(k); 17 value = v; 18 } 19 } 可以看到线程对象中的这个ThreadLocalMap是以ThreadLocal作为Key的。那么对于一个ThreadLocal在线程对其调用get方法时,会获取对应的Object,下面是get方法。 01 public T get() { 02 Thread t = Thread.currentThread(); 03 ThreadLocalMap map = getMap(t); 04 if (map != null) { 05 ThreadLocalMap.Entry e = map.getEntry(this); 06 if (e != null) 07 return (T)e.value; 08 } 09 return setInitialValue(); 10 } 下面对这些代码做些说明: 首先调用方会获得掉用线程Thread t = Thread.currentThread(); 其次会获得线程对象的ThreadLocalMap对象; 然后在ThreadLocalMap对象上,以this,也就是ThreadLocal为key去获得对应的值; 如果ThreadLocalMap这个对象为NULL,这里做延迟加载,通过setInitialValue()方法来初始化线程对象的ThreadLocalMap变量。 可以看出只有线程执行了任意ThreadLocal的get方法后,才会拥有ThreadLocalMap这个对象,而该变量又是包访问级别的,所以不会担心被其他类修改。 完全等待超时 8 有时我们需要在调用一个方法时等待一段时间(一般来说是设置一个值,有更改),等待条件的满足,而等待是有时限的,比如:1000ms,如果在1000ms后无法满足条件那么返回,否则在时限内如果成功则立刻返回。 模式 之前提到了基于wait的经典模式,即:同步,while,wait加doXxx的逻辑,那么这种模式无法做到一点,就是能够让客户端超时返回。 如果加入超时的话,对于经典模式的修改其实不会很复杂,假设超时时间是t ms,那么可以推知在now + t之后就会超时,则定义: remaining = t; future = now + t; 这时仅需要wait(remaining)即可,在醒来之后会将future – now,这个会设置到remaining上,但是如果remaining为负数,则直接退出。 01 public synchronized Object get(long mills) throws InterruptedException { 02 long future = System.currentTimeMillis() + mills; 03 long remained = mills; 04 05 // 当结果为空并没有超时 06 while ((result == null) && remained > 0) { 07 wait(remained); 08 09 remained = future - System.currentTimeMillis(); 10 } 11 12 return result; 13 } 在while的判断中加入了remained > 0的约束。这个模式就可以实现等待超时,在mills毫秒内无法获取到result或者result已经获取到了,都会返回。 使用实例与场景 这里我们模拟一个数据库链接获取的过程,这是一个消费者和生产者的案例。 生产者每1000ms生产一个链接到池子中,每个消费者从池子中获取一个链接,如果在800ms获取不到,那么就返回,并告知获取链接超时。初始的池子里有10个链接,消费者有5个,生产者有2个。 Connection的定义 01 public class Connection { 02 public void sendStatement() { 03 try { 04 Thread.sleep(10); 05 System.out.println(Thread.currentThread() + " Send Statement"); 06 } catch (InterruptedException e) { 07 Thread.currentThread().interrupt(); 08 } 09 } 10 } ConnectionPool的定义 01 public class ConnectionPool { 02 03 private LinkedList<Connection> pool = new LinkedList<Connection>(); 04 private static final int MAX_SIZE = 20; 05 06 public ConnectionPool(int initialSize){ 07 if (initialSize > 0) { 08 for (int i = 0; i < initialSize; i++) { 09 pool.addLast(new Connection()); 10 } 11 } 12 } 13 14 public void releaseConnection() throws InterruptedException { 15 synchronized (pool) { 16 while (pool.size() >= MAX_SIZE) { 17 pool.wait(); 18 } 19 20 // 添加后需要进行通知,这样其他消费者能够感知到链接池中已经增加了一个链接 21 pool.addLast(new Connection()); 22 pool.notifyAll(); 23 } 24 } 25 26 public Connection fetchConnection(long mills) throws InterruptedException { 27 synchronized (pool) { 28 // 完全超时 29 if (mills <= 0) { 30 while (pool.isEmpty()) { 31 pool.wait(); 32 } 33 34 return pool.removeFirst(); 35 } else { 36 long futureTime = System.currentTimeMillis() + mills; 37 long deltaTime = mills; 38 39 while (pool.isEmpty() && deltaTime > 0) { 40 pool.wait(deltaTime); 41 deltaTime = futureTime - System.currentTimeMillis(); 42 } 43 44 Connection result = null; 45 if (!pool.isEmpty()) { 46 result = pool.removeFirst(); 47 } 48 49 return result; 50 } 51 } 52 } 53 } 这里主要看一下fecthConnection,它提供了完全超时的实现,主要是通过计算出将要超时的时间点futureTime,和超时的时间距离deltaTime,在这个基础上复用了仅点的同步、while和do的结构,只不过是在while的不通过条件中增加了时间距离的消耗判断,如果小于0直接返回,当然面对过早通知,将会更新deltaTime。 当执行从pool.wait方法中返回后,有可能是超时,也有可能是已经满足了池中有连接的状况,因此如果有连接则直接返回,否则返回空。 测试用例 001 public class ConnectionPoolTest { 002 003 static ConnectionPool pool = new ConnectionPool(10); 004 005 static CountDownLatch latch = new CountDownLatch(1); 006 007 /** 008 * <pre> 009 * Thread[Thread-5,5,main] put a connection. 010 * Thread[Thread-6,5,main] put a connection. 011 * Thread[Thread-4,5,main] got a connection 012 * Thread[Thread-3,5,main] got a connection 013 * Thread[Thread-5,5,main] put a connection. 014 * Thread[Thread-6,5,main] put a connection. 015 * Thread[Thread-1,5,main] got a connection 016 * Thread[Thread-4,5,main] got a connection 017 * </pre> 018 * 019 * @param args 020 */ 021 public static void main(String[] args) { 022 for (int i = 0; i < 5; i++) { 023 Consumer p = new Consumer(latch); 024 Thread t = new Thread(p); 025 t.start(); 026 } 027 028 for (int i = 0; i < 2; i++) { 029 Producer p = new Producer(latch); 030 Thread t = new Thread(p); 031 t.start(); 032 } 033 034 latch.countDown(); 035 } 036 037 static class Producer implements Runnable { 038 039 private CountDownLatch latch; 040 041 public Producer(CountDownLatch latch){ 042 this.latch = latch; 043 } 044 045 public void run() { 046 try { 047 latch.await(); 048 } catch (InterruptedException e) { 049 Thread.currentThread().interrupt(); 050 } 051 while (true) { 052 try { 053 Thread.sleep(1000); 054 } catch (InterruptedException e) { 055 e.printStackTrace(); 056 } 057 058 try { 059 pool.releaseConnection(); 060 } catch (InterruptedException e) { 061 e.printStackTrace(); 062 } 063 064 System.out.println(Thread.currentThread() + " put a connection."); 065 } 066 } 067 } 068 069 static class Consumer implements Runnable { 070 071 private CountDownLatch latch; 072 073 public Consumer(CountDownLatch latch){ 074 this.latch = latch; 075 } 076 077 public void run() { 078 try { 079 latch.await(); 080 } catch (InterruptedException e) { 081 Thread.currentThread().interrupt(); 082 } 083 while (true) { 084 try { 085 Thread.sleep(1000); 086 } catch (InterruptedException e) { 087 e.printStackTrace(); 088 } 089 090 try { 091 Connection connection = pool.fetchConnection(0); 092 093 if (connection == null) { 094 System.out.println(Thread.currentThread() + " can not got a connection"); 095 } else { 096 System.out.println(Thread.currentThread() + " got a connection"); 097 } 098 } catch (InterruptedException e) { 099 e.printStackTrace(); 100 } 101 102 } 103 } 104 } 105 } 这是一个执行了一段时间的结果: 01 Thread[Thread-5,5,main] put a connection. 02 Thread[Thread-0,5,main] got a connection 03 Thread[Thread-6,5,main] put a connection. 04 Thread[Thread-0,5,main] got a connection 05 Thread[Thread-6,5,main] put a connection. 06 Thread[Thread-5,5,main] put a connection. 07 Thread[Thread-4,5,main] got a connection 08 Thread[Thread-5,5,main] put a connection. 09 Thread[Thread-6,5,main] put a connection. 10 Thread[Thread-4,5,main] got a connection 11 Thread[Thread-0,5,main] got a connection 可以看到,因为生产者少,所以每次生产连接后,都被等待的消费者取走,而超时是完全超时,如果我们吧等待的时间长度调整到2000ms,就可以看到如下结果: 1 Thread[Thread-6,5,main] put a connection. 2 Thread[Thread-0,5,main] got a connection 3 Thread[Thread-2,5,main] got a connection 4 Thread[Thread-1,5,main] can not got a connection 5 Thread[Thread-5,5,main] put a connection. 6 Thread[Thread-6,5,main] put a connection. 有部分消费者,等待了2000ms没有得到连接后,就返回了,这里就非常类似数据库链接池的实现。 阻塞队列(FIFO) 阻塞队列是对于资源获取和释放的一个良好数据结构,比如:作为资源的生产方,如果生产方生产的数据没有位置存放,那么生产方将会阻塞在生产的这个方法上,当然也可以选择阻塞多少毫秒。消费方也是同样的道理。 阻塞队列 001 /** 002 * @author weipeng 2012-7-24 下午4:34:22 003 */ 004 public class BlockingQueue<E> { 005 006 /** 007 * 默认队列长度 008 */ 009 private static final int DEFAULT_SIZE = 10; 010 /** 011 * 队列数组 012 */ 013 private Object[] array; 014 /** 015 * 当前的长度 016 */ 017 private int size; 018 /** 019 * 将要放置的位置 020 */ 021 private int head; 022 /** 023 * 将要移除的位置 024 */ 025 private int tail; 026 027 public BlockingQueue(int size){ 028 array = size > 0 ? new Object[size] : new Object[DEFAULT_SIZE]; 029 } 030 031 public BlockingQueue(){ 032 this(DEFAULT_SIZE); 033 } 034 035 public int getCapacity() { 036 return array.length; 037 } 038 039 /** 040 * @return 041 */ 042 public int getSize() { 043 synchronized (array) { 044 return size; 045 } 046 } 047 048 @SuppressWarnings("unchecked") 049 public E take(long millis) throws InterruptedException { 050 long waitTime = millis > 0 ? millis : 0; 051 synchronized (array) { 052 Object result = null; 053 if (waitTime == 0) { 054 while (size <= 0) { 055 array.wait(); 056 } 057 058 result = array[tail]; 059 size--; 060 tail = (tail + 1) % getCapacity(); 061 062 } else { 063 long future = System.currentTimeMillis() + waitTime; 064 long remain = waitTime; 065 066 while (size <= 0 && remain > 0) { 067 array.wait(remain); 068 remain = future - System.currentTimeMillis(); 069 } 070 071 if (size > 0) { 072 result = array[tail]; 073 size--; 074 tail = (tail + 1) % getCapacity(); 075 076 } 077 078 } 079 080 array.notifyAll(); 081 return (E) result; 082 } 083 } 084 085 public E take() throws InterruptedException { 086 return take(0); 087 } 088 089 public boolean offer(E e, long mills) throws InterruptedException { 090 long waitTime = mills > 0 ? mills : 0; 091 boolean result = false; 092 if (e != null) { 093 synchronized (array) { 094 if (waitTime <= 0) { 095 while (size >= getCapacity()) { 096 array.wait(); 097 } 098 099 array[head] = e; 100 size++; 101 head = (head + 1) % getCapacity(); 102 103 result = true; 104 } else { 105 long future = System.currentTimeMillis() + waitTime; 106 long remain = waitTime; 107 108 while (size >= getCapacity() && remain > 0) { 109 array.wait(remain); 110 remain = future - System.currentTimeMillis(); 111 } 112 113 if (size < getCapacity()) { 114 array[head] = e; 115 size++; 116 head = (head + 1) % getCapacity(); 117 118 result = true; 119 } 120 } 121 122 array.notifyAll(); 123 } 124 } 125 126 return result; 127 } 128 129 public boolean offer(E e) throws InterruptedException { 130 return offer(e, 0); 131 } 132 133 public void printQueue() { 134 synchronized (array) { 135 System.out.println("======================"); 136 for (int i = 0; i < size; i++) { 137 System.out.println("[" + i + "]" + array[i]); 138 } 139 System.out.println("[head]" + head); 140 System.out.println("[tail] " + tail); 141 System.out.println("[size]" + size); 142 System.out.println("======================"); 143 } 144 } 145 } 其中 head是插入的位置,tail是移除的位置。下面是测试用例: 01 @Test 02 public void offer() throws InterruptedException { 03 for (int i = 0; i < 10; i++) { 04 queue.offer(new Object()); 05 } 06 07 queue.printQueue(); 08 09 System.out.println(queue.offer(new Object(), 1000)); 10 } 输出结果: 01 ====================== 02 [0]java.lang.Object@78ce5b1c 03 [1]java.lang.Object@33bfc93a 04 [2]java.lang.Object@74341960 05 [3]java.lang.Object@86e293a 06 [4]java.lang.Object@7854a328 07 [5]java.lang.Object@7ca3d4cf 08 [6]java.lang.Object@67e8a1f6 09 [7]java.lang.Object@59e152c5 10 [8]java.lang.Object@5801319c 11 [9]java.lang.Object@366025e7 12 [head]0 13 [tail] 0 14 [size]10 15 ====================== 16 false 可以看到第11次添加被阻塞了,在1秒内没有添加成功,那么直接返回false。 01 @Test 02 public void take() throws InterruptedException { 03 Thread t = new Thread() { 04 05 Thread thread; 06 { 07 thread = Thread.currentThread(); 08 } 09 10 @Override 11 public void run() { 12 try { 13 Thread.sleep(500); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 thread.interrupt(); 18 } 19 20 }; 21 t.start(); 22 System.out.println(queue.take(2000)); 23 } 结果是在2秒内,还没有获取到,主线程被中断,而take能够感知到中断,就提前返回了。 01 @Test 02 public void interactive() throws Exception { 03 final AtomicLong offer = new AtomicLong(); 04 final AtomicLong take = new AtomicLong(); 05 final AtomicLong notTake = new AtomicLong(); 06 07 Thread t = new Thread() { 08 09 public void run() { 10 while (true) { 11 try { 12 queue.offer(new Object()); 13 offer.incrementAndGet(); 14 } catch (InterruptedException e) { 15 e.printStackTrace(); 16 } 17 } 18 } 19 }; 20 21 t.start(); 22 23 Thread t1 = new Thread() { 24 25 public void run() { 26 while (true) { 27 try { 28 if (queue.take(1) == null) { 29 notTake.incrementAndGet(); 30 } else { 31 take.incrementAndGet(); 32 } 33 } catch (InterruptedException e) { 34 e.printStackTrace(); 35 } 36 } 37 } 38 }; 39 t1.start(); 40 41 Thread t2 = new Thread() { 42 43 public void run() { 44 while (true) { 45 try { 46 if (queue.take(1) == null) { 47 notTake.incrementAndGet(); 48 } else { 49 take.incrementAndGet(); 50 } 51 } catch (InterruptedException e) { 52 e.printStackTrace(); 53 } 54 } 55 } 56 }; 57 t2.start(); 58 59 Thread.sleep(10000); 60 t.interrupt(); 61 t1.interrupt(); 62 t2.interrupt(); 63 System.out.println(offer.get()); 64 System.out.println(take.get()); 65 System.out.println(notTake.get()); 66 67 queue.printQueue(); 68 } 运行了10秒钟,1个生产方,2个消费方,每个消费者在1ms内没有获取到的时候,就会将notTake加1。 结果输出: 01 java.lang.InterruptedException 02 at java.lang.Object.wait(Native Method) 03 at com.murdock.controller.BlockingQueue.take(BlockingQueue.java:74) 04 at com.murdock.controller.BlockingQueueTest$3.run(BlockingQueueTest.java:81) 05 java.lang.InterruptedException 06 at java.lang.Object.wait(Native Method) 07 at com.murdock.controller.BlockingQueue.take(BlockingQueue.java:74) 08 at com.murdock.controller.BlockingQueueTest$4.run(BlockingQueueTest.java:99) 09 java.lang.InterruptedException 10 at java.lang.Object.wait(Native Method) 11 at java.lang.Object.wait(Object.java:485) 12 at com.murdock.controller.BlockingQueue.offer(BlockingQueue.java:103) 13 at com.murdock.controller.BlockingQueue.offer(BlockingQueue.java:137) 14 at com.murdock.controller.BlockingQueueTest$2.run(BlockingQueueTest.java:65) 8828338 8828338 6283 ====================== [head]8 [tail] 8 [size]0 ====================== 可以看到有6283次没有获取到,生产了8828338次,消费了8828338次,一致的,但是有6283次没有获取到数据,因为超时返回了。 线程池(ThreadPool) 9 线程池技术简介 对于服务端的程序,经常处理的场景是: 面对客户端传入的短小任务,快速的处理并返回。 如果每次接受到一个任务,创建一个线程,然后进行执行,这种模式在原型阶段是个不错的选择,但是如果面对的是成千上万的任务递交进服务器时,如果还是采用一个任务一个线程的方式,那么将会创建数以万记的线程,从而是操作系统进入到频繁上下文切换的状态,而如文中第一章所述,线程的创建和消亡是需要耗费系统资源的,这样无疑是无法满足要求的。 而线程池技术能够很好的解决这个问题,它预先的创建了若干的线程,也就是说线程的创建是托管的,并不能由用户直接完全控制,从而使用固定或较为固定数目的线程来完成任务的执行,一方面消除了频繁创建和消亡线程的开销,另一方面,随着任务的请求多少能够平缓的进行响应。 在最优的状态下,系统面临大量的请求和较小的请求时,总体线程数量水平波动不大,当请求的规模变大时,响应处于平缓的劣化。 线程池的实现 线程池接口的定义 01 /** 02 * @author weipeng 03 */ 04 public interface ThreadPool<Job extends Runnable> { 05 06 /** 07 * <pre> 08 * 执行一个Job,这个Job需要实现Runnable 09 * 10 * </pre> 11 * 12 * @param job 13 */ 14 void execute(Job job); 15 16 /** 17 * <pre> 18 * 关闭线程池 19 * 20 * </pre> 21 */ 22 void shutdown(); 23 24 /** 25 * <pre> 26 * 增加工作线程 27 * 28 * </pre> 29 * 30 * @param workerNum 31 */ 32 void addWorkers(int workerNum); 33 34 /** 35 * <pre> 36 * 减少工作线程 37 * 38 * </pre> 39 * 40 * @param workerNum 41 */ 42 void removeWorker(int workerNum); 43 44 /** 45 * <pre> 46 * 得到Jobs的列表 47 * 48 * </pre> 49 * 50 * @return 51 */ 52 int getJobSize(); 53 54 } 可以看到上面的接口可以完成一个Runnable的执行,并且能够将线程池中的工作线程进行增加和减少,同时可以支持优雅的关闭。 线程池的实现 001 /** 002 * <pre> 003 * 默认的线程池实现,可以新增工作线程也可以减少工作线程 004 * 005 * 当然提交JOB后会进入队列中,而Worker进行消费 006 * 007 * 这是一个简单的生产和消费者模式 008 * 009 * </pre> 010 * 011 * @author weipeng 012 * 013 */ 014 public class DefaultThreadPool<Job extends Runnable> implements ThreadPool<Job> { 015 016 /** 017 * 线程池最大限制数 018 */ 019 private static final int MAX_WORKER_NUMBERS = 10; 020 /** 021 * 线程池默认的数量 022 */ 023 private static final int DEFAULT_WORKER_NUMBERS = 5; 024 /** 025 * 线程池最小的数量 026 */ 027 private static final int MIN_WORKER_NUMBERS = 1; 028 /** 029 * 这是一个工作列表,将会向里面插入工作 030 */ 031 private final LinkedList<Job> jobs = new LinkedList<Job>(); 032 /** 033 * 工作者列表 034 */ 035 private final List<Worker> workers = Collections 036 .synchronizedList(new ArrayList<Worker>()); 037 /** 038 * 工作者线程的数量 039 */ 040 private int workerNum = DEFAULT_WORKER_NUMBERS; 041 042 public DefaultThreadPool() { 043 initializeWokers(DEFAULT_WORKER_NUMBERS); 044 } 045 046 public DefaultThreadPool(int num) { 047 workerNum = num > MAX_WORKER_NUMBERS ? MAX_WORKER_NUMBERS 048 : num < MIN_WORKER_NUMBERS ? MIN_WORKER_NUMBERS : num; 049 initializeWokers(workerNum); 050 } 051 052 /* 053 * (non-Javadoc) 054 * 055 * @see 056 * com.murdock.books.multithread.example.ThreadPool#execute(java.lang.Runnable 057 * ) 058 */ 059 @Override 060 public void execute(Job job) { 061 if (job != null) { 062 // 添加一个工作,然后进行通知 063 synchronized (jobs) { 064 jobs.addLast(job); 065 jobs.notify(); 066 } 067 } 068 } 069 070 /* 071 * (non-Javadoc) 072 * 073 * @see com.murdock.books.multithread.example.ThreadPool#shutdown() 074 */ 075 @Override 076 public void shutdown() { 077 for (Worker worker : workers) { 078 worker.shutdown(); 079 } 080 } 081 082 @Override 083 public void addWorkers(int workerNum) { 084 int addedNum = workerNum; 085 if (workerNum + this.workerNum > MAX_WORKER_NUMBERS) { 086 addedNum = MAX_WORKER_NUMBERS - this.workerNum; 087 } 088 089 synchronized (jobs) { 090 initializeWokers(addedNum); 091 this.workerNum = this.workerNum + addedNum; 092 } 093 } 094 095 @Override 096 public void removeWorker(int workerNum) { 097 if (workerNum >= this.workerNum) { 098 throw new IllegalArgumentException( 099 "can not remove beyond workerNum. now num is " 100 + this.workerNum); 101 } 102 103 synchronized (jobs) { 104 int count = 0; 105 while (count < workerNum) { 106 workers.get(count).shutdown(); 107 count++; 108 } 109 110 this.workerNum = this.workerNum - count; 111 } 112 } 113 114 @Override 115 public int getJobSize() { 116 return jobs.size(); 117 } 118 119 /** 120 * 初始化线程工作者 121 */ 122 private void initializeWokers(int num) { 123 for (int i = 0; i < num; i++) { 124 Worker worker = new Worker(); 125 workers.add(worker); 126 127 Thread thread = new Thread(worker); 128 thread.start(); 129 } 130 } 131 132 /** 133 * <pre> 134 * 工作者,负责消费任务 135 * 136 * </pre> 137 */ 138 class Worker implements Runnable { 139 /** 140 * 工作 141 */ 142 private volatile boolean running = true; 143 144 @Override 145 public void run() { 146 while (running) { 147 148 Job job = null; 149 synchronized (jobs) { 150 // 如果工作者列表是空的,那么就wait,放弃cpu执行占用 151 while (jobs.isEmpty()) { 152 try { 153 jobs.wait(); 154 } catch (InterruptedException ex) { 155 Thread.currentThread().interrupt(); 156 return; 157 } 158 } 159 160 // 取出一个Job 161 job = jobs.removeFirst(); 162 } 163 if (job != null) { 164 try { 165 job.run(); 166 } catch (Exception ex) { 167 ex.printStackTrace(); 168 } 169 } 170 } 171 } 172 173 public void shutdown() { 174 running = false; 175 } 176 177 } 178 } 上面的逻辑中,客户端调用execute时,会不断的向jobs中添加工作,而每个Worker在不断将jobs取出并执行,当jobs为空时,Worker进行阻塞状态。 这里有一点需要注意,也就是execute时,使用了notify,而不是notifyAll,因为我能够确定有消费者Worker被唤醒,这时使用notify将会比notifyAll获得更小的开销,这在高性能的并发处理中是非常重要的。 测试用例 测试提交工作 01 @Test 02 public void testExe() { 03 for (int i = 0; i < 1000; i++) { 04 threadPoolNoPrint.execute(new NoPrint()); 05 } 06 07 sleep(20); 08 09 System.out.println(threadPoolNoPrint.getJobSize()); 10 11 sleep(20); 12 13 System.out.println(threadPoolNoPrint.getJobSize()); 14 15 sleep(20); 16 17 System.out.println(threadPoolNoPrint.getJobSize()); 18 19 sleep(5000); 20 21 System.out.println(threadPoolNoPrint.getJobSize()); 22 } 执行结果: 991 985 980 可以看到提交后,每个20ms,查看已经堆积的任务,发现在不断的减少。 测试增加工作线程 01 @Test 02 public void addExe() { 03 for (int i = 0; i < 1000; i++) { 04 threadPoolNoPrint.execute(new NoPrint()); 05 } 06 07 sleep(20); 08 09 System.out.println(threadPoolNoPrint.getJobSize()); 10 11 sleep(20); 12 13 System.out.println(threadPoolNoPrint.getJobSize()); 14 15 sleep(20); 16 17 System.out.println(threadPoolNoPrint.getJobSize()); 18 19 System.out.println("============Add Worker============"); 20 21 threadPoolNoPrint.addWorkers(5); 22 23 System.out.println(threadPoolNoPrint.getJobSize()); 24 25 sleep(20); 26 27 System.out.println(threadPoolNoPrint.getJobSize()); 28 29 sleep(20); 30 31 System.out.println(threadPoolNoPrint.getJobSize()); 32 33 sleep(5000); 34 35 System.out.println(threadPoolNoPrint.getJobSize()); 36 } 执行结果: 990 985 980 ============Add Worker============ 980 967 955 在起初的5个线程运作时,可以看到每隔一段时间,消耗了5个工作,而增加了线程(并发度增加)后,没个间隔消耗量12个左右工作,提升了1倍多。 减少工作线程 01 @Test 02 public void reduceExe() { 03 for (int i = 0; i < 1000; i++) { 04 threadPoolNoPrint.execute(new NoPrint()); 05 } 06 07 sleep(20); 08 09 System.out.println(threadPoolNoPrint.getJobSize()); 10 11 sleep(20); 12 13 System.out.println(threadPoolNoPrint.getJobSize()); 14 15 sleep(20); 16 17 System.out.println(threadPoolNoPrint.getJobSize()); 18 19 System.out.println("============Add Worker============"); 20 21 threadPoolNoPrint.addWorkers(5); 22 23 System.out.println(threadPoolNoPrint.getJobSize()); 24 25 sleep(20); 26 27 System.out.println(threadPoolNoPrint.getJobSize()); 28 29 sleep(20); 30 31 System.out.println(threadPoolNoPrint.getJobSize()); 32 33 System.out.println("==============Reduce Worker=============="); 34 35 threadPoolNoPrint.removeWorker(7); 36 37 System.out.println(threadPoolNoPrint.getJobSize()); 38 39 sleep(20); 40 41 System.out.println(threadPoolNoPrint.getJobSize()); 42 43 sleep(20); 44 45 System.out.println(threadPoolNoPrint.getJobSize()); 46 47 sleep(5000); 48 49 System.out.println(threadPoolNoPrint.getJobSize()); 50 51 } 执行结果: 990 985 980 ============Add Worker============ 980 965 955 ==============Reduce Worker============== 955 952 949 可以看到5个线程开始执行,然后增加到了10个,最后减少到了3个,执行的单位时间完成工作出现了先上扬再回落的过程。 关闭线程池 01 @Test 02 public void gracefulShutdown() { 03 for (int i = 0; i < 1000; i++) { 04 threadPoolPrint.execute(new Print()); 05 } 06 07 sleep(50); 08 09 threadPoolPrint.shutdown(); 10 } 执行结果: 01 Thread[Thread-1,5,main], time=1347615521118 02 Thread[Thread-3,5,main], time=1347615521118 03 Thread[Thread-0,5,main], time=1347615521118 04 Thread[Thread-4,5,main], time=1347615521118 05 Thread[Thread-2,5,main], time=1347615521118 06 Thread[Thread-1,5,main], time=1347615521124 07 Thread[Thread-4,5,main], time=1347615521124 08 Thread[Thread-0,5,main], time=1347615521124 09 Thread[Thread-3,5,main], time=1347615521124 10 Thread[Thread-2,5,main], time=1347615521124 11 Thread[Thread-1,5,main], time=1347615521129 12 Thread[Thread-3,5,main], time=1347615521129 13 Thread[Thread-0,5,main], time=1347615521129 14 Thread[Thread-4,5,main], time=1347615521129 15 Thread[Thread-2,5,main], time=1347615521129 16 Thread[Thread-1,5,main], time=1347615521134 17 Thread[Thread-3,5,main], time=1347615521134 18 Thread[Thread-0,5,main], time=1347615521135 19 Thread[Thread-4,5,main], time=1347615521135 20 Thread[Thread-2,5,main], time=1347615521135 21 Thread[Thread-1,5,main], time=1347615521140 22 Thread[Thread-3,5,main], time=1347615521140 23 Thread[Thread-0,5,main], time=1347615521140 24 Thread[Thread-4,5,main], time=1347615521140 25 Thread[Thread-2,5,main], time=1347615521140 26 Thread[Thread-1,5,main], time=1347615521145 27 Thread[Thread-3,5,main], time=1347615521145 28 Thread[Thread-0,5,main], time=1347615521145 29 Thread[Thread-4,5,main], time=1347615521145 30 Thread[Thread-2,5,main], time=1347615521145 31 Thread[Thread-1,5,main], time=1347615521150 32 Thread[Thread-3,5,main], time=1347615521150 33 Thread[Thread-0,5,main], time=1347615521150 34 Thread[Thread-4,5,main], time=1347615521151 35 Thread[Thread-2,5,main], time=1347615521151 36 Thread[Thread-1,5,main], time=1347615521155 37 Thread[Thread-3,5,main], time=1347615521156 38 Thread[Thread-0,5,main], time=1347615521156 39 Thread[Thread-4,5,main], time=1347615521156 40 Thread[Thread-2,5,main], time=1347615521156 41 Thread[Thread-1,5,main], time=1347615521161 42 Thread[Thread-3,5,main], time=1347615521161 43 Thread[Thread-0,5,main], time=1347615521161 44 Thread[Thread-2,5,main], time=1347615521161 45 Thread[Thread-4,5,main], time=1347615521161 46 Thread[Thread-1,5,main], time=1347615521166 47 Thread[Thread-3,5,main], time=1347615521166 48 Thread[Thread-0,5,main], time=1347615521166 49 Thread[Thread-4,5,main], time=1347615521167 50 Thread[Thread-2,5,main], time=1347615521166 可以看到1000个工作,在50ms后消耗了上图所示的工作,而非1000个全部,整个关闭过程没有异常发生,俗称“优雅关闭”。 一个基于线程池的简单文本web服务器 我们将一个Http请求作为一个工作,提交到线程池中,然后由线程池的工作者来完成对请求的分析以及响应的回复,这样做能够极大的提升服务的效率,这也是传统、经典的Web服务器运作方式。 001 /** 002 * 003 */ 004 package com.murdock.books.multithread.example; 005 006 import java.io.BufferedReader; 007 import java.io.FileInputStream; 008 import java.io.InputStreamReader; 009 import java.io.PrintWriter; 010 import java.net.ServerSocket; 011 import java.net.Socket; 012 013 /** 014 * <pre> 015 * 请求: 016 * GET /p/1845211588 HTTP/1.1 017 * 018 * 响应: 019 * HTTP/1.1 200 OK 020 * Date: Fri, 14 Sep 2012 11:39:26 GMT 021 * Content-Type: text/html; charset=GBK 022 * Transfer-Encoding: chunked 023 * Connection: Keep-Alive 024 * Vary: Accept-Encoding 025 * tracecode: 23665957650539960842091419, 23665874971177305354091419 026 * Content-Encoding: gzip 027 * Server: Apache 028 * </pre> 029 * 030 * @author weipeng 031 * 032 */ 033 public class HttpTextServer { 034 035 static ThreadPool<TextHandler> threadPool = new DefaultThreadPool<TextHandler>( 036 10); 037 038 static String basePath = "/home/weipeng/project/multithread"; 039 040 public static void main(String[] args) throws Exception { 041 ServerSocket ss = new ServerSocket(8080); 042 Socket socket = null; 043 while ((socket = ss.accept()) != null) { 044 threadPool.execute(new TextHandler(socket)); 045 } 046 047 ss.close(); 048 } 049 050 static class TextHandler implements Runnable { 051 052 private Socket socket; 053 054 public TextHandler(Socket socket) { 055 this.socket = socket; 056 } 057 058 @Override 059 public void run() { 060 String line = null; 061 BufferedReader br = null; 062 BufferedReader reader = null; 063 PrintWriter out = null; 064 try { 065 reader = new BufferedReader(new InputStreamReader( 066 socket.getInputStream())); 067 068 String header = reader.readLine(); 069 String filePath = basePath + header.split(" ")[1]; 070 071 br = new BufferedReader(new InputStreamReader( 072 new FileInputStream(filePath))); 073 out = new PrintWriter(socket.getOutputStream()); 074 075 out.println("HTTP/1.1 200 OK"); 076 out.println("Content-Type: text/html; charset=UTF-8"); 077 out.println("Server: SimpleMolly"); 078 out.println(""); 079 080 while ((line = br.readLine()) != null) { 081 out.println(line); 082 } 083 out.println("CURRENT-THREAD ===> " + Thread.currentThread()); 084 out.flush(); 085 } catch (Exception ex) { 086 ex.printStackTrace(); 087 } finally { 088 if (br != null) { 089 try { 090 br.close(); 091 } catch (Exception ex) { 092 ex.printStackTrace(); 093 } finally { 094 br = null; 095 } 096 } 097 098 if (reader != null) { 099 try { 100 reader.close(); 101 } catch (Exception ex) { 102 ex.printStackTrace(); 103 } finally { 104 reader = null; 105 } 106 } 107 108 if (out != null) { 109 try { 110 out.close(); 111 } catch (Exception ex) { 112 ex.printStackTrace(); 113 } finally { 114 out = null; 115 } 116 } 117 118 if (socket != null) { 119 try { 120 socket.close(); 121 } catch (Exception ex) { 122 ex.printStackTrace(); 123 } finally { 124 socket = null; 125 } 126 } 127 } 128 } 129 } 130 } 实现简介: (1)服务端监听8080端口; (2)当一个socket链接上来后,将其放置入线程池; (3)线程池中的worker也就是TextHandler从socket中获取需要访问的资源; (4)根据资源的路径找到资源并读取同时输出到socket的输出流; (5)关闭输出流和相关资源。 访问效果: 第一次访问: 10 第二次访问:转载地址:http://yrxei.baihongyu.com/