Tomcat源码分析—线程池原理

Tomcat源码分析—线程池原理

这篇文章是我在2011年时写的,现转到我自己的博客上

Tomcat线程池目前出现过三种,第一个是5.0的线程池模型,这个线程池目前在6.x的版本还存在,主要是用于AJP的,
第二个是5.5.x时代使用了一仲线程池
第三个是6.x版本的线程池,实际上这个不是tomcat的线程池了,tomcat使用的是JDK的线程池ThreadPoolExecutor

首先来分析一下最老的版本5.0使用的线程池
线程池只有一个类,ThreadPool,它里面包含了两个内部类一个接口
静态内部类 ControlRunnable,这是用来运行工作线程的
静态内部类  MonitorRunnable,这是监控线程,用来回收空闲的线程
接口 ThreadPoolListener,这个很奇怪,我全文搜索了,只有ThreadPool的一些方法使用了这个接口,但是它却没有实现累

首先来看看监控类 MonitorRunnable,它是通过ControlRunnable类来启动的,它的run()方法如下:

        public void run() {
            while(true) {
                try {
                    // Sleep for a while.
                    synchronized(this) {
                        this.wait(interval);
                    }
                    // Check if should terminate.
                    // termination happens when the pool is shutting down.
                    if(shouldTerminate) {
                        break;
                    }
                    // Harvest idle threads.
                    p.checkSpareControllers();
                } catch(Throwable t) {
                     ThreadPool.log.error("Unexpected exception", t);
                }
            }
        }
                                                                                     

interval在创建的时候就已经指定是,是60 * 1000,也就是会睡眠60秒,这个类最主要的工作是每过60秒检查一下是否有空闲的线程,就是空闲线程数 是否大于 最大空闲线程数,如果是的话,就回收一些线程,保证最多空闲的线程数=最大空闲线程数
checkSpareControllers()内容如下:

    protected synchronized void checkSpareControllers() {
        if(stopThePool) {
            return;
        }
        if((currentThreadCount - currentThreadsBusy) > maxSpareThreads) {
            int toFree = currentThreadCount - currentThreadsBusy -maxSpareThreads;
            for(int i = 0 ; i < toFree ; i++) {
                ControlRunnable c = pool[currentThreadCount - currentThreadsBusy - 1];
                c.terminate();
                pool[currentThreadCount - currentThreadsBusy - 1] = null;
                currentThreadCount --;
            }
        }
    }
                                                                              

再来看看运行工作线程 ControlRunnable,它的run()方法如下:

        public void run() {
            boolean _shouldRun = false;
            boolean _shouldTerminate = false;
            ThreadPoolRunnable _toRun = null;
            try {
                while (true) {
                    try {
                        /* Wait for work. */
                        synchronized (this) {
                            while (!shouldRun && !shouldTerminate) {
                                this.wait();
                            }
                            _shouldRun = shouldRun;
                            _shouldTerminate = shouldTerminate;
                            _toRun = toRun;
                        }
                        if (_shouldTerminate) {
                            if (ThreadPool.log.isDebugEnabled())
                                ThreadPool.log.debug("Terminate");
                            break;
                        }
                        /* Check if should execute a runnable.  */
                        try {
                            if (noThData) {
                                if (_toRun != null) {
                                    Object thData[] = _toRun.getInitData();
                                    t.setThreadData(p, thData);
                                    if (ThreadPool.log.isDebugEnabled())
                                        ThreadPool.log.debug("Getting new thread data");
                                }
                                noThData = false;
                            }

                            if (_shouldRun) {
                                if (_toRun != null) {
                                    _toRun.runIt(t.getThreadData(p));
                                } else if (toRunRunnable != null) {
                                    toRunRunnable.run();
                                } else {
                                    if (ThreadPool.log.isDebugEnabled())
                                    ThreadPool.log.debug("No toRun ???");
                                }
                            }
                        } catch (Throwable t) {
                            ThreadPool.log.error(sm.getString
                                ("threadpool.thread_error", t, toRun.toString()));
                            /*
                             * The runnable throw an exception (can be even a ThreadDeath),
                             * signalling that the thread die.
                             *
                            * The meaning is that we should release the thread from
                            * the pool.
                            */
                            _shouldTerminate = true;
                            _shouldRun = false;
                            p.notifyThreadEnd(this);
                        } finally {
                            if (_shouldRun) {
                                shouldRun = false;
                                /*
                                * Notify the pool that the thread is now idle.
                                 */
                                p.returnController(this);
                            }
                        }
                        /*
                        * Check if should terminate.
                        * termination happens when the pool is shutting down.
                        */
                        if (_shouldTerminate) {
                            break;
                        }
                    } catch (InterruptedException ie) { /* for the wait operation */
                        // can never happen, since we don't call interrupt
                        ThreadPool.log.error("Unexpected exception", ie);
                    }
                }
            } finally {
                p.removeThread(Thread.currentThread());
            }
        }
                                                                                                   

