// DBApiLayer.java /** * Copyright (C) 2008 10gen Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.mongodb; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.logging.Level; import java.util.logging.Logger; import org.bson.BSONObject; import org.bson.types.ObjectId; import com.mongodb.util.JSON; /** Database API * This cannot be directly instantiated, but the functions are available * through instances of Mongo. */ public class DBApiLayer extends DB { static final boolean D = Boolean.getBoolean( "DEBUG.DB" ); /** The maximum number of cursors allowed */ static final int NUM_CURSORS_BEFORE_KILL = 100; static final int NUM_CURSORS_PER_BATCH = 20000; // --- show static final Logger TRACE_LOGGER = Logger.getLogger( "com.mongodb.TRACE" ); static final Level TRACE_LEVEL = Boolean.getBoolean( "DB.TRACE" ) ? Level.INFO : Level.FINEST; static final boolean willTrace(){ return TRACE_LOGGER.isLoggable( TRACE_LEVEL ); } static final void trace( String s ){ TRACE_LOGGER.log( TRACE_LEVEL , s ); } static int chooseBatchSize(int batchSize, int limit, int fetched) { int bs = Math.abs(batchSize); int remaining = limit > 0 ? limit - fetched : 0; int res = 0; if (bs == 0 && remaining > 0) res = remaining; else if (bs > 0 && remaining == 0) res = bs; else res = Math.min(bs, remaining); if (batchSize < 0) { // force close res = -res; } if (res == 1) { // optimization: use negative batchsize to close cursor res = -1; } return res; } /** * @param mongo the Mongo instance * @param name the database name * @param connector the connector */ protected DBApiLayer( Mongo mongo, String name , DBConnector connector ){ super( mongo, name ); if ( connector == null ) throw new IllegalArgumentException( "need a connector: " + name ); _root = name; _rootPlusDot = _root + "."; _connector = connector; } public void requestStart(){ _connector.requestStart(); } public void requestDone(){ _connector.requestDone(); } public void requestEnsureConnection(){ _connector.requestEnsureConnection(); } protected MyCollection doGetCollection( String name ){ MyCollection c = _collections.get( name ); if ( c != null ) return c; c = new MyCollection( name ); MyCollection old = _collections.putIfAbsent(name, c); return old != null ? old : c; } String _removeRoot( String ns ){ if ( ! ns.startsWith( _rootPlusDot ) ) return ns; return ns.substring( _root.length() + 1 ); } public void cleanCursors( boolean force ) throws MongoException { int sz = _deadCursorIds.size(); if ( sz == 0 || ( ! force && sz < NUM_CURSORS_BEFORE_KILL)) return; Bytes.LOGGER.info( "going to kill cursors : " + sz ); Map<ServerAddress,List<Long>> m = new HashMap<ServerAddress,List<Long>>(); DeadCursor c; while (( c = _deadCursorIds.poll()) != null ){ List<Long> x = m.get( c.host ); if ( x == null ){ x = new LinkedList<Long>(); m.put( c.host , x ); } x.add( c.id ); } for ( Map.Entry<ServerAddress,List<Long>> e : m.entrySet() ){ try { killCursors( e.getKey() , e.getValue() ); } catch ( Throwable t ){ Bytes.LOGGER.log( Level.WARNING , "can't clean cursors" , t ); for ( Long x : e.getValue() ) _deadCursorIds.add( new DeadCursor( x , e.getKey() ) ); } } } void killCursors( ServerAddress addr , List<Long> all ) throws MongoException { if ( all == null || all.size() == 0 ) return; OutMessage om = new OutMessage( _mongo , 2007 ); om.writeInt( 0 ); // reserved om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() ) ); int soFar = 0; int totalSoFar = 0; for (Long l : all) { om.writeLong(l); totalSoFar++; soFar++; if ( soFar >= NUM_CURSORS_PER_BATCH ){ _connector.say( this , om ,com.mongodb.WriteConcern.NONE ); om = new OutMessage( _mongo , 2007 ); om.writeInt( 0 ); // reserved om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() - totalSoFar ) ); soFar = 0; } } _connector.say( this , om ,com.mongodb.WriteConcern.NONE , addr ); } class MyCollection extends DBCollection { MyCollection( String name ){ super( DBApiLayer.this , name ); _fullNameSpace = _root + "." + name; } public void doapply( DBObject o ){ } @Override public void drop() throws MongoException { _collections.remove(getName()); super.drop(); } public WriteResult insert(DBObject[] arr, com.mongodb.WriteConcern concern, DBEncoder encoder ) throws MongoException { return insert( arr, true, concern, encoder ); } protected WriteResult insert(DBObject[] arr, boolean shouldApply , com.mongodb.WriteConcern concern, DBEncoder encoder ) throws MongoException { if (encoder == null) encoder = DefaultDBEncoder.FACTORY.create(); if ( willTrace() ) { for (DBObject o : arr) { trace( "save: " + _fullNameSpace + " " + JSON.serialize( o ) ); } } if ( shouldApply ){ for ( int i=0; i<arr.length; i++ ){ DBObject o=arr[i]; apply( o ); _checkObject( o , false , false ); Object id = o.get( "_id" ); if ( id instanceof ObjectId ){ ((ObjectId)id).notNew(); } } } WriteResult last = null; int cur = 0; int maxsize = _mongo.getMaxBsonObjectSize(); while ( cur < arr.length ){ OutMessage om = new OutMessage( _mongo , 2002, encoder ); int flags = 0; if ( concern.getContinueOnErrorForInsert() ) flags |= 1; om.writeInt( flags ); om.writeCString( _fullNameSpace ); for ( ; cur<arr.length; cur++ ){ DBObject o = arr[cur]; om.putObject( o ); // limit for batch insert is 4 x maxbson on server, use 2 x to be safe if ( om.size() > 2 * maxsize ){ cur++; break; } } last = _connector.say( _db , om , concern ); } return last; } public WriteResult remove( DBObject o , com.mongodb.WriteConcern concern, DBEncoder encoder ) throws MongoException { if (encoder == null) encoder = DefaultDBEncoder.FACTORY.create(); if ( willTrace() ) trace( "remove: " + _fullNameSpace + " " + JSON.serialize( o ) ); OutMessage om = new OutMessage( _mongo , 2006, encoder ); om.writeInt( 0 ); // reserved om.writeCString( _fullNameSpace ); Collection<String> keys = o.keySet(); if ( keys.size() == 1 && keys.iterator().next().equals( "_id" ) && o.get( keys.iterator().next() ) instanceof ObjectId ) om.writeInt( 1 ); else om.writeInt( 0 ); om.putObject( o ); return _connector.say( _db , om , concern ); } @Override Iterator<DBObject> __find( DBObject ref , DBObject fields , int numToSkip , int batchSize, int limit , int options, ReadPreference readPref, DBDecoder decoder ) throws MongoException { return __find(ref, fields, numToSkip, batchSize, limit, options, readPref, decoder, DefaultDBEncoder.FACTORY.create()); } @Override Iterator<DBObject> __find( DBObject ref , DBObject fields , int numToSkip , int batchSize , int limit, int options, ReadPreference readPref, DBDecoder decoder, DBEncoder encoder ) throws MongoException { if ( ref == null ) ref = new BasicDBObject(); if ( willTrace() ) trace( "find: " + _fullNameSpace + " " + JSON.serialize( ref ) ); OutMessage query = OutMessage.query( _mongo , options , _fullNameSpace , numToSkip , chooseBatchSize(batchSize, limit, 0) , ref , fields, readPref, encoder); Response res = _connector.call( _db , this , query , null , 2, readPref, decoder ); if ( res.size() == 1 ){ BSONObject foo = res.get(0); MongoException e = MongoException.parse( foo ); if ( e != null && ! _name.equals( "$cmd" ) ) throw e; } return new Result( this , res , batchSize, limit , options, decoder ); } @Override public WriteResult update( DBObject query , DBObject o , boolean upsert , boolean multi , com.mongodb.WriteConcern concern, DBEncoder encoder ) throws MongoException { if (encoder == null) encoder = DefaultDBEncoder.FACTORY.create(); if (o != null && !o.keySet().isEmpty()) { // if 1st key doesn't start with $, then object will be inserted as is, need to check it String key = o.keySet().iterator().next(); if (!key.startsWith("$")) _checkObject(o, false, false); } if ( willTrace() ) trace( "update: " + _fullNameSpace + " " + JSON.serialize( query ) + " " + JSON.serialize( o ) ); OutMessage om = new OutMessage( _mongo , 2001, encoder ); om.writeInt( 0 ); // reserved om.writeCString( _fullNameSpace ); int flags = 0; if ( upsert ) flags |= 1; if ( multi ) flags |= 2; om.writeInt( flags ); om.putObject( query ); om.putObject( o ); return _connector.say( _db , om , concern ); } public void createIndex( final DBObject keys, final DBObject options, DBEncoder encoder ) throws MongoException { if (encoder == null) encoder = DefaultDBEncoder.FACTORY.create(); DBObject full = new BasicDBObject(); for ( String k : options.keySet() ) full.put( k , options.get( k ) ); full.put( "key" , keys ); MyCollection idxs = DBApiLayer.this.doGetCollection( "system.indexes" ); //query first, maybe we should do an update w/upsert? -- need to test performance and lock behavior if ( idxs.findOne( full ) == null ) idxs.insert( new DBObject[] { full }, false, WriteConcern.SAFE, encoder ); } final String _fullNameSpace; } class Result implements Iterator<DBObject> { Result( MyCollection coll , Response res , int batchSize, int limit , int options, DBDecoder decoder ){ _collection = coll; _batchSize = batchSize; _limit = limit; _options = options; _host = res._host; _decoder = decoder; init( res ); } private void init( Response res ){ _totalBytes += res._len; _curResult = res; _cur = res.iterator(); _sizes.add( res.size() ); _numFetched += res.size(); if ( ( res._flags & Bytes.RESULTFLAG_CURSORNOTFOUND ) > 0 ){ throw new MongoException.CursorNotFound(res._cursor, res.serverUsed()); } if (res._cursor != 0 && _limit > 0 && _limit - _numFetched <= 0) { // fetched all docs within limit, close cursor server-side killCursor(); } } public DBObject next(){ if ( _cur.hasNext() ) { return _cur.next(); } if ( ! _curResult.hasGetMore( _options ) ) throw new RuntimeException( "no more" ); _advance(); return next(); } public boolean hasNext(){ boolean hasNext = _cur.hasNext(); while ( !hasNext ) { if ( ! _curResult.hasGetMore( _options ) ) return false; _advance(); hasNext = _cur.hasNext(); if (!hasNext) { if ( ( _options & Bytes.QUERYOPTION_AWAITDATA ) == 0 ) { // dont block waiting for data if no await return false; } else { // if await, driver should block until data is available // if server does not support await, driver must sleep to avoid busy loop if ((_curResult._flags & Bytes.RESULTFLAG_AWAITCAPABLE) == 0) { try { Thread.sleep(500); } catch (Exception e) { } } } } } return hasNext; } private void _advance(){ if ( _curResult.cursor() <= 0 ) throw new RuntimeException( "can't advance a cursor <= 0" ); OutMessage m = new OutMessage( _mongo , 2005 ); m.writeInt( 0 ); m.writeCString( _collection._fullNameSpace ); m.writeInt( chooseBatchSize(_batchSize, _limit, _numFetched) ); m.writeLong( _curResult.cursor() ); Response res = _connector.call( DBApiLayer.this , _collection , m , _host, _decoder ); _numGetMores++; init( res ); } public void remove(){ throw new RuntimeException( "can't remove this way" ); } public int getBatchSize(){ return _batchSize; } public void setBatchSize(int size){ _batchSize = size; } public String toString(){ return "DBCursor"; } protected void finalize() throws Throwable { if (_curResult != null) { long curId = _curResult.cursor(); _curResult = null; _cur = null; if (curId != 0) { _deadCursorIds.add(new DeadCursor(curId, _host)); } } super.finalize(); } public long totalBytes(){ return _totalBytes; } public long getCursorId(){ if ( _curResult == null ) return 0; return _curResult._cursor; } int numGetMores(){ return _numGetMores; } List<Integer> getSizes(){ return Collections.unmodifiableList( _sizes ); } void close(){ // not perfectly thread safe here, may need to use an atomicBoolean if (_curResult != null) { killCursor(); _curResult = null; _cur = null; } } void killCursor() { if (_curResult == null) return; long curId = _curResult.cursor(); if (curId == 0) return; List<Long> l = new ArrayList<Long>(); l.add(curId); try { killCursors(_host, l); } catch (Throwable t) { Bytes.LOGGER.log(Level.WARNING, "can't clean 1 cursor", t); _deadCursorIds.add(new DeadCursor(curId, _host)); } _curResult._cursor = 0; } public ServerAddress getServerAddress() { return _host; } Response _curResult; Iterator<DBObject> _cur; int _batchSize; int _limit; final DBDecoder _decoder; final MyCollection _collection; final int _options; final ServerAddress _host; // host where first went. all subsequent have to go there private long _totalBytes = 0; private int _numGetMores = 0; private List<Integer> _sizes = new ArrayList<Integer>(); private int _numFetched = 0; } // class Result static class DeadCursor { DeadCursor( long a , ServerAddress b ){ id = a; host = b; } final long id; final ServerAddress host; } final String _root; final String _rootPlusDot; final DBConnector _connector; final ConcurrentHashMap<String,MyCollection> _collections = new ConcurrentHashMap<String,MyCollection>(); ConcurrentLinkedQueue<DeadCursor> _deadCursorIds = new ConcurrentLinkedQueue<DeadCursor>(); static final List<DBObject> EMPTY = Collections.unmodifiableList( new LinkedList<DBObject>() ); }