// public class TokenAwareConnectionPoolTest extends BaseConnectionPoolTest
package com.netflix.astyanax.connectionpool.impl;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.netflix.astyanax.connectionpool.ConnectionPool;
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.Operation;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.OperationException;
import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.retry.RunOnce;
import com.netflix.astyanax.test.TestClient;
import com.netflix.astyanax.test.TestConnectionFactory;
import com.netflix.astyanax.test.TestConstants;
import com.netflix.astyanax.test.TestHostType;
import com.netflix.astyanax.test.TestOperation;
import com.netflix.astyanax.test.TokenTestOperation;
import com.netflix.astyanax.util.TokenGenerator;

/**
 * Tests token-based host selection of {@link TokenAwareConnectionPoolImpl}
 * against small synthetic rings built by {@link #makeRing}.
 *
 * <p>The expectations encoded in the assertions below are:
 * <ul>
 *   <li>a token equal to a node's ring token is served by that node;</li>
 *   <li>a token lying strictly between two ring tokens is served by the node
 *       holding the higher of the two, i.e. ownership is {@code (previous, own]};</li>
 *   <li>a token outside the ring wraps around to the first node.</li>
 * </ul>
 */
public class TokenAwareConnectionPoolTest extends BaseConnectionPoolTest {
    private static final Logger LOG = LoggerFactory
            .getLogger(TokenAwareConnectionPoolTest.class);

    // NOTE(review): not referenced anywhere in this class; kept in case it is
    // used reflectively or by tooling - confirm before removing.
    private static final Operation<TestClient, String> dummyOperation = new TestOperation();


    /**
     * Builds the pool under test: a {@link TokenAwareConnectionPoolImpl} whose
     * connection factory and pool share a single
     * {@link CountingConnectionPoolMonitor}.
     *
     * @return a new pool with no hosts configured yet
     */
    protected ConnectionPool<TestClient> createPool() {
        ConnectionPoolConfiguration config = new ConnectionPoolConfigurationImpl(
                TestConstants.CLUSTER_NAME + "_" + TestConstants.KEYSPACE_NAME);

        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();

        return new TokenAwareConnectionPoolImpl<TestClient>(
                config, new TestConnectionFactory(config, monitor), monitor);
    }


    /**
     * Three-node ring over the default token range: token 0 goes to the first
     * node, token 1 to the second, and one past a third of the maximum token
     * to the third.
     */
    @Test
    public void testTokenMappingForMidRangeTokens() throws ConnectionException {
        ConnectionPool<TestClient> cp = createPool();
        cp.setHosts(makeRing(3, 1, 1));

        BigInteger threeNodeRingIncrement = TokenGenerator.MAXIMUM.divide(new BigInteger("3"));

        assertTokenRoutedTo(cp, BigInteger.ZERO, "127.0.1.0");
        assertTokenRoutedTo(cp, BigInteger.ONE, "127.0.1.1");
        assertTokenRoutedTo(cp, threeNodeRingIncrement.add(BigInteger.ONE), "127.0.1.2");
    }

    /**
     * Three-node ring with small, human-readable tokens: tokens that coincide
     * exactly with a node's ring token must be served by that node, and the
     * upper bound of the range must wrap to the first node.
     */
    @Test
    public void testTokenMappingForOrdinalTokens() throws ConnectionException {
        ConnectionPool<TestClient> cp = createPool();

        // the following will generate a ring of three nodes with the following characteristics;
        //    node1 - ip = 127.0.1.0, token ownership range = (1200 , 0]
        //    node2 - ip = 127.0.1.1, token ownership range = (0 , 600]
        //    node3 - ip = 127.0.1.2, token ownership range = (600 , 1200]
        cp.setHosts(makeRing(3, 1, 1, BigInteger.ZERO, new BigInteger("1800")));

        BigInteger threeNodeRingIncrement = new BigInteger("600");

        assertTokenRoutedTo(cp, BigInteger.ZERO, "127.0.1.0");
        assertTokenRoutedTo(cp, threeNodeRingIncrement, "127.0.1.1");
        assertTokenRoutedTo(cp, threeNodeRingIncrement.multiply(new BigInteger("2")), "127.0.1.2");
        // 1800 lies beyond the last ring token (1200), so it wraps to node1
        assertTokenRoutedTo(cp, threeNodeRingIncrement.multiply(new BigInteger("3")), "127.0.1.0");
    }


    /**
     * Two-node ring whose first token is greater than zero: token 0 falls in
     * the wrapped part of the first node's range.
     */
    @Test
    public void testTokenToHostMappingInWrappedRange() throws ConnectionException {
        ConnectionPool<TestClient> cp = createPool();

        // the following will generate a ring of two nodes with the following characteristics;
        //    node1 - ip = 127.0.1.0, token ownership range = (510 , 10]
        //    node2 - ip = 127.0.1.1, token ownership range = (10 , 510]
        cp.setHosts(makeRing(2, 1, 1, BigInteger.TEN, new BigInteger("1010")));

        // since token ownership wraps, node1 should own token 0
        assertTokenRoutedTo(cp, BigInteger.ZERO, "127.0.1.0");
    }