这里有一个ThreadPoolRunnable,这是一个接口,里面有一个runIt()方法,可以把它当做Runnable来看,凡是实现了这个接口的类,工作线程都会负责调用这个类的runIt()执行。
ControlRunnable#run()方法的最开始是让这个线程wait(),因为一开始是没有数据的所以就一直等待,直到执行了ControlRunnable#runIt()方法之后,就会唤醒一个等待的线程:

        public synchronized void runIt(ThreadPoolRunnable toRun) {
        this.toRun = toRun;
        // Do not re-init, the whole idea is to run init only once per
        // thread - the pool is supposed to run a single task, that is
        // initialized once.
            // noThData = true;
            shouldRun = true;
            this.notify();
        }
                                                                               

当调用了这个方法之后,ControlRunnable#run()就会继续执行下去,之后会做一个初始化数据的工作,也就是if (noThData) 那个判断,工作线程里每个运行的线程实际上扩展了Thread,用的是ThreadWithAttribute,它可以将一些数据绑定在当前线程上,所以这个初始化数据是从ThreadPoolRunnable的实现类上取出数据,绑定到当前线程上。
初始化完成之后就开始真正运行了,如果当前的类继承自Runnalbe或者ThreadPoolRunnable都会被工作线程调用,当工作线程执行完后会将shouldTerminate标记设置为false,这样当下次循环的时候就会继续wait()了。最后将自己返还给工作线程数组,然后notify(),这是唤醒其他线程,因为可能因为当前的线程数超过了最大线程了,被迫等待,所以这里需要有一个唤醒机制告诉工作线程数组,有空闲的线程可以调用了。
如果发生错误了就退出循环,当退出循环后会将自己从线程map中删除,注意是线程map,不是工作线程数组,所以线程池中目前的线程数不会减少,只有等到监控线程检查完后才有可能减少。

当执行ThreadPool#runIt()方法时,会将一个ThreadPoolRunnable的实现类交给线程池运行,线程池首先需要从数组中找出一个线程,然后调用运行这个线程。
runIt()方法如下:

    public void runIt(ThreadPoolRunnable r) {
        if(null == r) {
            throw new NullPointerException();
        }
        ControlRunnable c = findControlRunnable();
        c.runIt(r);
    }
                                                      

调用ControlRunnable#runIt(),这样就将ThreadPoolRunnable的实现类交给了线程池,然后会唤醒一个线程。注意这里有一步很重要,在ControlRunnable#runIt()里,这是一个同步方法,执行了语句:
this.toRun = toRun;
这样它拿到的就是成员变量toRun,因为另一个线程赋值的时候是讲ThreadPoolRunnable的实现赋给了ThreadPool的成员变量toRun,所以线程池数组里每一个正在等待的ControlRunnable都能拿到这个值,还记得happend before原则吗,这个赋值和获取值都是同步的,所以肯定没有问题,任意唤醒的一个线程在它的同步块中会拿到这个toRun:

synchronized (this) {
	while (!shouldRun && !shouldTerminate) {
		this.wait();
	}
	_shouldRun = shouldRun;
	_shouldTerminate = shouldTerminate;
	_toRun = toRun;
}
                                                    

如果不是赋给ThreadPool的成员变量,而是随便赋给一个线程池成员ControlRunnable,那就有问题了,比如赋给的是A,但是唤醒是随意的,可能唤醒了B,这样B就没有toRun的值导致无法运行,而由了这个同步的赋值,同步的取值就没问题了。

下面看一下findControlRunnable()方法:

    private ControlRunnable findControlRunnable() {
        ControlRunnable c=null;
        if ( stopThePool ) {
            throw new IllegalStateException();
        }
        // Obtain a free thread from the pool.
        synchronized(this) {
            while (currentThreadsBusy == currentThreadCount) {
                 // All threads are busy
                if (currentThreadCount < maxThreads) {
                    // Not all threads were open,
                    // Open new threads up to the max number of idel threads
                    int toOpen = currentThreadCount + minSpareThreads;
                    openThreads(toOpen);
                } else {
                    logFull(log, currentThreadCount, maxThreads);
                    // Wait for a thread to become idel.
                    try {
                        this.wait();
                    }
                    // was just catch Throwable -- but no other
                    // exceptions can be thrown by wait, right?
                    // So we catch and ignore this one, since
                    // it'll never actually happen, since nowhere
                    // do we say pool.interrupt().
                    catch(InterruptedException e) {
                        log.error("Unexpected exception", e);
                    }
            if( log.isDebugEnabled() ) {
            log.debug("Finished waiting: CTC="+currentThreadCount +", CTB=" + currentThreadsBusy);
                    }
                    // Pool was stopped. Get away of the pool.
                    if( stopThePool) {
                        break;
                    }
                }
            }
            // Pool was stopped. Get away of the pool.
            if(0 == currentThreadCount || stopThePool) {
                throw new IllegalStateException();
            }                   
            // If we are here it means that there is a free thread. Take it.
            int pos = currentThreadCount - currentThreadsBusy - 1;
            c = pool[pos];
            pool[pos] = null;
            currentThreadsBusy++;
        }
        return c;
    }
                                                                                                       

