public class

SimpleHostConnectionPoolTest

extends Object
package com.netflix.astyanax.connectionpool.impl;

import junit.framework.Assert;

import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.astyanax.connectionpool.Connection;
import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.HostConnectionPool;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.HostDownException;
import com.netflix.astyanax.connectionpool.exceptions.PoolTimeoutException;
import com.netflix.astyanax.test.TestClient;
import com.netflix.astyanax.test.TestConnectionFactory;
import com.netflix.astyanax.test.TestHostType;

public class SimpleHostConnectionPoolTest {
    private static Logger LOG = LoggerFactory
            .getLogger(SimpleHostConnectionPoolTest.class);

    private static int WAIT_TIMEOUT = 100;

    public static class NoOpListener implements
            SimpleHostConnectionPool.Listener<TestClient> {
        @Override
        public void onHostDown(HostConnectionPool<TestClient> pool) {
        }

        @Override
        public void onHostUp(HostConnectionPool<TestClient> pool) {
        }

    }

    @Test
    public void testAddHost() {
        Host host = new Host("127.0.0.1", TestHostType.GOOD_FAST.ordinal());

        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();

        ConnectionPoolConfiguration config = createConfig();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());
        Assert.assertEquals(0, pool.getActiveConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());

        try {
            pool.growConnections(1);
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        } catch (InterruptedException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
        Assert.assertEquals(1, pool.getActiveConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());
        Assert.assertEquals(1, pool.getIdleConnectionCount());
    }

