// DBTCPConnector.java /** * Copyright (C) 2008 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.mongodb; import com.mongodb.ReadPreference.TaggedReadPreference; import java.io.IOException; import java.net.SocketTimeoutException; import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Level; import java.util.logging.Logger; public class DBTCPConnector implements DBConnector { static Logger _logger = Logger.getLogger( Bytes.LOGGER.getName() + ".tcp" ); static Logger _createLogger = Logger.getLogger( _logger.getName() + ".connect" ); public DBTCPConnector( Mongo m , ServerAddress addr ) throws MongoException { _mongo = m; _portHolder = new DBPortPool.Holder( m._options ); _checkAddress( addr ); _createLogger.info( addr.toString() ); setMasterAddress(addr); _allHosts = null; _rsStatus = null; } public DBTCPConnector( Mongo m , ServerAddress ... all ) throws MongoException { this( m , Arrays.asList( all ) ); } public DBTCPConnector( Mongo m , List<ServerAddress> all ) throws MongoException { _mongo = m; _portHolder = new DBPortPool.Holder( m._options ); _checkAddress( all ); _allHosts = new ArrayList<ServerAddress>( all ); // make a copy so it can't be modified _rsStatus = new ReplicaSetStatus( m, _allHosts ); _createLogger.info( all + " -> " + getAddress() ); } public void start() { if (_rsStatus != null) _rsStatus.start(); } private static ServerAddress _checkAddress( ServerAddress addr ){ if ( addr == null ) throw new NullPointerException( "address can't be null" ); return addr; } private static ServerAddress _checkAddress( List<ServerAddress> addrs ){ if ( addrs == null ) throw new NullPointerException( "addresses can't be null" ); if ( addrs.size() == 0 ) throw new IllegalArgumentException( "need to specify at least 1 address" ); return addrs.get(0); } /** * Start a "request". * * A "request" is a group of operations in which order matters. Examples * include inserting a document and then performing a query which expects * that document to have been inserted, or performing an operation and * then using com.mongodb.Mongo.getLastError to perform error-checking * on that operation. When a thread performs operations in a "request", all * operations will be performed on the same socket, so they will be * correctly ordered. */ @Override public void requestStart(){ _myPort.get().requestStart(); } /** * End the current "request", if this thread is in one. * * By ending a request when it is safe to do so the built-in connection- * pool is allowed to reassign requests to different sockets in order to * more effectively balance load. See requestStart for more information. */ @Override public void requestDone(){ _myPort.get().requestDone(); } @Override public void requestEnsureConnection(){ _myPort.get().requestEnsureConnection(); } void _checkClosed(){ if ( _closed.get() ) throw new IllegalStateException( "this Mongo has been closed" ); } WriteResult _checkWriteError( DB db , MyPort mp , DBPort port , WriteConcern concern ) throws MongoException, IOException { CommandResult e = null; e = port.runCommand( db , concern.getCommand() ); if ( ! e.hasErr() ) return new WriteResult( e , concern ); e.throwOnError(); return null; } @Override public WriteResult say( DB db , OutMessage m , WriteConcern concern ) throws MongoException { return say( db , m , concern , null ); } @Override public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded ) throws MongoException { _checkClosed(); checkMaster( false , true ); MyPort mp = _myPort.get(); DBPort port = mp.get( true , ReadPreference.PRIMARY, hostNeeded ); try { port.checkAuth( db ); port.say( m ); if ( concern.callGetLastError() ){ return _checkWriteError( db , mp , port , concern ); } else { return new WriteResult( db , port , concern ); } } catch ( IOException ioe ){ mp.error( port , ioe ); _error( ioe, false ); if ( concern.raiseNetworkErrors() ) throw new MongoException.Network( "can't say something" , ioe ); CommandResult res = new CommandResult(port.serverAddress()); res.put( "ok" , false ); res.put( "$err" , "NETWORK ERROR" ); return new WriteResult( res , concern ); } catch ( MongoException me ){ throw me; } catch ( RuntimeException re ){ mp.error( port , re ); throw re; } finally { mp.done( port ); m.doneWithMessage(); } } @Override public Response call( DB db , DBCollection coll , OutMessage m, ServerAddress hostNeeded, DBDecoder decoder ) throws MongoException { return call( db , coll , m , hostNeeded , 2, null, decoder ); } public Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded , int retries ) throws MongoException { return call( db, coll, m, hostNeeded, retries, null, null); } @Override public Response call( DB db, DBCollection coll, OutMessage m, ServerAddress hostNeeded, int retries, ReadPreference readPref, DBDecoder decoder ) throws MongoException{ if (readPref == null) readPref = ReadPreference.PRIMARY; if (readPref == ReadPreference.PRIMARY && m.hasOption( Bytes.QUERYOPTION_SLAVEOK )) readPref = ReadPreference.SECONDARY; boolean secondaryOk = !(readPref == ReadPreference.PRIMARY); _checkClosed(); checkMaster( false, !secondaryOk ); final MyPort mp = _myPort.get(); final DBPort port = mp.get( false , readPref, hostNeeded ); Response res = null; boolean retry = false; try { port.checkAuth( db ); res = port.call( m , coll, readPref, decoder ); if ( res._responseTo != m.getId() ) throw new MongoException( "ids don't match" ); } catch ( IOException ioe ){ mp.error( port , ioe ); retry = retries > 0 && !coll._name.equals( "$cmd" ) && !(ioe instanceof SocketTimeoutException) && _error( ioe, secondaryOk ); if ( !retry ){ throw new MongoException.Network( "can't call something : " + port.host() + "/" + db, ioe ); } } catch ( RuntimeException re ){ mp.error( port , re ); throw re; } finally { mp.done( port ); } if (retry) return call( db , coll , m , hostNeeded , retries - 1 , readPref, decoder ); ServerError err = res.getError(); if ( err != null && err.isNotMasterError() ){ checkMaster( true , true ); if ( retries <= 0 ){ throw new MongoException( "not talking to master and retries used up" ); } return call( db , coll , m , hostNeeded , retries -1, readPref, decoder ); } m.doneWithMessage(); return res; } public ServerAddress getAddress(){ DBPortPool pool = _masterPortPool; return pool != null ? pool.getServerAddress() : null; } /** * Gets the list of seed server addresses * @return */ public List<ServerAddress> getAllAddress() { return _allHosts; } /** * Gets the list of server addresses currently seen by the connector. * This includes addresses auto-discovered from a replica set. * @return */ public List<ServerAddress> getServerAddressList() { if (_rsStatus != null) { return _rsStatus.getServerAddressList(); } ServerAddress master = getAddress(); if (master != null) { // single server List<ServerAddress> list = new ArrayList<ServerAddress>(); list.add(master); return list; } return null; } public ReplicaSetStatus getReplicaSetStatus() { return _rsStatus; } public String getConnectPoint(){ ServerAddress master = getAddress(); return master != null ? master.toString() : null; } /** * This method is called in case of an IOException. * It will potentially trigger a checkMaster() to check the status of all servers. * @param t the exception thrown * @param secondaryOk secondaryOk flag * @return true if the request should be retried, false otherwise * @throws MongoException */ boolean _error( Throwable t, boolean secondaryOk ) throws MongoException { if (_rsStatus == null) { // single server, no need to retry return false; } // the replset has at least 1 server up, try to see if should switch master // if no server is up, we wont retry until the updater thread finds one // this is to cut down the volume of requests/errors when all servers are down if ( _rsStatus.hasServerUp() ){ checkMaster( true , !secondaryOk ); } return _rsStatus.hasServerUp(); } class MyPort { DBPort get( boolean keep , ReadPreference readPref, ServerAddress hostNeeded ){ if ( hostNeeded != null ){ if (_requestPort != null && _requestPort.serverAddress().equals(hostNeeded)) { return _requestPort; } // asked for a specific host return _portHolder.get( hostNeeded ).get(); } if ( _requestPort != null ){ // we are within a request, and have a port, should stick to it if ( _requestPort.getPool() == _masterPortPool || !keep ) { // if keep is false, it's a read, so we use port even if master changed return _requestPort; } // it's write and master has changed // we fall back on new master and try to go on with request // this may not be best behavior if spec of request is to stick with same server _requestPort.getPool().done(_requestPort); _requestPort = null; } if ( !(readPref == ReadPreference.PRIMARY) && _rsStatus != null ){ // if not a primary read set, try to use a secondary // Do they want a Secondary, or a specific tag set? if (readPref == ReadPreference.SECONDARY) { ServerAddress slave = _rsStatus.getASecondary(); if ( slave != null ){ return _portHolder.get( slave ).get(); } } else if (readPref instanceof ReadPreference.TaggedReadPreference) { // Tag based read ServerAddress secondary = _rsStatus.getASecondary( ( (TaggedReadPreference) readPref ).getTags() ); if (secondary != null) return _portHolder.get( secondary ).get(); else throw new MongoException( "Could not find any valid secondaries with the supplied tags ('" + ( (TaggedReadPreference) readPref ).getTags() + "'"); } } if (_masterPortPool == null) { // this should only happen in rare case that no master was ever found // may get here at startup if it's a read, slaveOk=true, and ALL servers are down throw new MongoException("Rare case where master=null, probably all servers are down"); } // use master DBPort p = _masterPortPool.get(); if ( _inRequest ) { // if within request, remember port to stick to same server _requestPort = p; } return p; } void done( DBPort p ){ // keep request port if ( p != _requestPort ){ p.getPool().done(p); } } /** * call this method when there is an IOException or other low level error on port. * @param p * @param e */ void error( DBPort p , Exception e ){ p.close(); _requestPort = null; // _logger.log( Level.SEVERE , "MyPort.error called" , e ); // depending on type of error, may need to close other connections in pool p.getPool().gotError(e); } void requestEnsureConnection(){ if ( ! _inRequest ) return; if ( _requestPort != null ) return; _requestPort = _masterPortPool.get(); } void requestStart(){ _inRequest = true; } void requestDone(){ if ( _requestPort != null ) _requestPort.getPool().done( _requestPort ); _requestPort = null; _inRequest = false; } DBPort _requestPort; // DBPortPool _requestPool; boolean _inRequest; } void checkMaster( boolean force , boolean failIfNoMaster ) throws MongoException { if ( _rsStatus != null ){ if ( _masterPortPool == null || force ){ ReplicaSetStatus.Node master = _rsStatus.ensureMaster(); if ( master == null ){ if ( failIfNoMaster ) throw new MongoException( "can't find a master" ); } else { setMaster(master); } } } else { // single server, may have to obtain max bson size if (_maxBsonObjectSize.get() == 0) fetchMaxBsonObjectSize(); } } synchronized void setMaster(ReplicaSetStatus.Node master) { if (_closed.get()) { return; } setMasterAddress(master.getServerAddress()); _maxBsonObjectSize.set(master.getMaxBsonObjectSize()); } /** * Fetches the maximum size for a BSON object from the current master server * @return the size, or 0 if it could not be obtained */ int fetchMaxBsonObjectSize() { if (_masterPortPool == null) return 0; DBPort port = _masterPortPool.get(); try { CommandResult res = port.runCommand(_mongo.getDB("admin"), new BasicDBObject("isMaster", 1)); // max size was added in 1.8 if (res.containsField("maxBsonObjectSize")) { _maxBsonObjectSize.set(((Integer) res.get("maxBsonObjectSize")).intValue()); } else { _maxBsonObjectSize.set(Bytes.MAX_OBJECT_SIZE); } } catch (Exception e) { _logger.log(Level.WARNING, "Exception determining maxBSONObjectSize ", e); } finally { port.getPool().done(port); } return _maxBsonObjectSize.get(); } private synchronized boolean setMasterAddress(ServerAddress addr) { DBPortPool newPool = _portHolder.get( addr ); if (newPool == _masterPortPool) return false; if ( _masterPortPool != null ) _logger.log(Level.WARNING, "Master switching from " + _masterPortPool.getServerAddress() + " to " + addr); _masterPortPool = newPool; return true; } public String debugString(){ StringBuilder buf = new StringBuilder( "DBTCPConnector: " ); if ( _rsStatus != null ) { buf.append( "replica set : " ).append( _allHosts ); } else { ServerAddress master = getAddress(); buf.append( master ).append( " " ).append( master != null ? master._addr : null ); } return buf.toString(); } public void close(){ _closed.set( true ); if ( _portHolder != null ) { try { _portHolder.close(); _portHolder = null; } catch (final Throwable t) { /* nada */ } } if ( _rsStatus != null ) { try { _rsStatus.close(); _rsStatus = null; } catch (final Throwable t) { /* nada */ } } // below this will remove the myport for this thread only // client using thread pool in web framework may need to call close() from all threads _myPort.remove(); } /** * Assigns a new DBPortPool for a given ServerAddress. * This is used to obtain a new pool when the resolved IP of a host changes, for example. * User application should not have to call this method directly. * @param addr */ public void updatePortPool(ServerAddress addr) { // just remove from map, a new pool will be created lazily _portHolder._pools.remove(addr); } /** * Gets the DBPortPool associated with a ServerAddress. * @param addr * @return */ public DBPortPool getDBPortPool(ServerAddress addr) { return _portHolder.get(addr); } public boolean isOpen(){ return ! _closed.get(); } /** * Gets the maximum size for a BSON object supported by the current master server. * Note that this value may change over time depending on which server is master. * @return the maximum size, or 0 if not obtained from servers yet. */ public int getMaxBsonObjectSize() { return _maxBsonObjectSize.get(); } // expose for unit testing MyPort getMyPort() { return _myPort.get(); } private volatile DBPortPool _masterPortPool; private final Mongo _mongo; private DBPortPool.Holder _portHolder; private final List<ServerAddress> _allHosts; private ReplicaSetStatus _rsStatus; private final AtomicBoolean _closed = new AtomicBoolean(false); private final AtomicInteger _maxBsonObjectSize = new AtomicInteger(0); private ThreadLocal<MyPort> _myPort = new ThreadLocal<MyPort>(){ protected MyPort initialValue(){ return new MyPort(); } }; }