public abstract class

ThreadPool

extends Object
// ThreadPool.java

/**
 *      Copyright (C) 2008 10gen Inc.
 *  
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 * 
 *      http://www.apache.org/licenses/LICENSE-2.0
 * 
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

package com.mongodb.util;

import java.util.List;
import java.util.Vector;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public abstract class ThreadPool<T> {

    /** Initializes a new thread pool with a given name and number of threads.
     * @param name identifying name
     * @param numThreads the number of threads allowed in the pool
     */
    public ThreadPool( String name , int numThreads ){
        this( name , numThreads , Integer.MAX_VALUE );
    }

    /** Initializes a new thread pool with a given name, number of threads, and queue size.
     * @param name identifying name
     * @param numThreads the number of threads allowed in the pool
     * @param maxQueueSize the size of the pool entry queue
     */
    public ThreadPool( String name , int numThreads , int maxQueueSize ){
        _name = name;
        _maxThreads = numThreads;
        _queue = new LinkedBlockingQueue<T>( maxQueueSize );
        _myThreadGroup = new MyThreadGroup();
        _threads.add( new MyThread() );
    }

    /** Handles a given object.
     * @param t the object to handle
     * @throws Exception
     */
    public abstract void handle( T t )
        throws Exception ;

    /** Handles a given object and exception.
     * @param t the object to handle
     * @param e  the exception to handle
     */
    public abstract void handleError( T t , Exception e );

    /** Returns the size of the pool's queue.
     * @return pool size
     */
    public int queueSize(){
        return _queue.size();
    }

    /** Adds a new object to the pool, if possible.
     * @param t the object to be added
     * @return if the object was successfully added
     */
    public boolean offer( T t ){
        if ( ( _queue.size() > 0 || _inProgress.get() == _threads.size() ) && 
             _threads.size() < _maxThreads )
            _threads.add( new MyThread() );
        return _queue.offer( t );
    }

    public int inProgress(){
	return _inProgress.get();
    }

    public int numThreads(){
        return _threads.size();
    }

    class MyThreadGroup extends ThreadGroup {
        MyThreadGroup(){
            super( "ThreadPool.MyThreadGroup:" + _name );
        }

        public void uncaughtException( Thread t, Throwable e ){
            for ( int i=0; i<_threads.size(); i++ ){
                if ( _threads.get( i ) == t ){
                    _threads.remove( i );
                    break;
                }
            }
        }
    }

    class MyThread extends Thread {
        MyThread(){
            super( _myThreadGroup , "ThreadPool.MyThread:" + _name + ":" + _threads.size() );
            setDaemon( true );
            start();
        }

        public void run(){
            while ( true ){
                T t = null;

                try {
                    t = _queue.take();
                }
                catch ( InterruptedException ie ){
                }

                if ( t == null )
                    continue;

                try {
                    _inProgress.incrementAndGet();
                    handle( t );
                }
                catch ( Exception e ){
                    handleError( t , e );
                }
                finally {
                    _inProgress.decrementAndGet();
                }
            }
        }
    }

    final String _name;
    final int _maxThreads;

    private final AtomicInteger _inProgress = new AtomicInteger(0);
    private final List<MyThread> _threads = new Vector<MyThread>();
    private final BlockingQueue<T> _queue;
    private final MyThreadGroup _myThreadGroup;
}