// public class BagOfConnectionsConnectionPoolImpl extends AbstractHostPartitionConnectionPool<CL>
package com.netflix.astyanax.connectionpool.impl;

import java.util.List;
import java.util.Random;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.astyanax.connectionpool.Connection;
import com.netflix.astyanax.connectionpool.ConnectionFactory;
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
import com.netflix.astyanax.connectionpool.ConnectionPoolMonitor;
import com.netflix.astyanax.connectionpool.ExecuteWithFailover;
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.Operation;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.IsDeadConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.IsTimeoutException;
import com.netflix.astyanax.connectionpool.exceptions.NoAvailableHostsException;
import com.netflix.astyanax.connectionpool.exceptions.OperationException;
import com.netflix.astyanax.connectionpool.exceptions.TimeoutException;

/**
 * Connection pool which keeps every idle connection, regardless of host, in a
 * single shared queue (the "bag"). When the bag is empty and the connection
 * limit has not been reached, a new connection is opened on a host chosen by
 * starting at a random index in the pool list, so load balancing is
 * essentially random.
 * 
 * @author elandau
 * 
 * @param <CL>
 *            client type carried by the pooled {@link Connection}s
 */
public class BagOfConnectionsConnectionPoolImpl<CL> extends AbstractHostPartitionConnectionPool<CL> {

    /** Connections that are open and currently not borrowed by any operation. */
    private final LinkedBlockingQueue<Connection<CL>> idleConnections = new LinkedBlockingQueue<Connection<CL>>();

    /**
     * Number of connections opened through this bag and not yet closed by it
     * (idle or borrowed). Compared against {@code config.getMaxConns()}.
     */
    private final AtomicInteger activeConnectionCount = new AtomicInteger(0);

    /** Source of the starting host index when a new connection must be opened. */
    private final Random randomIndex = new Random();

    /**
     * Creates the pool; all arguments are handed to the superclass unchanged.
     * 
     * @param config
     *            connection pool configuration
     * @param factory
     *            factory passed to the superclass
     * @param monitor
     *            monitor passed to the superclass
     */
    public BagOfConnectionsConnectionPoolImpl(ConnectionPoolConfiguration config, ConnectionFactory<CL> factory,
            ConnectionPoolMonitor monitor) {
        super(config, factory, monitor);
    }

    /**
     * Borrows a connection, trying in order: (1) an idle connection from the
     * bag, (2) if the connection limit is already reached, a timed wait for one
     * to be returned, (3) otherwise a newly opened connection on some host.
     * 
     * @param op
     *            operation the connection is borrowed for (not inspected here)
     * @return a connection, never {@code null}
     * @throws ConnectionException
     *             if the wait timed out or was interrupted, or if no host
     *             could provide a connection
     * @throws OperationException
     *             declared for callers; not thrown directly by this method
     */
    private <R> Connection<CL> borrowConnection(Operation<CL, R> op) throws ConnectionException, OperationException {
        long startTime = System.currentTimeMillis();

        Connection<CL> connection = null;
        boolean newConnection = false;
        try {
            // Try to get an open connection from the bag
            connection = idleConnections.poll();
            if (connection != null) {
                return connection;
            }

            // Optimistically reserve a slot; if that exceeds the limit, give
            // the slot back and wait for somebody else to return a connection.
            if (activeConnectionCount.incrementAndGet() > config.getMaxConns()) {
                activeConnectionCount.decrementAndGet();
                connection = waitForIdleConnection();
                return connection;
            }

            // A slot is reserved: try to create a new connection
            newConnection = true;
            connection = openConnectionOnAnyHost();
            return connection;
        }
        finally {
            // Only connections reused from the bag are reported here.
            // NOTE(review): presumably the host pool reports freshly opened
            // connections itself - confirm against HostConnectionPool.
            if (connection != null && !newConnection) {
                monitor.incConnectionBorrowed(connection.getHostConnectionPool().getHost(), System.currentTimeMillis()
                        - startTime);
            }
        }
    }

    /**
     * Waits up to {@code config.getMaxTimeoutWhenExhausted()} milliseconds for
     * a connection to be returned to the bag.
     * 
     * @return the connection taken from the bag, never {@code null}
     * @throws ConnectionException
     *             (as {@link TimeoutException}) if nothing was returned in
     *             time or the waiting thread was interrupted
     */
    private Connection<CL> waitForIdleConnection() throws ConnectionException, OperationException {
        try {
            Connection<CL> connection = idleConnections.poll(config.getMaxTimeoutWhenExhausted(),
                    TimeUnit.MILLISECONDS);
            if (connection == null) {
                this.monitor.incPoolExhaustedTimeout();
                throw new TimeoutException("Timed out waiting for connection from bag");
            }
            return connection;
        }
        catch (InterruptedException e) {
            // Restore the interrupt flag so code further up can observe it
            Thread.currentThread().interrupt();
            throw new TimeoutException("Interrupted waiting to borrow a connection");
        }
    }