    /**
     * Two-node ring queried with a token larger than any ring token.
     */
    @Test
    public void testTokenToHostMappingOutsideOfRing() throws ConnectionException {
        ConnectionPool<TestClient> cp = createPool();

        // the following will generate a ring of two nodes with the following characteristics;
        //    node1 - ip = 127.0.1.0, token ownership range = (500 , 0]
        //    node2 - ip = 127.0.1.1, token ownership range = (0 , 500]
        cp.setHosts(makeRing(2, 1, 1, BigInteger.ZERO, new BigInteger("1000")));

        // requests for tokens outside the ring will be serviced by host associated with
        // 1st partition in ring
        assertTokenRoutedTo(cp, new BigInteger("1250"), "127.0.1.0");
    }



    /**
     * Smoke test: replaces one six-host ring with another whose hosts have
     * different addresses and checks that the pool still reports its active
     * pools after each change. The pool contents are logged, not asserted.
     */
    @Test
    public void changeRingTest() {
        ConnectionPool<TestClient> cp = createPool();
        Map<BigInteger, List<Host>> ring1 = makeRing(6, 3, 1);
        Map<BigInteger, List<Host>> ring2 = makeRing(6, 3, 2);

        cp.setHosts(ring1);
        List<HostConnectionPool<TestClient>> hosts1 = cp.getActivePools();
        assertNotNull(hosts1);

        cp.setHosts(ring2);
        List<HostConnectionPool<TestClient>> hosts2 = cp.getActivePools();
        assertNotNull(hosts2);

        LOG.info("{}", hosts1);
        LOG.info("{}", hosts2);
    }

    /**
     * Runs a single-attempt {@link TokenTestOperation} for {@code token} and
     * asserts which host served it.
     *
     * @param pool       pool whose hosts have already been set
     * @param token      token carried by the operation
     * @param expectedIp IP address of the host expected to execute the operation
     * @throws ConnectionException if the pool fails to execute the operation
     */
    private static void assertTokenRoutedTo(ConnectionPool<TestClient> pool,
            BigInteger token, String expectedIp) throws ConnectionException {
        Operation<TestClient, String> op = new TokenTestOperation(token);
        RetryPolicy retryPolicy = new RunOnce();

        OperationResult<String> result = pool.executeWithFailover(op, retryPolicy);
        assertNotNull(result);
        assertEquals(expectedIp, result.getHost().getIpAddress());
    }

    /**
     * Builds a ring spanning {@link TokenGenerator#MINIMUM} to
     * {@link TokenGenerator#MAXIMUM}.
     *
     * @see #makeRing(int, int, int, BigInteger, BigInteger)
     */
    private Map<BigInteger, List<Host>> makeRing(int nHosts,
            int replicationFactor, int id) {
        return makeRing(nHosts, replicationFactor, id, TokenGenerator.MINIMUM, TokenGenerator.MAXIMUM);
    }

    /**
     * Builds a token-to-replicas map for a synthetic ring.
     *
     * <p>Host {@code i} is named {@code 127.0.<id>.<i>:<GOOD_FAST ordinal>} on
     * port 7160. The token for position {@code i} comes from
     * {@link TokenGenerator#initialToken}, and its replica list is host
     * {@code i} followed by the next {@code replicationFactor - 1} hosts,
     * wrapping around the end of the host list.
     *
     * @param nHosts            number of hosts (and tokens) in the ring
     * @param replicationFactor number of replicas listed per token
     * @param id                third octet of every generated host address
     * @param minInitialToken   lower bound handed to the token generator
     * @param maxInitialToken   upper bound handed to the token generator
     * @return map from ring token to the hosts replicating it
     */
    private Map<BigInteger, List<Host>> makeRing(int nHosts,
            int replicationFactor, int id, BigInteger minInitialToken, BigInteger maxInitialToken) {
        List<Host> hosts = Lists.newArrayList();
        for (int i = 0; i < nHosts; i++) {
            hosts.add(new Host("127.0." + id + "." + i + ":"
                    + TestHostType.GOOD_FAST.ordinal(), 7160));
        }

        Map<BigInteger, List<Host>> ring = Maps.newHashMap();
        for (int i = 0; i < nHosts; i++) {
            String token = TokenGenerator.initialToken(nHosts, i, minInitialToken, maxInitialToken);
            LOG.info("{}", token);
            List<Host> replicas = new ArrayList<Host>();
            for (int j = 0; j < replicationFactor; j++) {
                Host host = hosts.get((i + j) % nHosts);
                LOG.info(" {}", host);
                replicas.add(host);
            }
            ring.put(new BigInteger(token), replicas);
        }

        return ring;
    }
}