这段代码的意思是坚持当前繁忙线程如果==当前线程,就是说所有的线程都在忙着执行那么会新开启一些线程,但如果当前的现在已经等于最大线程数了,那么就wait(),等有线程执行完了会notify()的。如果满足条件就新创建一些线程,最后将线程数组中的pos位置设置为null,表示这个位置的工作线程已经被取走了,然后返回这个工作线程ControlRunnable。

再来看看它的openThreads():

    protected void openThreads(int toOpen) {
        if(toOpen > maxThreads) {
            toOpen = maxThreads;
        }
        for(int i = currentThreadCount ; i < toOpen ; i++) {
            pool[i - currentThreadsBusy] = new ControlRunnable(this);
        }
        currentThreadCount = toOpen;
    }
                                                                          

这里会创建若干个新ControlRunnable

最后看看线程池的启动和停止,首先是ThreadPool#start():

    public synchronized void start() {
        stopThePool=false;
        currentThreadCount  = 0;
        currentThreadsBusy  = 0;
        adjustLimits();
        pool = new ControlRunnable[maxThreads];
        openThreads(minSpareThreads);
        if (maxSpareThreads < maxThreads) {
            monitor = new MonitorRunnable(this);
        }
    }
                                                         

可以看到,它创建了ControlRunnable和监控线程,而ControlRunnable是以数组的形式保存的,所以这个线程池使用的是数组的形式,这样比较高效,启动的时候就已经设置好数组的大小了,运行的时候就不会再改变了。这里的adjustLimits()主要是调整最大线程,最大空闲线程数的边界值,这里就不介绍了。

下面是线程池的停止方法:

    public synchronized void shutdown() {
        if(!stopThePool) {
            stopThePool = true;
            if (monitor != null) {
                monitor.terminate();
                monitor = null;
            }
            for(int i = 0; i < currentThreadCount - currentThreadsBusy; i++) {
                try {
                    pool[i].terminate();
                } catch(Throwable t) {
             /*
             * Do nothing... The show must go on, we are shutting
             * down the pool and nothing should stop that.
             */
            log.error("Ignored exception while shutting down thread pool", t);
                }
            }
            currentThreadsBusy = currentThreadCount = 0;
            pool = null;
            notifyAll();
        }
    }
                                                                                                                                                                                                                                                 

这里会调用监控线程和工作线程的terminate()方法,让停止标记置为true,从而退出执行关闭线程。

有个问题,ThreadPool#runIt()是怎么被调用的?
启动的时候就会有一个AJP线程执行监听,它是SocketAcceptor,继承了ThreadPoolRunnable。
然后它会调用ChannelSocket#acceptConnections(),这个方法内容如下:

void acceptConnections() {
if (log.isDebugEnabled())
log.debug(“Accepting ajp connections on ” + port);
while (running) {
try {
MsgContext ep = createMsgContext(packetSize);
ep.setSource(this);
ep.setWorkerEnv(wEnv);
this.accept(ep);
if (!running)
break;
// Since this is a long-running connection, we don’t care
// about the small GC
SocketConnection ajpConn = new SocketConnection(this, ep);
tp.runIt(ajpConn);
} catch (Exception ex) {
if (running)
log.warn(“Exception executing accept”, ex);
}
}
}

 

这里可以看到,是AJP的接收线程,接收到数据之后,会调用tp.runIt(ajpConn),将SocketConnection交给线程池运行,同时也会唤醒一个正在wait()的线程池,所以接收和处理不是同一个线程,但是它们都运行在线程池中。

到这里,Tomcat5.0模式的线程池就介绍完了。

关于Tomcat6.0的线程池模型,也就是JDK的ThreadPoolExecutor,请看这篇文章:
http://rdc.taobao.com/team/jm/archives/595

4 次阅读

发表评论

电子邮件地址不会被公开。