// DBTCPConnector.java
/**
* Copyright (C) 2008 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb;
import com.mongodb.ReadPreference.TaggedReadPreference;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
public class DBTCPConnector implements DBConnector {
static Logger _logger = Logger.getLogger( Bytes.LOGGER.getName() + ".tcp" );
static Logger _createLogger = Logger.getLogger( _logger.getName() + ".connect" );
public DBTCPConnector( Mongo m , ServerAddress addr )
throws MongoException {
_mongo = m;
_portHolder = new DBPortPool.Holder( m._options );
_checkAddress( addr );
_createLogger.info( addr.toString() );
setMasterAddress(addr);
_allHosts = null;
_rsStatus = null;
}
public DBTCPConnector( Mongo m , ServerAddress ... all )
throws MongoException {
this( m , Arrays.asList( all ) );
}
public DBTCPConnector( Mongo m , List<ServerAddress> all )
throws MongoException {
_mongo = m;
_portHolder = new DBPortPool.Holder( m._options );
_checkAddress( all );
_allHosts = new ArrayList<ServerAddress>( all ); // make a copy so it can't be modified
_rsStatus = new ReplicaSetStatus( m, _allHosts );
_createLogger.info( all + " -> " + getAddress() );
}
public void start() {
if (_rsStatus != null)
_rsStatus.start();
}
private static ServerAddress _checkAddress( ServerAddress addr ){
if ( addr == null )
throw new NullPointerException( "address can't be null" );
return addr;
}
private static ServerAddress _checkAddress( List<ServerAddress> addrs ){
if ( addrs == null )
throw new NullPointerException( "addresses can't be null" );
if ( addrs.size() == 0 )
throw new IllegalArgumentException( "need to specify at least 1 address" );
return addrs.get(0);
}
/**
* Start a "request".
*
* A "request" is a group of operations in which order matters. Examples
* include inserting a document and then performing a query which expects
* that document to have been inserted, or performing an operation and
* then using com.mongodb.Mongo.getLastError to perform error-checking
* on that operation. When a thread performs operations in a "request", all
* operations will be performed on the same socket, so they will be
* correctly ordered.
*/
@Override
public void requestStart(){
_myPort.get().requestStart();
}
/**
* End the current "request", if this thread is in one.
*
* By ending a request when it is safe to do so the built-in connection-
* pool is allowed to reassign requests to different sockets in order to
* more effectively balance load. See requestStart for more information.
*/
@Override
public void requestDone(){
_myPort.get().requestDone();
}
@Override
public void requestEnsureConnection(){
_myPort.get().requestEnsureConnection();
}
void _checkClosed(){
if ( _closed.get() )
throw new IllegalStateException( "this Mongo has been closed" );
}
WriteResult _checkWriteError( DB db , MyPort mp , DBPort port , WriteConcern concern )
throws MongoException, IOException {
CommandResult e = null;
e = port.runCommand( db , concern.getCommand() );
if ( ! e.hasErr() )
return new WriteResult( e , concern );
e.throwOnError();
return null;
}
@Override
public WriteResult say( DB db , OutMessage m , WriteConcern concern )
throws MongoException {
return say( db , m , concern , null );
}
@Override
public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded )
throws MongoException {
_checkClosed();
checkMaster( false , true );
MyPort mp = _myPort.get();
DBPort port = mp.get( true , ReadPreference.PRIMARY, hostNeeded );
try {
port.checkAuth( db );
port.say( m );
if ( concern.callGetLastError() ){
return _checkWriteError( db , mp , port , concern );
}
else {
return new WriteResult( db , port , concern );
}
}
catch ( IOException ioe ){
mp.error( port , ioe );
_error( ioe, false );
if ( concern.raiseNetworkErrors() )
throw new MongoException.Network( "can't say something" , ioe );
CommandResult res = new CommandResult(port.serverAddress());
res.put( "ok" , false );
res.put( "$err" , "NETWORK ERROR" );
return new WriteResult( res , concern );
}
catch ( MongoException me ){
throw me;
}
catch ( RuntimeException re ){
mp.error( port , re );
throw re;
}
finally {
mp.done( port );
m.doneWithMessage();
}
}
@Override
public Response call( DB db , DBCollection coll , OutMessage m, ServerAddress hostNeeded, DBDecoder decoder )
throws MongoException {
return call( db , coll , m , hostNeeded , 2, null, decoder );
}
public Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded , int retries ) throws MongoException {
return call( db, coll, m, hostNeeded, retries, null, null);
}
@Override
public Response call( DB db, DBCollection coll, OutMessage m, ServerAddress hostNeeded, int retries, ReadPreference readPref, DBDecoder decoder ) throws MongoException{
if (readPref == null)
readPref = ReadPreference.PRIMARY;
if (readPref == ReadPreference.PRIMARY && m.hasOption( Bytes.QUERYOPTION_SLAVEOK ))
readPref = ReadPreference.SECONDARY;
boolean secondaryOk = !(readPref == ReadPreference.PRIMARY);
_checkClosed();
checkMaster( false, !secondaryOk );
final MyPort mp = _myPort.get();
final DBPort port = mp.get( false , readPref, hostNeeded );
Response res = null;
boolean retry = false;
try {
port.checkAuth( db );
res = port.call( m , coll, readPref, decoder );
if ( res._responseTo != m.getId() )
throw new MongoException( "ids don't match" );
}
catch ( IOException ioe ){
mp.error( port , ioe );
retry = retries > 0 && !coll._name.equals( "$cmd" )
&& !(ioe instanceof SocketTimeoutException) && _error( ioe, secondaryOk );
if ( !retry ){
throw new MongoException.Network( "can't call something : " + port.host() + "/" + db,
ioe );
}
}
catch ( RuntimeException re ){
mp.error( port , re );
throw re;
} finally {
mp.done( port );
}
if (retry)
return call( db , coll , m , hostNeeded , retries - 1 , readPref, decoder );
ServerError err = res.getError();
if ( err != null && err.isNotMasterError() ){
checkMaster( true , true );
if ( retries <= 0 ){
throw new MongoException( "not talking to master and retries used up" );
}
return call( db , coll , m , hostNeeded , retries -1, readPref, decoder );
}
m.doneWithMessage();
return res;
}
public ServerAddress getAddress(){
DBPortPool pool = _masterPortPool;
return pool != null ? pool.getServerAddress() : null;
}
/**
* Gets the list of seed server addresses
* @return
*/
public List<ServerAddress> getAllAddress() {
return _allHosts;
}
/**
* Gets the list of server addresses currently seen by the connector.
* This includes addresses auto-discovered from a replica set.
* @return
*/
public List<ServerAddress> getServerAddressList() {
if (_rsStatus != null) {
return _rsStatus.getServerAddressList();
}
ServerAddress master = getAddress();
if (master != null) {
// single server
List<ServerAddress> list = new ArrayList<ServerAddress>();
list.add(master);
return list;
}
return null;
}
public ReplicaSetStatus getReplicaSetStatus() {
return _rsStatus;
}
public String getConnectPoint(){
ServerAddress master = getAddress();
return master != null ? master.toString() : null;
}
/**
* This method is called in case of an IOException.
* It will potentially trigger a checkMaster() to check the status of all servers.
* @param t the exception thrown
* @param secondaryOk secondaryOk flag
* @return true if the request should be retried, false otherwise
* @throws MongoException
*/
boolean _error( Throwable t, boolean secondaryOk )
throws MongoException {
if (_rsStatus == null) {
// single server, no need to retry
return false;
}
// the replset has at least 1 server up, try to see if should switch master
// if no server is up, we wont retry until the updater thread finds one
// this is to cut down the volume of requests/errors when all servers are down
if ( _rsStatus.hasServerUp() ){
checkMaster( true , !secondaryOk );
}
return _rsStatus.hasServerUp();
}
class MyPort {
DBPort get( boolean keep , ReadPreference readPref, ServerAddress hostNeeded ){
if ( hostNeeded != null ){
if (_requestPort != null && _requestPort.serverAddress().equals(hostNeeded)) {
return _requestPort;
}
// asked for a specific host
return _portHolder.get( hostNeeded ).get();
}
if ( _requestPort != null ){
// we are within a request, and have a port, should stick to it
if ( _requestPort.getPool() == _masterPortPool || !keep ) {
// if keep is false, it's a read, so we use port even if master changed
return _requestPort;
}
// it's write and master has changed
// we fall back on new master and try to go on with request
// this may not be best behavior if spec of request is to stick with same server
_requestPort.getPool().done(_requestPort);
_requestPort = null;
}
if ( !(readPref == ReadPreference.PRIMARY) && _rsStatus != null ){
// if not a primary read set, try to use a secondary
// Do they want a Secondary, or a specific tag set?
if (readPref == ReadPreference.SECONDARY) {
ServerAddress slave = _rsStatus.getASecondary();
if ( slave != null ){
return _portHolder.get( slave ).get();
}
} else if (readPref instanceof ReadPreference.TaggedReadPreference) {
// Tag based read
ServerAddress secondary = _rsStatus.getASecondary( ( (TaggedReadPreference) readPref ).getTags() );
if (secondary != null)
return _portHolder.get( secondary ).get();
else
throw new MongoException( "Could not find any valid secondaries with the supplied tags ('" +
( (TaggedReadPreference) readPref ).getTags() + "'");
}
}
if (_masterPortPool == null) {
// this should only happen in rare case that no master was ever found
// may get here at startup if it's a read, slaveOk=true, and ALL servers are down
throw new MongoException("Rare case where master=null, probably all servers are down");
}
// use master
DBPort p = _masterPortPool.get();
if ( _inRequest ) {
// if within request, remember port to stick to same server
_requestPort = p;
}
return p;
}
void done( DBPort p ){
// keep request port
if ( p != _requestPort ){
p.getPool().done(p);
}
}
/**
* call this method when there is an IOException or other low level error on port.
* @param p
* @param e
*/
void error( DBPort p , Exception e ){
p.close();
_requestPort = null;
// _logger.log( Level.SEVERE , "MyPort.error called" , e );
// depending on type of error, may need to close other connections in pool
p.getPool().gotError(e);
}
void requestEnsureConnection(){
if ( ! _inRequest )
return;
if ( _requestPort != null )
return;
_requestPort = _masterPortPool.get();
}
void requestStart(){
_inRequest = true;
}
void requestDone(){
if ( _requestPort != null )
_requestPort.getPool().done( _requestPort );
_requestPort = null;
_inRequest = false;
}
DBPort _requestPort;
// DBPortPool _requestPool;
boolean _inRequest;
}
void checkMaster( boolean force , boolean failIfNoMaster )
throws MongoException {
if ( _rsStatus != null ){
if ( _masterPortPool == null || force ){
ReplicaSetStatus.Node master = _rsStatus.ensureMaster();
if ( master == null ){
if ( failIfNoMaster )
throw new MongoException( "can't find a master" );
}
else {
setMaster(master);
}
}
} else {
// single server, may have to obtain max bson size
if (_maxBsonObjectSize.get() == 0)
fetchMaxBsonObjectSize();
}
}
synchronized void setMaster(ReplicaSetStatus.Node master) {
if (_closed.get()) {
return;
}
setMasterAddress(master.getServerAddress());
_maxBsonObjectSize.set(master.getMaxBsonObjectSize());
}
/**
* Fetches the maximum size for a BSON object from the current master server
* @return the size, or 0 if it could not be obtained
*/
int fetchMaxBsonObjectSize() {
if (_masterPortPool == null)
return 0;
DBPort port = _masterPortPool.get();
try {
CommandResult res = port.runCommand(_mongo.getDB("admin"), new BasicDBObject("isMaster", 1));
// max size was added in 1.8
if (res.containsField("maxBsonObjectSize")) {
_maxBsonObjectSize.set(((Integer) res.get("maxBsonObjectSize")).intValue());
} else {
_maxBsonObjectSize.set(Bytes.MAX_OBJECT_SIZE);
}
} catch (Exception e) {
_logger.log(Level.WARNING, "Exception determining maxBSONObjectSize ", e);
} finally {
port.getPool().done(port);
}
return _maxBsonObjectSize.get();
}
private synchronized boolean setMasterAddress(ServerAddress addr) {
DBPortPool newPool = _portHolder.get( addr );
if (newPool == _masterPortPool)
return false;
if ( _masterPortPool != null )
_logger.log(Level.WARNING, "Master switching from " + _masterPortPool.getServerAddress() + " to " + addr);
_masterPortPool = newPool;
return true;
}
public String debugString(){
StringBuilder buf = new StringBuilder( "DBTCPConnector: " );
if ( _rsStatus != null ) {
buf.append( "replica set : " ).append( _allHosts );
} else {
ServerAddress master = getAddress();
buf.append( master ).append( " " ).append( master != null ? master._addr : null );
}
return buf.toString();
}
public void close(){
_closed.set( true );
if ( _portHolder != null ) {
try {
_portHolder.close();
_portHolder = null;
} catch (final Throwable t) { /* nada */ }
}
if ( _rsStatus != null ) {
try {
_rsStatus.close();
_rsStatus = null;
} catch (final Throwable t) { /* nada */ }
}
// below this will remove the myport for this thread only
// client using thread pool in web framework may need to call close() from all threads
_myPort.remove();
}
/**
* Assigns a new DBPortPool for a given ServerAddress.
* This is used to obtain a new pool when the resolved IP of a host changes, for example.
* User application should not have to call this method directly.
* @param addr
*/
public void updatePortPool(ServerAddress addr) {
// just remove from map, a new pool will be created lazily
_portHolder._pools.remove(addr);
}
/**
* Gets the DBPortPool associated with a ServerAddress.
* @param addr
* @return
*/
public DBPortPool getDBPortPool(ServerAddress addr) {
return _portHolder.get(addr);
}
public boolean isOpen(){
return ! _closed.get();
}
/**
* Gets the maximum size for a BSON object supported by the current master server.
* Note that this value may change over time depending on which server is master.
* @return the maximum size, or 0 if not obtained from servers yet.
*/
public int getMaxBsonObjectSize() {
return _maxBsonObjectSize.get();
}
// expose for unit testing
MyPort getMyPort() {
return _myPort.get();
}
private volatile DBPortPool _masterPortPool;
private final Mongo _mongo;
private DBPortPool.Holder _portHolder;
private final List<ServerAddress> _allHosts;
private ReplicaSetStatus _rsStatus;
private final AtomicBoolean _closed = new AtomicBoolean(false);
private final AtomicInteger _maxBsonObjectSize = new AtomicInteger(0);
private ThreadLocal<MyPort> _myPort = new ThreadLocal<MyPort>(){
protected MyPort initialValue(){
return new MyPort();
}
};
}