// DBApiLayer.java
/**
* Copyright (C) 2008 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.bson.BSONObject;
import org.bson.types.ObjectId;
import com.mongodb.util.JSON;
/** Database API.
* This class cannot be instantiated directly; its methods are available
* through instances of Mongo.
*/
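// A minimal usage sketch (illustrative only; the host and names below are
// assumptions, and Mongo constructs DBApiLayer instances internally):
//
//   Mongo mongo = new Mongo( "localhost" );               // may throw UnknownHostException
//   DB db = mongo.getDB( "test" );                        // backed by a DBApiLayer
//   DBCollection coll = db.getCollection( "people" );
//   coll.insert( new BasicDBObject( "name" , "alice" ) );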
public class DBApiLayer extends DB {
static final boolean D = Boolean.getBoolean( "DEBUG.DB" );
/** The number of dead cursors to accumulate before a batched kill is issued */
static final int NUM_CURSORS_BEFORE_KILL = 100;
/** The maximum number of cursor ids to send in a single kill-cursors message */
static final int NUM_CURSORS_PER_BATCH = 20000;
// --- tracing
static final Logger TRACE_LOGGER = Logger.getLogger( "com.mongodb.TRACE" );
static final Level TRACE_LEVEL = Boolean.getBoolean( "DB.TRACE" ) ? Level.INFO : Level.FINEST;
static final boolean willTrace(){
return TRACE_LOGGER.isLoggable( TRACE_LEVEL );
}
static final void trace( String s ){
TRACE_LOGGER.log( TRACE_LEVEL , s );
}
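// Worked examples of chooseBatchSize (traced from the logic below):
//   chooseBatchSize(  0,  0, 0 ) ==  0   // no batch size, no limit: server default
//   chooseBatchSize(  4, 10, 8 ) ==  2   // clamp to the 2 documents remaining under the limit
//   chooseBatchSize( -4, 10, 0 ) == -4   // negative asks the server to close the cursor after the batch
//   chooseBatchSize(  0, 10, 9 ) == -1   // one document left: -1 fetches it and closes the cursor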
static int chooseBatchSize(int batchSize, int limit, int fetched) {
int bs = Math.abs(batchSize);
int remaining = limit > 0 ? limit - fetched : 0;
int res;
if (bs == 0 && remaining > 0)
res = remaining;
else if (bs > 0 && remaining == 0)
res = bs;
else
res = Math.min(bs, remaining);
if (batchSize < 0) {
// a negative batch size tells the server to close the cursor after this batch
res = -res;
}
if (res == 1) {
// optimization: a batch of exactly 1 would leave an exhausted cursor open,
// so send -1 and let the server close it after returning the document
res = -1;
}
return res;
}
/**
* @param mongo the Mongo instance
* @param name the database name
* @param connector the connector
*/
protected DBApiLayer( Mongo mongo, String name , DBConnector connector ){
super( mongo, name );
if ( connector == null )
throw new IllegalArgumentException( "need a connector: " + name );
_root = name;
_rootPlusDot = _root + ".";
_connector = connector;
}
// the request lifecycle methods simply delegate to the connector
public void requestStart(){
_connector.requestStart();
}
public void requestDone(){
_connector.requestDone();
}
public void requestEnsureConnection(){
_connector.requestEnsureConnection();
}
protected MyCollection doGetCollection( String name ){
MyCollection c = _collections.get( name );
if ( c != null )
return c;
c = new MyCollection( name );
MyCollection old = _collections.putIfAbsent(name, c);
// if another thread registered the collection first, use its instance
return old != null ? old : c;
}
String _removeRoot( String ns ){
if ( ! ns.startsWith( _rootPlusDot ) )
return ns;
return ns.substring( _root.length() + 1 );
}
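// Dead-cursor reaping: cursors discarded before being exhausted (see
// Result.finalize below) queue their ids in _deadCursorIds; cleanCursors
// is a no-op until NUM_CURSORS_BEFORE_KILL ids accumulate, unless forced:
//
//   db.cleanCursors( true ); // illustrative: flush all queued cursor kills now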
public void cleanCursors( boolean force )
throws MongoException {
int sz = _deadCursorIds.size();
if ( sz == 0 || ( ! force && sz < NUM_CURSORS_BEFORE_KILL))
return;
Bytes.LOGGER.info( "going to kill cursors : " + sz );
Map<ServerAddress,List<Long>> m = new HashMap<ServerAddress,List<Long>>();
DeadCursor c;
while (( c = _deadCursorIds.poll()) != null ){
List<Long> x = m.get( c.host );
if ( x == null ){
x = new LinkedList<Long>();
m.put( c.host , x );
}
x.add( c.id );
}
for ( Map.Entry<ServerAddress,List<Long>> e : m.entrySet() ){
try {
killCursors( e.getKey() , e.getValue() );
}
catch ( Throwable t ){
Bytes.LOGGER.log( Level.WARNING , "can't clean cursors" , t );
for ( Long x : e.getValue() )
_deadCursorIds.add( new DeadCursor( x , e.getKey() ) );
}
}
}
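// OP_KILL_CURSORS wire layout (opcode 2007), as assembled below:
//   int32  ZERO                // reserved
//   int32  numberOfCursorIDs
//   int64* cursorIDs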
void killCursors( ServerAddress addr , List<Long> all )
throws MongoException {
if ( all == null || all.size() == 0 )
return;
OutMessage om = new OutMessage( _mongo , 2007 ); // 2007 = OP_KILL_CURSORS
om.writeInt( 0 ); // reserved
om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() ) ); // numberOfCursorIDs in this message
int soFar = 0;
int totalSoFar = 0;
for (Long l : all) {
om.writeLong(l);
totalSoFar++;
soFar++;
if ( soFar >= NUM_CURSORS_PER_BATCH ){
// flush the full batch to the host that owns the cursors, then start a new message
_connector.say( this , om , com.mongodb.WriteConcern.NONE , addr );
om = new OutMessage( _mongo , 2007 ); // 2007 = OP_KILL_CURSORS
om.writeInt( 0 ); // reserved
om.writeInt( Math.min( NUM_CURSORS_PER_BATCH , all.size() - totalSoFar ) );
soFar = 0;
}
}
_connector.say( this , om , com.mongodb.WriteConcern.NONE , addr );
}
class MyCollection extends DBCollection {
MyCollection( String name ){
super( DBApiLayer.this , name );
_fullNameSpace = _root + "." + name;
}
public void doapply( DBObject o ){
// nothing to apply at this layer
}
@Override
public void drop() throws MongoException {
_collections.remove(getName());
super.drop();
}
public WriteResult insert(DBObject[] arr, com.mongodb.WriteConcern concern, DBEncoder encoder )
throws MongoException {
return insert( arr, true, concern, encoder );
}
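// Inserts are streamed into OP_INSERT messages; a message is cut and a new one
// started once it grows past 2 x maxBsonObjectSize (the server accepts up to
// 4 x for a batch insert), so a large array may take several round trips.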
protected WriteResult insert(DBObject[] arr, boolean shouldApply , com.mongodb.WriteConcern concern, DBEncoder encoder )
throws MongoException {
if (encoder == null)
encoder = DefaultDBEncoder.FACTORY.create();
if ( willTrace() ) {
for (DBObject o : arr) {
trace( "save: " + _fullNameSpace + " " + JSON.serialize( o ) );
}
}
if ( shouldApply ){
for ( int i=0; i<arr.length; i++ ){
DBObject o=arr[i];
apply( o );
_checkObject( o , false , false );
Object id = o.get( "_id" );
if ( id instanceof ObjectId ){
((ObjectId)id).notNew();
}
}
}
WriteResult last = null;
int cur = 0;
int maxsize = _mongo.getMaxBsonObjectSize();
while ( cur < arr.length ){
OutMessage om = new OutMessage( _mongo , 2002, encoder ); // 2002 = OP_INSERT
int flags = 0;
if ( concern.getContinueOnErrorForInsert() ) flags |= 1; // bit 0: ContinueOnError
om.writeInt( flags );
om.writeCString( _fullNameSpace );
for ( ; cur<arr.length; cur++ ){
DBObject o = arr[cur];
om.putObject( o );
// the server allows a batch insert up to 4 x the max BSON size; use 2 x to be safe
if ( om.size() > 2 * maxsize ){
cur++;
break;
}
}
last = _connector.say( _db , om , concern );
}
return last;
}
public WriteResult remove( DBObject o , com.mongodb.WriteConcern concern, DBEncoder encoder )
throws MongoException {
if (encoder == null)
encoder = DefaultDBEncoder.FACTORY.create();
if ( willTrace() ) trace( "remove: " + _fullNameSpace + " " + JSON.serialize( o ) );
OutMessage om = new OutMessage( _mongo , 2006, encoder ); // 2006 = OP_DELETE
om.writeInt( 0 ); // reserved
om.writeCString( _fullNameSpace );
Collection<String> keys = o.keySet();
// a query of the form { _id : ObjectId } matches at most one document,
// so set the SingleRemove flag (bit 0) and let the server stop early
if ( keys.size() == 1 &&
keys.iterator().next().equals( "_id" ) &&
o.get( keys.iterator().next() ) instanceof ObjectId )
om.writeInt( 1 );
else
om.writeInt( 0 );
om.putObject( o );
return _connector.say( _db , om , concern );
}
@Override
Iterator<DBObject> __find( DBObject ref , DBObject fields , int numToSkip , int batchSize, int limit , int options, ReadPreference readPref, DBDecoder decoder )
throws MongoException {
return __find(ref, fields, numToSkip, batchSize, limit, options, readPref, decoder, DefaultDBEncoder.FACTORY.create());
}
@Override
Iterator<DBObject> __find( DBObject ref , DBObject fields , int numToSkip , int batchSize , int limit, int options,
ReadPreference readPref, DBDecoder decoder, DBEncoder encoder ) throws MongoException {
if ( ref == null )
ref = new BasicDBObject();
if ( willTrace() ) trace( "find: " + _fullNameSpace + " " + JSON.serialize( ref ) );
OutMessage query = OutMessage.query( _mongo , options , _fullNameSpace , numToSkip , chooseBatchSize(batchSize, limit, 0) , ref , fields, readPref,
encoder);
Response res = _connector.call( _db , this , query , null , 2, readPref, decoder );
if ( res.size() == 1 ){
// a single returned document may be a server error ($err);
// commands ($cmd) report errors through their result instead
BSONObject foo = res.get(0);
MongoException e = MongoException.parse( foo );
if ( e != null && ! _name.equals( "$cmd" ) )
throw e;
}
return new Result( this , res , batchSize, limit , options, decoder );
}
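// OP_UPDATE flags, per the wire protocol, as written by update() below:
//   bit 0: Upsert      - insert the document if nothing matches the query
//   bit 1: MultiUpdate - update every matching document, not just the first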
@Override
public WriteResult update( DBObject query , DBObject o , boolean upsert , boolean multi , com.mongodb.WriteConcern concern, DBEncoder encoder )
throws MongoException {
if (encoder == null)
encoder = DefaultDBEncoder.FACTORY.create();
if (o != null && !o.keySet().isEmpty()) {
// if the first key doesn't start with $, this is a replacement document
// rather than a modifier, so it must be validated like an insert
String key = o.keySet().iterator().next();
if (!key.startsWith("$"))
_checkObject(o, false, false);
}
if ( willTrace() ) trace( "update: " + _fullNameSpace + " " + JSON.serialize( query ) + " " + JSON.serialize( o ) );
OutMessage om = new OutMessage( _mongo , 2001, encoder ); // 2001 = OP_UPDATE
om.writeInt( 0 ); // reserved
om.writeCString( _fullNameSpace );
int flags = 0;
if ( upsert ) flags |= 1; // bit 0: Upsert
if ( multi ) flags |= 2; // bit 1: MultiUpdate
om.writeInt( flags );
om.putObject( query );
om.putObject( o );
return _connector.say( _db , om , concern );
}
public void createIndex( final DBObject keys, final DBObject options, DBEncoder encoder )
throws MongoException {
if (encoder == null)
encoder = DefaultDBEncoder.FACTORY.create();
DBObject full = new BasicDBObject();
for ( String k : options.keySet() )
full.put( k , options.get( k ) );
full.put( "key" , keys );
MyCollection idxs = DBApiLayer.this.doGetCollection( "system.indexes" );
// query first; an update with upsert might be better, but performance and lock behavior need testing
if ( idxs.findOne( full ) == null )
idxs.insert( new DBObject[] { full }, false, WriteConcern.SAFE, encoder );
}
final String _fullNameSpace;
}
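// Result lazily walks a server-side cursor: it serves documents out of the
// current reply and issues getMore requests, always to the host the query
// first hit, as each batch drains. An illustrative iteration (callers
// normally go through DBCursor rather than using this directly):
//
//   Iterator<DBObject> it = coll.__find( query, null, 0, 0, 0, 0, readPref, decoder );
//   while ( it.hasNext() )
//       System.out.println( it.next() );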
class Result implements Iterator<DBObject> {
Result( MyCollection coll , Response res , int batchSize, int limit , int options, DBDecoder decoder ){
_collection = coll;
_batchSize = batchSize;
_limit = limit;
_options = options;
_host = res._host;
_decoder = decoder;
init( res );
}
private void init( Response res ){
_totalBytes += res._len;
_curResult = res;
_cur = res.iterator();
_sizes.add( res.size() );
_numFetched += res.size();
if ( ( res._flags & Bytes.RESULTFLAG_CURSORNOTFOUND ) > 0 ){
throw new MongoException.CursorNotFound(res._cursor, res.serverUsed());
}
if (res._cursor != 0 && _limit > 0 && _limit - _numFetched <= 0) {
// fetched all docs within limit, close cursor server-side
killCursor();
}
}
public DBObject next(){
if ( _cur.hasNext() ) {
return _cur.next();
}
if ( ! _curResult.hasGetMore( _options ) )
throw new NoSuchElementException( "no more documents in this cursor" );
_advance();
return next();
}
public boolean hasNext(){
boolean hasNext = _cur.hasNext();
while ( !hasNext ) {
if ( ! _curResult.hasGetMore( _options ) )
return false;
_advance();
hasNext = _cur.hasNext();
if (!hasNext) {
if ( ( _options & Bytes.QUERYOPTION_AWAITDATA ) == 0 ) {
// don't block waiting for data if the await option isn't set
return false;
} else {
// with await, the driver should block until data is available;
// if the server is not await-capable, sleep between getMore calls
// to avoid a busy loop
if ((_curResult._flags & Bytes.RESULTFLAG_AWAITCAPABLE) == 0) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // preserve the interrupt status instead of swallowing it
}
}
}
}
}
return hasNext;
}
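// OP_GET_MORE wire layout (opcode 2005), as assembled below:
//   int32   ZERO                // reserved
//   cstring fullCollectionName
//   int32   numberToReturn
//   int64   cursorID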
private void _advance(){
if ( _curResult.cursor() <= 0 )
throw new RuntimeException( "can't advance a cursor <= 0" );
OutMessage m = new OutMessage( _mongo , 2005 ); // 2005 = OP_GET_MORE
m.writeInt( 0 ); // reserved
m.writeCString( _collection._fullNameSpace );
m.writeInt( chooseBatchSize(_batchSize, _limit, _numFetched) );
m.writeLong( _curResult.cursor() );
Response res = _connector.call( DBApiLayer.this , _collection , m , _host, _decoder );
_numGetMores++;
init( res );
}
public void remove(){
throw new UnsupportedOperationException( "can't remove a document via a query result iterator" );
}
public int getBatchSize(){
return _batchSize;
}
public void setBatchSize(int size){
_batchSize = size;
}
public String toString(){
return "DBCursor";
}
protected void finalize() throws Throwable {
// don't do network I/O on the finalizer thread; just queue the cursor id
// so cleanCursors can kill it in a batch later
if (_curResult != null) {
long curId = _curResult.cursor();
_curResult = null;
_cur = null;
if (curId != 0) {
_deadCursorIds.add(new DeadCursor(curId, _host));
}
}
super.finalize();
}
public long totalBytes(){
return _totalBytes;
}
public long getCursorId(){
if ( _curResult == null )
return 0;
return _curResult._cursor;
}
int numGetMores(){
return _numGetMores;
}
List<Integer> getSizes(){
return Collections.unmodifiableList( _sizes );
}
void close(){
// not perfectly thread safe here; may need to use an AtomicBoolean
if (_curResult != null) {
killCursor();
_curResult = null;
_cur = null;
}
}
void killCursor() {
if (_curResult == null)
return;
long curId = _curResult.cursor();
if (curId == 0)
return;
List<Long> l = new ArrayList<Long>();
l.add(curId);
try {
killCursors(_host, l);
} catch (Throwable t) {
Bytes.LOGGER.log(Level.WARNING, "can't clean 1 cursor", t);
_deadCursorIds.add(new DeadCursor(curId, _host));
}
_curResult._cursor = 0;
}
public ServerAddress getServerAddress() {
return _host;
}
Response _curResult;
Iterator<DBObject> _cur;
int _batchSize;
int _limit;
final DBDecoder _decoder;
final MyCollection _collection;
final int _options;
final ServerAddress _host; // host the first query went to; all subsequent getMore requests must go there
private long _totalBytes = 0;
private int _numGetMores = 0;
private List<Integer> _sizes = new ArrayList<Integer>();
private int _numFetched = 0;
} // class Result
static class DeadCursor {
DeadCursor( long a , ServerAddress b ){
id = a;
host = b;
}
final long id;
final ServerAddress host;
}
final String _root;
final String _rootPlusDot;
final DBConnector _connector;
final ConcurrentHashMap<String,MyCollection> _collections = new ConcurrentHashMap<String,MyCollection>();
final ConcurrentLinkedQueue<DeadCursor> _deadCursorIds = new ConcurrentLinkedQueue<DeadCursor>();
static final List<DBObject> EMPTY = Collections.unmodifiableList( new LinkedList<DBObject>() );
}