public class

HostConnectionPoolPartition

extends Object
package com.netflix.astyanax.connectionpool.impl;

import java.math.BigInteger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.cliffc.high_scale_lib.NonBlockingHashSet;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.LatencyScoreStrategy;

public class HostConnectionPoolPartition<CL> {

    private final BigInteger id;
    private final AtomicBoolean prioritize = new AtomicBoolean(false);
    private final NonBlockingHashSet<HostConnectionPool<CL>> pools = new NonBlockingHashSet<HostConnectionPool<CL>>();
    private final AtomicReference<List<HostConnectionPool<CL>>> activePools = new AtomicReference<List<HostConnectionPool<CL>>>();
    private final LatencyScoreStrategy strategy;

    public HostConnectionPoolPartition(BigInteger id, LatencyScoreStrategy strategy) {
        this.id = id;
        this.strategy = strategy;
        this.activePools.set(new ArrayList<HostConnectionPool<CL>>());
    }

    /**
     * Sets all pools for this partition. Removes old partitions and adds new
     * one.
     * 
     * @param pools
     */
    public synchronized boolean setPools(Collection<HostConnectionPool<CL>> pools) {
        Set<HostConnectionPool<CL>> toRemove = Sets.newHashSet(this.pools);

        boolean didChange = false;
        for (HostConnectionPool<CL> pool : pools) {
            didChange |= this.pools.add(pool);
            toRemove.remove(pool);
        }

        for (HostConnectionPool<CL> pool : toRemove) {
            didChange |= this.pools.remove(pool);
        }

        if (didChange)
            refresh();
        return didChange;
    }

    public synchronized boolean addPool(HostConnectionPool<CL> pool) {
        if (this.pools.add(pool)) {
            refresh();
            return true;
        }
        return false;
    }

    public synchronized boolean removePool(HostConnectionPool<CL> pool) {
        if (this.pools.remove(pool)) {
            refresh();
            return true;
        }
        return false;
    }

    /**
     * Token or shard identifying this partition.
     * 
     * @return
     */
    public BigInteger id() {
        return this.id;
    }

    public List<HostConnectionPool<CL>> getPools() {
        return this.activePools.get();
    }

    public boolean isSorted() {
        return prioritize.get();
    }

    public void refresh() {
        List<HostConnectionPool<CL>> pools = Lists.newArrayList(this.pools);
        Iterator<HostConnectionPool<CL>> iter = pools.iterator();
        while (iter.hasNext()) {
            if (iter.next().isShutdown())
                iter.remove();
        }
        this.activePools.set(strategy.sortAndfilterPartition(pools, prioritize));
    }

    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append("HostConnectionPoolPartition[");
        sb.append(id).append(": ");
        for (HostConnectionPool<CL> pool : getPools()) {
            sb.append(pool.getHost().getHostName()).append(",");
        }
        sb.append("]");
        return sb.toString();
    }
}