// DBPort.java /** * Copyright (C) 2008 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.mongodb; import com.mongodb.util.ThreadUtil; import java.io.BufferedInputStream; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Level; import java.util.logging.Logger; /** * represents a Port to the database, which is effectively a single connection to a server * Methods implemented at the port level should throw the raw exceptions like IOException, * so that the connector above can make appropriate decisions on how to handle. */ public class DBPort { /** * the default port */ public static final int PORT = 27017; static final boolean USE_NAGLE = false; static final long CONN_RETRY_TIME_MS = 15000; /** * creates a new DBPort * @param addr the server address */ public DBPort( ServerAddress addr ){ this( addr , null , new MongoOptions() ); } DBPort( ServerAddress addr, DBPortPool pool, MongoOptions options ){ _options = options; _sa = addr; _addr = addr.getSocketAddress(); _pool = pool; _hashCode = _addr.hashCode(); _logger = Logger.getLogger( _rootLogger.getName() + "." + addr.toString() ); _decoder = _options.dbDecoderFactory.create(); } Response call( OutMessage msg , DBCollection coll ) throws IOException{ return go( msg, coll ); } Response call( OutMessage msg , DBCollection coll , DBDecoder decoder) throws IOException{ return go( msg, coll, false, null, decoder); } Response call( OutMessage msg , DBCollection coll , ReadPreference readPref , DBDecoder decoder) throws IOException{ return go( msg, coll, false, readPref, decoder); } void say( OutMessage msg ) throws IOException { go( msg , null ); } private synchronized Response go( OutMessage msg , DBCollection coll ) throws IOException { return go( msg , coll , false, null, null ); } private synchronized Response go( OutMessage msg , DBCollection coll , DBDecoder decoder ) throws IOException{ return go( msg, coll, false, null, decoder ); } private synchronized Response go( OutMessage msg , DBCollection coll , boolean forceReponse , ReadPreference readPref, DBDecoder decoder) throws IOException { if ( _processingResponse ){ if ( coll == null ){ // this could be a pipeline and should be safe } else { // this could cause issues since we're reading data off the wire throw new IllegalStateException( "DBPort.go called and expecting a response while processing another response" ); } } _calls++; if ( _socket == null ) _open(); if ( _out == null ) throw new IllegalStateException( "_out shouldn't be null" ); try { msg.prepare(); msg.pipe( _out ); if ( _pool != null ) _pool._everWorked = true; if ( coll == null && ! forceReponse ) return null; _processingResponse = true; return new Response( _sa , coll , _in , (decoder == null ? _decoder : decoder) ); } catch ( IOException ioe ){ close(); throw ioe; } finally { _processingResponse = false; } } synchronized CommandResult getLastError( DB db , WriteConcern concern ) throws IOException{ DBApiLayer dbAL = (DBApiLayer) db; return runCommand( dbAL, concern.getCommand() ); } synchronized private Response findOne( DB db , String coll , DBObject q ) throws IOException { OutMessage msg = OutMessage.query( db._mongo , 0 , db.getName() + "." + coll , 0 , -1 , q , null ); Response res = go( msg , db.getCollection( coll ) , null ); return res; } synchronized private Response findOne( String ns , DBObject q ) throws IOException{ OutMessage msg = OutMessage.query( null , 0 , ns , 0 , -1 , q , null ); Response res = go( msg , null , true, null, null ); return res; } synchronized CommandResult runCommand( DB db , DBObject cmd ) throws IOException { Response res = findOne( db , "$cmd" , cmd ); return convertToCommandResult(cmd, res); } private CommandResult convertToCommandResult(DBObject cmd, Response res) { if ( res.size() == 0 ) return null; if ( res.size() > 1 ) throw new MongoInternalException( "something is wrong. size:" + res.size() ); DBObject data = res.get(0); if ( data == null ) throw new MongoInternalException( "something is wrong, no command result" ); CommandResult cr = new CommandResult(cmd, res.serverUsed()); cr.putAll( data ); return cr; } synchronized CommandResult tryGetLastError( DB db , long last, WriteConcern concern) throws IOException { if ( last != _calls ) return null; return getLastError( db , concern ); } /** * makes sure that a connection to the server has been opened * @throws IOException */ public synchronized void ensureOpen() throws IOException { if ( _socket != null ) return; _open(); } boolean _open() throws IOException { long sleepTime = 100; long maxAutoConnectRetryTime = CONN_RETRY_TIME_MS; if (_options.maxAutoConnectRetryTime > 0) { maxAutoConnectRetryTime = _options.maxAutoConnectRetryTime; } final long start = System.currentTimeMillis(); while ( true ){ IOException lastError = null; try { _socket = _options.socketFactory.createSocket(); _socket.connect( _addr , _options.connectTimeout ); _socket.setTcpNoDelay( ! USE_NAGLE ); _socket.setKeepAlive( _options.socketKeepAlive ); _socket.setSoTimeout( _options.socketTimeout ); _in = new BufferedInputStream( _socket.getInputStream() ); _out = _socket.getOutputStream(); return true; } catch ( IOException ioe ){ lastError = new IOException( "couldn't connect to [" + _addr + "] bc:" + ioe ); _logger.log( Level.INFO , "connect fail to : " + _addr , ioe ); close(); } if ( ! _options.autoConnectRetry || ( _pool != null && ! _pool._everWorked ) ) throw lastError; long sleptSoFar = System.currentTimeMillis() - start; if ( sleptSoFar >= maxAutoConnectRetryTime ) throw lastError; if ( sleepTime + sleptSoFar > maxAutoConnectRetryTime ) sleepTime = maxAutoConnectRetryTime - sleptSoFar; _logger.severe( "going to sleep and retry. total sleep time after = " + ( sleptSoFar + sleptSoFar ) + "ms this time:" + sleepTime + "ms" ); ThreadUtil.sleep( sleepTime ); sleepTime *= 2; } } @Override public int hashCode(){ return _hashCode; } /** * returns a String representation of the target host * @return */ public String host(){ return _addr.toString(); } /** * @return the server address for this port */ public ServerAddress serverAddress() { return _sa; } @Override public String toString(){ return "{DBPort " + host() + "}"; } @Override protected void finalize() throws Throwable{ super.finalize(); close(); } /** * closes the underlying connection and streams */ protected void close(){ _authed.clear(); if ( _socket != null ){ try { _socket.close(); } catch ( Exception e ){ // don't care } } _in = null; _out = null; _socket = null; } void checkAuth( DB db ) throws IOException { if ( db._username == null ){ if ( db._name.equals( "admin" ) ) return; checkAuth( db._mongo.getDB( "admin" ) ); return; } if ( _authed.containsKey( db ) ) return; CommandResult res = runCommand( db , new BasicDBObject( "getnonce" , 1 ) ); res.throwOnError(); DBObject temp = db._authCommand( res.getString( "nonce" ) ); res = runCommand( db , temp ); res.throwOnError(); _authed.put( db , true ); } /** * Gets the pool that this port belongs to * @return */ public DBPortPool getPool() { return _pool; } final int _hashCode; final ServerAddress _sa; final InetSocketAddress _addr; final DBPortPool _pool; final MongoOptions _options; final Logger _logger; final DBDecoder _decoder; private Socket _socket; private InputStream _in; private OutputStream _out; private boolean _processingResponse; private Map<DB,Boolean> _authed = new ConcurrentHashMap<DB, Boolean>( ); int _lastThread; long _calls = 0; private static Logger _rootLogger = Logger.getLogger( "com.mongodb.port" ); }