// SimplePool.java /** * Copyright (C) 2008 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.mongodb.util; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import javax.management.Attribute; import javax.management.AttributeList; import javax.management.DynamicMBean; import javax.management.MBeanAttributeInfo; import javax.management.MBeanInfo; public abstract class SimplePool<T> implements DynamicMBean { static final boolean TRACK_LEAKS = Boolean.getBoolean( "MONGO-TRACKLEAKS" ); static final long _sleepTime = 2; /** * See full constructor docs */ public SimplePool( String name , int maxToKeep , int maxTotal ){ this( name , maxToKeep , maxTotal , false , false ); } /** Initializes a new pool of objects. * @param name name for the pool * @param maxToKeep max to hold to at any given time. if < 0 then no limit * @param maxTotal max to have allocated at any point. if there are no more, get() will block * @param trackLeaks if leaks should be tracked */ public SimplePool( String name , int maxToKeep , int maxTotal , boolean trackLeaks , boolean debug ){ _name = name; _maxToKeep = maxToKeep; _maxTotal = maxTotal; _trackLeaks = trackLeaks || TRACK_LEAKS; _debug = debug; _mbeanInfo = new MBeanInfo( this.getClass().getName() , _name , new MBeanAttributeInfo[]{ new MBeanAttributeInfo( "name" , "java.lang.String" , "name of pool" , true , false , false ) , new MBeanAttributeInfo( "size" , "java.lang.Integer" , "total size of pool" , true , false , false ) , new MBeanAttributeInfo( "available" , "java.lang.Integer" , "total connections available" , true , false , false ) , new MBeanAttributeInfo( "inUse" , "java.lang.Integer" , "number connections in use right now" , true , false , false ) , new MBeanAttributeInfo( "everCreated" , "java.lang.Integer" , "number connections ever created" , true , false , false ) } , null , null , null ); } /** Creates a new object of this pool's type. * @return the new object. */ protected abstract T createNew(); /** * callback to determine if an object is ok to be added back to the pool or used * will be called when something is put back into the queue and when it comes out * @return true if the object is ok to be added back to pool */ public boolean ok( T t ){ return true; } /** * override this if you need to do any cleanup */ public void cleanup( T t ){} /** * @return >= 0 the one to use, -1 don't use any */ protected int pick( int iThink , boolean couldCreate ){ return iThink; } /** * call done when you are done with an object form the pool * if there is room and the object is ok will get added * @param t Object to add */ public void done( T t ){ done( t , ok( t ) ); } void done( T t , boolean ok ){ if ( _trackLeaks ){ synchronized ( _where ){ _where.remove( _hash( t ) ); } } if ( ! ok ){ synchronized ( _avail ){ _all.remove( t ); } return; } synchronized ( _avail ){ if ( _maxToKeep < 0 || _avail.size() < _maxToKeep ){ for ( int i=0; i<_avail.size(); i++ ) if ( _avail.get( i ) == t ) throw new RuntimeException( "trying to put something back in the pool that's already there" ); // if all doesn't contain it, it probably means this was cleared, so we don't want it if ( _all.contains( t ) ){ _avail.add( t ); _waiting.release(); } } else { cleanup( t ); } } } public void remove( T t ){ done( t , false ); } /** Gets an object from the pool - will block if none are available * @return An object from the pool */ public T get(){ return get(-1); } /** Gets an object from the pool - will block if none are available * @param waitTime * negative - forever * 0 - return immediately no matter what * positive ms to wait * @return An object from the pool */ public T get( long waitTime ){ final T t = _get( waitTime ); if ( t != null ){ if ( _trackLeaks ){ Throwable stack = new Throwable(); stack.fillInStackTrace(); synchronized ( _where ){ _where.put( _hash( t ) , stack ); } } } return t; } private int _hash( T t ){ return System.identityHashCode( t ); } private T _get( long waitTime ){ long totalSlept = 0; while ( true ){ synchronized ( _avail ){ boolean couldCreate = _maxTotal <= 0 || _all.size() < _maxTotal; while ( _avail.size() > 0 ){ int toTake = _avail.size() - 1; toTake = pick( toTake, couldCreate ); if ( toTake >= 0 ){ T t = _avail.remove( toTake ); if ( ok( t ) ){ _debug( "got an old one" ); return t; } _debug( "old one was not ok" ); _all.remove( t ); continue; } else if ( ! couldCreate ) { throw new IllegalStateException( "can't pick nothing if can't create" ); } break; } if ( couldCreate ){ _everCreated++; T t = createNew(); _all.add( t ); return t; } if ( _trackLeaks && _trackPrintCount++ % 200 == 0 ){ _wherePrint(); _trackPrintCount = 1; } } if ( waitTime == 0 ) return null; if ( waitTime > 0 && totalSlept >= waitTime ) return null; long start = System.currentTimeMillis(); try { _waiting.tryAcquire( _sleepTime , TimeUnit.MILLISECONDS ); } catch ( InterruptedException ie ){ } totalSlept += ( System.currentTimeMillis() - start ); } } private void _wherePrint(){ StringBuilder buf = new StringBuilder( toString() ).append( " waiting \n" ); synchronized ( _where ){ for ( Throwable t : _where.values() ){ buf.append( "--\n" ); final StackTraceElement[] st = t.getStackTrace(); for ( int i=0; i<st.length; i++ ) buf.append( " " ).append( st[i] ).append( "\n" ); buf.append( "----\n" ); } } System.out.println( buf ); } /** Clears the pool of all objects. */ protected void clear(){ synchronized( _avail ){ for ( T t : _avail ) cleanup( t ); _avail.clear(); _all.clear(); synchronized ( _where ){ _where.clear(); // is this correct } } } public int total(){ return _all.size(); } public int inUse(){ return _all.size() - _avail.size(); } public Iterator<T> getAll(){ return _all.getAll().iterator(); } public int available(){ if ( _maxTotal <= 0 ) throw new IllegalStateException( "this pool has an infinite number of things available" ); return _maxTotal - inUse(); } public int everCreated(){ return _everCreated; } private void _debug( String msg ){ if( _debug ) System.out.println( "SimplePool [" + _name + "] : " + msg ); } public int maxToKeep(){ return _maxToKeep; } public Object getAttribute(String attribute){ if ( attribute.equals( "name" ) ) return _name; if ( attribute.equals( "size" ) ) return _maxToKeep; if ( attribute.equals( "available" ) ) return available(); if ( attribute.equals( "inUse" ) ) return inUse(); if ( attribute.equals( "everCreated" ) ) return _everCreated; System.err.println( "com.mongo.util.SimplePool unknown attribute: " + attribute ); throw new RuntimeException( "unknown attribute: " + attribute ); } public AttributeList getAttributes(String[] attributes){ AttributeList l = new AttributeList(); for ( int i=0; i<attributes.length; i++ ){ String name = attributes[i]; l.add( new Attribute( name , getAttribute( name ) ) ); } return l; } public MBeanInfo getMBeanInfo(){ return _mbeanInfo; } public Object invoke(String actionName, Object[] params, String[] signature){ throw new RuntimeException( "not allowed to invoke anything" ); } public void setAttribute(Attribute attribute){ throw new RuntimeException( "not allowed to set anything" ); } public AttributeList setAttributes(AttributeList attributes){ throw new RuntimeException( "not allowed to set anything" ); } public String toString(){ StringBuilder buf = new StringBuilder(); buf.append( "pool: " ).append( _name ) .append( " maxToKeep: " ).append( _maxToKeep ) .append( " maxTotal: " ).append( _maxToKeep ) .append( " where " ).append( _where.size() ) .append( " avail " ).append( _avail.size() ) .append( " all " ).append( _all.size() ) ; return buf.toString(); } protected final String _name; protected final int _maxToKeep; protected final int _maxTotal; protected final boolean _trackLeaks; protected final boolean _debug; protected final MBeanInfo _mbeanInfo; private final List<T> _avail = new ArrayList<T>(); protected final List<T> _availSafe = Collections.unmodifiableList( _avail ); private final WeakBag<T> _all = new WeakBag<T>(); private final Map<Integer,Throwable> _where = new HashMap<Integer,Throwable>(); private final Semaphore _waiting = new Semaphore(0); private int _everCreated = 0; private int _trackPrintCount = 0; }