public abstract class

AbstractHostPartitionConnectionPool

extends Object
implements ConnectionPool<CL> SimpleHostConnectionPool.Listener<CL>
/*******************************************************************************
 * Copyright 2011 Netflix
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ******************************************************************************/
package com.netflix.astyanax.connectionpool.impl;

import java.math.BigInteger;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.cliffc.high_scale_lib.NonBlockingHashMap;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.netflix.astyanax.connectionpool.ConnectionFactory;
import com.netflix.astyanax.connectionpool.ConnectionPool;
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
import com.netflix.astyanax.connectionpool.ConnectionPoolMonitor;
import com.netflix.astyanax.connectionpool.ExecuteWithFailover;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.LatencyScoreStrategy;
import com.netflix.astyanax.connectionpool.LatencyScoreStrategy.Listener;
import com.netflix.astyanax.connectionpool.Operation;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.OperationException;
import com.netflix.astyanax.retry.RetryPolicy;

/**
 * Base for all connection pools that keep a separate pool of connections for
 * each host.
 * 
 * @author elandau
 * 
 * @param <CL>
 */
public abstract class AbstractHostPartitionConnectionPool<CL> implements ConnectionPool<CL>,
        SimpleHostConnectionPool.Listener<CL> {
    protected final NonBlockingHashMap<Host, HostConnectionPool<CL>> hosts;
    protected final ConnectionPoolConfiguration config;
    protected final ConnectionFactory<CL> factory;
    protected final ConnectionPoolMonitor monitor;
    private final LatencyScoreStrategy latencyScoreStrategy;
    protected final Topology<CL> topology;

    public AbstractHostPartitionConnectionPool(ConnectionPoolConfiguration config, ConnectionFactory<CL> factory,
            ConnectionPoolMonitor monitor) {
        this.config = config;
        this.factory = factory;
        this.hosts = new NonBlockingHashMap<Host, HostConnectionPool<CL>>();
        this.monitor = monitor;
        this.latencyScoreStrategy = config.getLatencyScoreStrategy();
        this.topology = new TokenPartitionedTopology<CL>(this.latencyScoreStrategy);
    }

    @Override
    public void start() {
        ConnectionPoolMBeanManager.getInstance().registerMonitor(config.getName(), this);

        String seeds = config.getSeeds();
        if (seeds != null && !seeds.isEmpty()) {
            Map<BigInteger, List<Host>> ring = Maps.newHashMap();
            ring.put(new BigInteger("0"), config.getSeedHosts());
            setHosts(ring);
        }

        latencyScoreStrategy.start(new Listener() {
            @Override
            public void onUpdate() {
                rebuildPartitions();
            }

            @Override
            public void onReset() {
                rebuildPartitions();
            }
        });
    }

    @Override
    public void shutdown() {
        ConnectionPoolMBeanManager.getInstance().unregisterMonitor(config.getName(), this);

        for (Entry<Host, HostConnectionPool<CL>> pool : hosts.entrySet()) {
            pool.getValue().shutdown();
        }

        latencyScoreStrategy.shutdown();
    }

    protected HostConnectionPool<CL> newHostConnectionPool(Host host, ConnectionFactory<CL> factory,
            ConnectionPoolConfiguration config) {
        return new SimpleHostConnectionPool<CL>(host, factory, monitor, config, this);
    }

    @Override
    public void onHostDown(HostConnectionPool<CL> pool) {
        topology.suspendPool(pool);
    }

    @Override
    public void onHostUp(HostConnectionPool<CL> pool) {
        topology.resumePool(pool);
    }

    @Override
    public final boolean addHost(Host host, boolean refresh) {
        // Already exists
        if (hosts.get(host) != null) {
            return false;
        }

        HostConnectionPool<CL> pool = newHostConnectionPool(host, factory, config);
        if (null == hosts.putIfAbsent(host, pool)) {
            try {
                monitor.onHostAdded(host, pool);
                if (refresh) {
                    topology.addPool(pool);
                    rebuildPartitions();
                }
                pool.growConnections(config.getInitConnsPerHost());
            }
            catch (Exception e) {
                // Ignore, pool will have been marked down internally
            }
            return true;
        }
        else {
            return false;
        }
    }

    @Override
    public boolean isHostUp(Host host) {
        HostConnectionPool<CL> pool = hosts.get(host);
        if (pool != null) {
            return !pool.isShutdown();
        }
        return false;
    }

    @Override
    public boolean hasHost(Host host) {
        return hosts.containsKey(host);
    }

    @Override
    public List<HostConnectionPool<CL>> getActivePools() {
        return ImmutableList.copyOf(topology.getAllPools().getPools());
    }

    @Override
    public boolean removeHost(Host host, boolean refresh) {
        HostConnectionPool<CL> pool = hosts.remove(host);
        if (pool != null) {
            monitor.onHostRemoved(host);
            pool.shutdown();
            if (refresh) {
                topology.removePool(pool);
                rebuildPartitions();
            }
            return true;
        }
        else {
            return false;
        }
    }

    @Override
    public HostConnectionPool<CL> getHostPool(Host host) {
        return hosts.get(host);
    }

    @Override
    public synchronized void setHosts(Map<BigInteger, List<Host>> ring) {
        // Temporary list of hosts to remove. Any host not in the new ring will
        // be removed
        Set<Host> hostsToRemove = Sets.newHashSet(hosts.keySet());

        // Add new hosts.
        for (Map.Entry<BigInteger, List<Host>> entry : ring.entrySet()) {
            for (Host host : entry.getValue()) {
                addHost(host, false);
                hostsToRemove.remove(host);
            }
        }

        // Remove any hosts that are no longer in the ring
        for (Host host : hostsToRemove) {
            removeHost(host, false);
        }

        // Recreate the toplogy
        Map<BigInteger, Collection<HostConnectionPool<CL>>> tokens = Maps.newHashMap();
        for (Map.Entry<BigInteger, List<Host>> entry : ring.entrySet()) {
            Set<HostConnectionPool<CL>> pools = Sets.newHashSet();
            for (Host host : entry.getValue()) {
                pools.add(getHostPool(host));
            }
            tokens.put(entry.getKey(), pools);
        }

        topology.setPools(tokens);
    }

    @Override
    public <R> OperationResult<R> executeWithFailover(Operation<CL, R> op, RetryPolicy retry)
            throws ConnectionException {
        retry.begin();
        ConnectionException lastException = null;
        do {
            try {
                OperationResult<R> result = newExecuteWithFailover(op).tryOperation(op);
                retry.success();
                return result;
            }
            catch (OperationException e) {
                retry.failure(e);
                throw e;
            }
            catch (ConnectionException e) {
                lastException = e;
            }
        } while (retry.allowRetry());
        retry.failure(lastException);
        throw lastException;
    }

    /**
     * Return a new failover context. The context captures the connection pool
     * state and implements the necessary failover logic.
     * 
     * @param <R>
     * @return
     * @throws ConnectionException
     */
    protected abstract <R> ExecuteWithFailover<CL, R> newExecuteWithFailover(Operation<CL, R> op)
            throws ConnectionException;

    /**
     * Called every time a host is added, removed or is marked as down
     */
    protected void rebuildPartitions() {
        topology.refresh();
    }
}