    /**
     * Opens a connection on some host. Starts at a random host and, if that
     * host fails, tries the following ones in list order (wrapping around)
     * until one succeeds or every host has been tried once. The caller must
     * already have incremented {@link #activeConnectionCount}; the reservation
     * is released here if no connection could be opened.
     * 
     * @return the newly borrowed connection, never {@code null}
     * @throws ConnectionException
     *             (as {@link NoAvailableHostsException}) if there are no hosts
     *             or every host failed
     */
    private Connection<CL> openConnectionOnAnyHost() throws ConnectionException, OperationException {
        Connection<CL> connection = null;
        try {
            List<HostConnectionPool<CL>> pools = topology.getAllPools().getPools();
            if (pools == null || pools.isEmpty()) {
                monitor.incNoHosts();
                throw new NoAvailableHostsException("No hosts to borrow from");
            }

            int index = randomIndex.nextInt(pools.size());
            for (int i = 0; i < pools.size(); ++i, ++index) {
                HostConnectionPool<CL> pool = pools.get(index % pools.size());
                try {
                    connection = pool.borrowConnection(config.getConnectTimeout());
                    return connection;
                }
                catch (ConnectionException ignored) {
                    // Deliberately ignored: a failure on one host only means
                    // the next host in the list should be tried.
                }
            }
            monitor.incNoHosts();
            throw new NoAvailableHostsException("Too many errors trying to open a connection");
        }
        finally {
            // Nothing was opened, so give back the reserved slot
            if (connection == null) {
                activeConnectionCount.decrementAndGet();
            }
        }
    }

    /**
     * Hands a borrowed connection back. The connection is closed instead of
     * being put back in the bag when its host pool is shut down, when it has
     * executed more than {@code config.getMaxOperationsPerConnection()}
     * operations, when its last exception marks it as dead or timed out, or
     * when the bag refuses it.
     * 
     * @param connection
     *            connection to return; may be {@code null}
     * @return {@code true} if {@code connection} was non-null (whether it was
     *         pooled or closed), {@code false} otherwise
     */
    protected boolean returnConnection(Connection<CL> connection) {
        if (connection != null) {
            if (connection.getHostConnectionPool().isShutdown()
                    || connection.getOperationCount() > config.getMaxOperationsPerConnection()) {
                closeConnection(connection);
            }
            else {
                ConnectionException ce = connection.getLastException();
                if (ce != null && (ce instanceof IsDeadConnectionException || ce instanceof IsTimeoutException)) {
                    closeConnection(connection);
                }
                else if (!this.idleConnections.offer(connection)) {
                    closeConnection(connection);
                }
                else {
                    this.monitor.incConnectionReturned(connection.getHostConnectionPool().getHost());
                }
            }
            return true;
        }
        return false;
    }

    /**
     * Closes the connection through its owning host pool and releases its slot
     * in {@link #activeConnectionCount}.
     */
    private void closeConnection(Connection<CL> connection) {
        connection.getHostConnectionPool().closeConnection(connection);
        activeConnectionCount.decrementAndGet();
    }

    /**
     * Failover strategy that borrows from, and releases to, the enclosing bag.
     */
    class BagExecuteWithFailover<R> extends AbstractExecuteWithFailoverImpl<CL, R> {
        /** Remaining retries; decremented by every call to {@link #canRetry()}. */
        private int retryCountdown;

        /** Host pool of the most recently borrowed connection, or {@code null}. */
        private HostConnectionPool<CL> pool = null;

        /**
         * Sets the retry budget to the smaller of
         * {@code config.getMaxFailoverCount()} and the current number of host
         * pools; a negative result is replaced by the number of host pools.
         * 
         * @param config
         *            configuration supplying the failover count
         * @throws ConnectionException
         *             declared for callers; propagated from the superclass
         *             constructor if it throws
         */
        public BagExecuteWithFailover(ConnectionPoolConfiguration config) throws ConnectionException {
            super(config, monitor);

            // The pool list is treated as possibly null when borrowing, so it
            // is guarded here as well: with no pools the budget is 0 and the
            // borrow reports NoAvailableHostsException instead of this
            // constructor failing with a NullPointerException.
            List<HostConnectionPool<CL>> pools = topology.getAllPools().getPools();
            int size = (pools == null) ? 0 : pools.size();

            retryCountdown = Math.min(config.getMaxFailoverCount(), size);
            if (retryCountdown < 0) {
                retryCountdown = size;
            }
        }

        /**
         * @return host pool of the connection obtained by the last successful
         *         {@link #borrowConnection(Operation)}, or {@code null} if the
         *         last borrow failed or none was attempted
         */
        @Override
        public HostConnectionPool<CL> getCurrentHostConnectionPool() {
            return pool;
        }

        /**
         * Borrows a connection from the enclosing bag and remembers its host
         * pool.
         */
        @Override
        public Connection<CL> borrowConnection(Operation<CL, R> operation) throws ConnectionException {
            // Cleared first so a failed borrow does not leave a stale pool
            pool = null;
            connection = BagOfConnectionsConnectionPoolImpl.this.borrowConnection(operation);
            pool = connection.getHostConnectionPool();
            return connection;
        }

        /**
         * Consumes one retry.
         * 
         * @return {@code true} while the retry budget was not yet used up
         *         before this call
         */
        @Override
        public boolean canRetry() {
            return --retryCountdown >= 0;
        }

        /**
         * Returns the current connection to the enclosing bag and forgets it.
         */
        @Override
        public void releaseConnection() {
            BagOfConnectionsConnectionPoolImpl.this.returnConnection(connection);
            connection = null;
        }

    }

    /**
     * Creates a new bag-backed failover executor.
     * 
     * @param op
     *            operation to be executed (not inspected here)
     */
    @Override
    public <R> ExecuteWithFailover<CL, R> newExecuteWithFailover(Operation<CL, R> op) throws ConnectionException {
        return new BagExecuteWithFailover<R>(config);
    }
}