// DBPortPool.java /** * Copyright (C) 2008 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.mongodb; import java.lang.management.ManagementFactory; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import javax.management.JMException; import javax.management.MBeanServer; import javax.management.MalformedObjectNameException; import javax.management.ObjectName; import com.mongodb.util.SimplePool; public class DBPortPool extends SimplePool<DBPort> { static class Holder { Holder( MongoOptions options ){ _options = options; { MBeanServer temp = null; try { temp = ManagementFactory.getPlatformMBeanServer(); } catch ( Throwable t ){ } _server = temp; } } DBPortPool get( ServerAddress addr ){ DBPortPool p = _pools.get( addr ); if (p != null) return p; synchronized (_pools) { p = _pools.get( addr ); if (p != null) { return p; } p = new DBPortPool( addr , _options ); _pools.put( addr , p); if ( _server != null ){ try { ObjectName on = createObjectName( addr ); if ( _server.isRegistered( on ) ){ _server.unregisterMBean( on ); Bytes.LOGGER.log( Level.INFO , "multiple Mongo instances for same host, jmx numbers might be off" ); } _server.registerMBean( p , on ); } catch ( JMException e ){ Bytes.LOGGER.log( Level.WARNING , "jmx registration error: " + e + " continuing..." ); } catch ( java.security.AccessControlException e ){ Bytes.LOGGER.log( Level.WARNING , "jmx registration error: " + e + " continuing..." ); } } } return p; } void close(){ synchronized ( _pools ){ for ( DBPortPool p : _pools.values() ){ p.close(); try { ObjectName on = createObjectName( p._addr ); if ( _server.isRegistered( on ) ){ _server.unregisterMBean( on ); } } catch ( JMException e ){ Bytes.LOGGER.log( Level.WARNING , "jmx de-registration error, continuing" , e ); } } } } private ObjectName createObjectName( ServerAddress addr ) throws MalformedObjectNameException { String name = "com.mongodb:type=ConnectionPool,host=" + addr.toString().replace( ":" , ",port=" ) + ",instance=" + _serial; if ( _options.description != null ) name += ",description=" + _options.description; return new ObjectName( name ); } final MongoOptions _options; final Map<ServerAddress,DBPortPool> _pools = Collections.synchronizedMap( new HashMap<ServerAddress,DBPortPool>() ); final MBeanServer _server; final int _serial = nextSerial.incrementAndGet(); // we use this to give each Holder a different mbean name static AtomicInteger nextSerial = new AtomicInteger(0); } // ---- public static class NoMoreConnection extends MongoInternalException { private static final long serialVersionUID = -4415279469780082174L; NoMoreConnection( String msg ){ super( msg ); } } public static class SemaphoresOut extends NoMoreConnection { private static final long serialVersionUID = -4415279469780082174L; SemaphoresOut(){ super( "Out of semaphores to get db connection" ); } } public static class ConnectionWaitTimeOut extends NoMoreConnection { private static final long serialVersionUID = -4415279469780082174L; ConnectionWaitTimeOut(int timeout) { super("Connection wait timeout after " + timeout + " ms"); } } // ---- DBPortPool( ServerAddress addr , MongoOptions options ){ super( "DBPortPool-" + addr.toString() + ", options = " + options.toString() , options.connectionsPerHost , options.connectionsPerHost ); _options = options; _addr = addr; _waitingSem = new Semaphore( _options.connectionsPerHost * _options.threadsAllowedToBlockForConnectionMultiplier ); } protected long memSize( DBPort p ){ return 0; } protected int pick( int iThink , boolean couldCreate ){ final int id = System.identityHashCode(Thread.currentThread()); final int s = _availSafe.size(); for ( int i=0; i<s; i++ ){ DBPort p = _availSafe.get(i); if ( p._lastThread == id ) return i; } if ( couldCreate ) return -1; return iThink; } public DBPort get(){ DBPort port = null; if ( ! _waitingSem.tryAcquire() ) throw new SemaphoresOut(); try { port = get( _options.maxWaitTime ); } finally { _waitingSem.release(); } if ( port == null ) throw new ConnectionWaitTimeOut( _options.maxWaitTime ); port._lastThread = System.identityHashCode(Thread.currentThread()); return port; } void gotError( Exception e ){ if ( e instanceof java.nio.channels.ClosedByInterruptException || e instanceof InterruptedException ){ // this is probably a request that is taking too long // so usually doesn't mean there is a real db problem return; } if ( e instanceof java.net.SocketTimeoutException ){ // we don't want to clear the port pool for a connection timing out return; } Bytes.LOGGER.log( Level.WARNING , "emptying DBPortPool to " + getServerAddress() + " b/c of error" , e ); // force close all sockets List<DBPort> all = new ArrayList<DBPort>(); while ( true ){ DBPort temp = get(0); if ( temp == null ) break; all.add( temp ); } for ( DBPort p : all ){ p.close(); done(p); } } void close(){ clear(); } public void cleanup( DBPort p ){ p.close(); } public boolean ok( DBPort t ){ return _addr.getSocketAddress().equals( t._addr ); } protected DBPort createNew(){ return new DBPort( _addr , this , _options ); } public ServerAddress getServerAddress() { return _addr; } final MongoOptions _options; final private Semaphore _waitingSem; final ServerAddress _addr; boolean _everWorked = false; }