    @Test
    public void testAddHostWithCheckedException() {
        Host host = new Host("127.0.0.1",
                TestHostType.CONNECT_TIMEOUT.ordinal());

        ConnectionPoolConfiguration config = createConfig();
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            pool.growConnections(1);
            Assert.fail();
        } catch (InterruptedException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        } catch (ConnectionException e) {
        }
        Assert.assertEquals(0, pool.getActiveConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());
    }

    @Test
    public void testAddHostWithUncheckedException() {
        Host host = new Host("127.0.0.1",
                TestHostType.CONNECT_WITH_UNCHECKED_EXCEPTION.ordinal());

        ConnectionPoolConfiguration config = createConfig();
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            pool.growConnections(1);
            Assert.fail();
        } catch (InterruptedException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        } catch (ConnectionException e) {
        }
        Assert.assertEquals(0, pool.getActiveConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());
    }

    @Test
    public void testGrowConnections() {
        Host host = new Host("127.0.0.1", TestHostType.GOOD_FAST.ordinal());

        ConnectionPoolConfiguration config = createConfig();
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            pool.growConnections(2);
        } catch (InterruptedException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
        Assert.assertEquals(2, pool.getActiveConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());
        Assert.assertEquals(2, pool.getIdleConnectionCount());

        try {
            pool.growConnections(2);
            Assert.fail();
        } catch (InterruptedException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        } catch (ConnectionException e) {
        }
        Assert.assertEquals(2, pool.getActiveConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());
    }

    @Test
    public void testShutdown() {
        Host host = new Host("127.0.0.1", TestHostType.GOOD_FAST.ordinal());

        ConnectionPoolConfigurationImpl config = createConfig();
        config.setRetryBackoffStrategy(new FixedRetryBackoffStrategy(200, 2000));
        config.setMaxConnsPerHost(3);
        config.setMaxPendingConnectionsPerHost(2);
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            pool.growConnections(2);
        } catch (InterruptedException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
        Assert.assertEquals(2, pool.getActiveConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());
        Assert.assertEquals(2, pool.getIdleConnectionCount());

        pool.markAsDown(null);

        Assert.assertEquals(0, pool.getActiveConnectionCount());
        Assert.assertEquals(0, pool.getIdleConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());

        try {
            pool.growConnections(1);
            Assert.fail();
        } catch (HostDownException e) {
        } catch (Exception e) {
            Assert.fail();
        }

        Assert.assertEquals(0, pool.getActiveConnectionCount());
        Assert.assertEquals(0, pool.getIdleConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        Assert.assertEquals(1, pool.getActiveConnectionCount());
        Assert.assertEquals(1, pool.getIdleConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());

        try {
            pool.growConnections(2);
        } catch (InterruptedException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
        Assert.assertEquals(3, pool.getActiveConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());
        Assert.assertEquals(3, pool.getIdleConnectionCount());
    }

    @Test
    public void testCloseOrReturnConnection() {
        Host host = new Host("127.0.0.1", TestHostType.GOOD_FAST.ordinal());

        ConnectionPoolConfiguration config = createConfig();
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            pool.growConnections(2);

            Assert.assertEquals(2, pool.getActiveConnectionCount());
            Assert.assertEquals(false, pool.isShutdown());
            Assert.assertEquals(2, pool.getIdleConnectionCount());

            Connection<TestClient> connection = pool.borrowConnection(0);

            Assert.assertEquals(2, pool.getActiveConnectionCount());
            Assert.assertEquals(false, pool.isShutdown());
            Assert.assertEquals(1, pool.getIdleConnectionCount());

            pool.returnConnection(connection);

            Assert.assertEquals(2, pool.getActiveConnectionCount());
            Assert.assertEquals(false, pool.isShutdown());
            Assert.assertEquals(2, pool.getIdleConnectionCount());

            connection = pool.borrowConnection(0);

            Assert.assertEquals(2, pool.getActiveConnectionCount());
            Assert.assertEquals(false, pool.isShutdown());
            Assert.assertEquals(1, pool.getIdleConnectionCount());

        } catch (InterruptedException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
    }

    @Test
    public void testAsyncOpenConnection() {
        Host host = new Host("127.0.0.1", TestHostType.GOOD_FAST.ordinal());

        ConnectionPoolConfigurationImpl config = createConfig();
        config.setMaxConnsPerHost(1);

        // Open the first connection
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            Connection<TestClient> connection = pool
                    .borrowConnection(WAIT_TIMEOUT);
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
        Assert.assertEquals(1, pool.getActiveConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());

        // Subsequent open should fail
        try {
            Connection<TestClient> connection = pool
                    .borrowConnection(WAIT_TIMEOUT);
            Assert.fail();
        } catch (PoolTimeoutException e) {
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
        Assert.assertEquals(1, pool.getActiveConnectionCount());
        Assert.assertEquals(false, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());
    }

    @Test
    @Ignore
    public void testAsyncOpenConnectionWithShutdown() {
        Host host = new Host("127.0.0.1", TestHostType.GOOD_SLOW.ordinal());

        ConnectionPoolConfigurationImpl config = createConfig();
        config.setMaxConnsPerHost(1);

        // Open the first connection
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            Connection<TestClient> connection = pool.borrowConnection(1);
        } catch (PoolTimeoutException e) {
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
        pool.markAsDown(null);
        Assert.assertEquals(1, pool.getActiveConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());

        // This should fail because we shut down
        try {
            Connection<TestClient> connection = pool
                    .borrowConnection(WAIT_TIMEOUT);
            Assert.fail();
        } catch (HostDownException e) {
        } catch (ConnectionException e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }

        // Count should still be 1 because we have a pending connection
        Assert.assertEquals(1, pool.getActiveConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }

        Assert.assertEquals(0, pool.getActiveConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());
    }

    @Test
    public void testAsyncOpenConnectionWithCheckedException() {
        Host host = new Host("127.0.0.1",
                TestHostType.CONNECT_TIMEOUT.ordinal());

        ConnectionPoolConfigurationImpl config = createConfig();

        // Open the first connection
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            Connection<TestClient> connection = pool
                    .borrowConnection(WAIT_TIMEOUT);
            Assert.fail();
        } catch (ConnectionException e) {
        }
        Assert.assertEquals(0, pool.getActiveConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());
    }

    @Test
    public void testAsyncOpenConnectionWithUnCheckedException() {
        Host host = new Host("127.0.0.1",
                TestHostType.CONNECT_WITH_UNCHECKED_EXCEPTION.ordinal());

        ConnectionPoolConfigurationImpl config = createConfig();

        // Open the first connection
        CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor();
        SimpleHostConnectionPool<TestClient> pool = new SimpleHostConnectionPool<TestClient>(
                host, new TestConnectionFactory(config, monitor), monitor,
                config, new NoOpListener());

        try {
            Connection<TestClient> connection = pool
                    .borrowConnection(WAIT_TIMEOUT);
            Assert.fail();
        } catch (ConnectionException e) {
        }
        Assert.assertEquals(0, pool.getActiveConnectionCount());
        Assert.assertEquals(true, pool.isShutdown());
        Assert.assertEquals(0, pool.getIdleConnectionCount());
    }

    public ConnectionPoolConfigurationImpl createConfig() {
        ConnectionPoolConfigurationImpl config = new ConnectionPoolConfigurationImpl(
                "cluster_keyspace");
        config.setMaxConnsPerHost(2);
        config.setInitConnsPerHost(1);
        return config;
    }

}