public class

TokenAwareConnectionPoolImpl

extends AbstractHostPartitionConnectionPool<CL>
/*******************************************************************************
 * Copyright 2011 Netflix
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ******************************************************************************/
package com.netflix.astyanax.connectionpool.impl;

import java.math.BigInteger;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;

import com.netflix.astyanax.connectionpool.ConnectionFactory;
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
import com.netflix.astyanax.connectionpool.ConnectionPoolMonitor;
import com.netflix.astyanax.connectionpool.ExecuteWithFailover;
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.Operation;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.NoAvailableHostsException;

/**
 * Connection pool that partitions connections by the hosts which own the token
 * being operated on. When a token is not available or an operation is known to
 * span multiple tokens (such as a batch mutate or an index query) host pools
 * are picked using round robin.
 * 
 * This implementation takes an optimistic approach which is optimized for a
 * well functioning ring with all nodes up and keeps downed hosts in the
 * internal data structures.
 * 
 * @author elandau
 * 
 * @param <CL>
 */
public class TokenAwareConnectionPoolImpl<CL> extends AbstractHostPartitionConnectionPool<CL> {

    private AtomicInteger roundRobinCounter = new AtomicInteger(new Random().nextInt(997));

    public TokenAwareConnectionPoolImpl(ConnectionPoolConfiguration configuration, ConnectionFactory<CL> factory,
            ConnectionPoolMonitor monitor) {
        super(configuration, factory, monitor);
    }

    @SuppressWarnings("unchecked")
    public <R> ExecuteWithFailover<CL, R> newExecuteWithFailover(Operation<CL, R> op) throws ConnectionException {
        List<HostConnectionPool<CL>> pools;
        boolean isSorted = false;

        if (op.getPinnedHost() != null) {
            HostConnectionPool<CL> pool = hosts.get(op.getPinnedHost());
            if (pool == null) {
                throw new NoAvailableHostsException("Host " + op.getPinnedHost() + " not active");
            }
            pools = Arrays.<HostConnectionPool<CL>> asList(pool);
        }
        else {
            HostConnectionPoolPartition<CL> partition = topology.getPartition(op.getToken());
            pools = partition.getPools();
            isSorted = partition.isSorted();
        }

        return new RoundRobinExecuteWithFailover<CL, R>(config, monitor, pools, isSorted ? 0
                : roundRobinCounter.incrementAndGet());
    }
}