// public class SmaLatencyScoreStrategyImpl extends Object implements LatencyScoreStrategy
package com.netflix.astyanax.connectionpool.impl;

import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import org.cliffc.high_scale_lib.NonBlockingHashSet;

import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.LatencyScoreStrategy;

/**
 * {@link LatencyScoreStrategy} named "SMA" that keeps a set of per-host
 * {@link Instance} objects and periodically updates and resets them on a
 * single daemon scheduler thread.
 *
 * <p>The strategy is configured with four values that are simply stored and
 * exposed through getters: the update interval and reset interval (both used
 * as millisecond delays when scheduling), the window size and the badness
 * threshold. How the window size and threshold are interpreted is up to the
 * instance implementation created by {@link #internalCreateInstance()}.
 */
public class SmaLatencyScoreStrategyImpl implements LatencyScoreStrategy {

    /**
     * Orders host pools by ascending score. {@link Double#compare(double, double)}
     * is used instead of raw {@code <} / {@code >} checks so that the ordering
     * stays a valid total order even when a score is NaN (NaN sorts last);
     * an inconsistent comparator can make {@link Collections#sort} throw.
     */
    private static final Comparator<HostConnectionPool<?>> SCORE_COMPARATOR = new Comparator<HostConnectionPool<?>>() {
        @Override
        public int compare(HostConnectionPool<?> p1, HostConnectionPool<?> p2) {
            double score1 = p1.getScore();
            double score2 = p2.getScore();
            return Double.compare(score1, score2);
        }
    };

    private final ScheduledExecutorService executor;
    private final Set<Instance> instances;
    private final int updateInterval;
    private final int resetInterval;
    private final int windowSize;
    private final double badnessThreshold;

    /**
     * Creates the strategy and its single-threaded daemon scheduler. Nothing is
     * scheduled until {@link #start(Listener)} is called.
     *
     * @param updateInterval   delay between score updates, in milliseconds
     * @param resetInterval    delay between score resets, in milliseconds
     * @param windowSize       window size exposed via {@link #getWindowSize()}
     * @param badnessThreshold threshold exposed via {@link #getBadnessThreshold()}
     */
    public SmaLatencyScoreStrategyImpl(int updateInterval, int resetInterval, int windowSize, double badnessThreshold) {
        this.updateInterval = updateInterval;
        this.resetInterval = resetInterval;
        this.badnessThreshold = badnessThreshold;
        this.windowSize = windowSize;

        this.executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder().setDaemon(true).build());
        this.instances = new NonBlockingHashSet<Instance>();
    }

    /**
     * Factory hook for the per-host instance; subclasses may override it to
     * supply a different implementation.
     *
     * @return a new, not yet registered instance
     */
    protected Instance internalCreateInstance() {
        return new SmaLatencyScoreStrategyInstanceImpl(this);
    }

    /**
     * Creates an instance through {@link #internalCreateInstance()} and registers
     * it so that {@link #update()} and {@link #reset()} reach it.
     *
     * @return the newly registered instance
     */
    public final Instance createInstance() {
        Instance instance = internalCreateInstance();
        this.instances.add(instance);
        return instance;
    }

    /** @return the delay between score updates, in milliseconds */
    public int getUpdateInterval() {
        return updateInterval;
    }

    /** @return the delay between score resets, in milliseconds */
    public int getResetInterval() {
        return resetInterval;
    }

    /** @return the badness threshold passed to the constructor */
    public double getBadnessThreshold() {
        return badnessThreshold;
    }

    /** @return the window size passed to the constructor */
    public int getWindowSize() {
        return windowSize;
    }

    /**
     * Starts the recurring update and reset tasks. Each task first runs after a
     * random delay in {@code [0, interval)} and then re-schedules itself with the
     * full interval after every run.
     *
     * @param listener notified after every update and after every reset
     * @throws IllegalArgumentException if the update or reset interval is not
     *         positive; the check happens before anything is scheduled so a bad
     *         configuration never leaves the strategy half-started
     */
    @Override
    public void start(final Listener listener) {
        final int initialUpdateBound = getUpdateInterval();
        final int initialResetBound = getResetInterval();
        if (initialUpdateBound <= 0) {
            throw new IllegalArgumentException("updateInterval must be positive: " + initialUpdateBound);
        }
        if (initialResetBound <= 0) {
            throw new IllegalArgumentException("resetInterval must be positive: " + initialResetBound);
        }

        final Random random = new Random();

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                Thread.currentThread().setName(getName() + "_ScoreUpdate");
                try {
                    update();
                    listener.onUpdate();
                }
                finally {
                    // Re-arm even if an instance or the listener threw; otherwise a
                    // single failure would silently stop all future score updates.
                    reschedule(this, getUpdateInterval());
                }
            }
        }, random.nextInt(initialUpdateBound), TimeUnit.MILLISECONDS);

        executor.schedule(new Runnable() {
            @Override
            public void run() {
                Thread.currentThread().setName(getName() + "_ScoreReset");
                try {
                    reset();
                    listener.onReset();
                }
                finally {
                    // Same reasoning as for the update task above.
                    reschedule(this, getResetInterval());
                }
            }
        }, random.nextInt(initialResetBound), TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules {@code task} to run once more after {@code delayMillis}, unless
     * {@link #shutdown()} has already been called, in which case the executor
     * would reject the submission anyway.
     */
    private void reschedule(Runnable task, long delayMillis) {
        if (!executor.isShutdown()) {
            executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Shuts down the scheduler; the recurring tasks stop re-arming themselves. */
    @Override
    public void shutdown() {
        executor.shutdown();
    }

    /**
     * Unregisters an instance so it is no longer touched by {@link #update()}
     * and {@link #reset()}.
     *
     * @param instance the instance to remove
     */
    @Override
    public void removeInstance(Instance instance) {
        instances.remove(instance);
    }

    /**
     * Returns a copy of {@code srcPools} sorted by ascending score. The source
     * list is not modified and no pool is filtered out.
     *
     * @param srcPools    pools to order
     * @param prioritized always set to {@code true} by this implementation
     * @return a new list containing every source pool, lowest score first
     */
    @Override
    public <CL> List<HostConnectionPool<CL>> sortAndfilterPartition(List<HostConnectionPool<CL>> srcPools,
            AtomicBoolean prioritized) {
        List<HostConnectionPool<CL>> pools = Lists.newArrayList(srcPools);
        Collections.sort(pools, SCORE_COMPARATOR);
        prioritized.set(true);
        return pools;
    }

    /** @return the strategy name, {@code "SMA"}; also used as the scheduler thread-name prefix */
    public String getName() {
        return "SMA";
    }

    /**
     * @return a description of the form {@code SMA[win=W,rst=R,upd=U,trh=T]}
     */
    @Override
    public String toString() {
        return new StringBuilder()
                .append(getName()).append("[")
                .append("win=").append(getWindowSize())
                .append(",rst=").append(getResetInterval())
                .append(",upd=").append(getUpdateInterval())
                .append(",trh=").append(getBadnessThreshold())
                .append("]")
                .toString();
    }

    /** Calls {@code update()} on every registered instance. */
    @Override
    public void update() {
        for (Instance inst : instances) {
            inst.update();
        }
    }

    /** Calls {@code reset()} on every registered instance. */
    @Override
    public void reset() {
        for (Instance inst : instances) {
            inst.reset();
        }
    }
}