/******************************************************************************* * Copyright 2011 Netflix * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************/ package com.netflix.astyanax.connectionpool.impl; import java.math.BigInteger; import java.util.ArrayList; import java.util.List; import java.util.Map; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.netflix.astyanax.connectionpool.ConnectionPool; import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration; import com.netflix.astyanax.connectionpool.Host; import com.netflix.astyanax.connectionpool.Operation; import com.netflix.astyanax.connectionpool.OperationResult; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.connectionpool.exceptions.NoAvailableHostsException; import com.netflix.astyanax.connectionpool.exceptions.OperationException; import com.netflix.astyanax.connectionpool.exceptions.TransportException; import com.netflix.astyanax.retry.ConstantBackoff; import com.netflix.astyanax.retry.RetryPolicy; import com.netflix.astyanax.retry.RunOnce; import com.netflix.astyanax.test.TestClient; import com.netflix.astyanax.test.TestConnectionFactory; import com.netflix.astyanax.test.TestConstants; import com.netflix.astyanax.test.TestHostType; import com.netflix.astyanax.test.TestOperation; import org.apache.log4j.Logger; import org.junit.Assert; import org.junit.Ignore; import org.junit.Test; @Ignore public abstract class BaseConnectionPoolTest { private static Logger LOG = Logger .getLogger(RoundRobinConnectionPoolImplTest.class); private static Operation<TestClient, String> dummyOperation = new TestOperation(); // private static ConnectionPoolConfigurationImpl config; // @BeforeClass public static void setup() { // config = new // ConnectionPoolConfigurationImpl(MockConstants.CLUSTER_NAME, // MockConstants.KEYSPACE_NAME); // config.setConnectionPoolFactory(ConnectionPoolType.ROUND_ROBIN); // config.setMaxTimeoutWhenExhausted(0); // config.setMaxFailoverCount(-1); } protected abstract ConnectionPool<TestClient> createPool(); @Test public void testAll() { ConnectionPool<TestClient> pool = createPool(); for (int i = 0; i < 5; i++) { pool.addHost( new Host("127.0." + i + ".0", TestHostType.GOOD_FAST .ordinal()), true); // pool.addHost(new Host("127.0." + i + ".1", // MockHostType.LOST_CONNECTION.ordinal())); // pool.addHost(new Host("127.0." + i + ".1", // MockHostType.CONNECT_TIMEOUT.ordinal())); // pool.addHost(new Host("127.0." + i + ".1", // MockHostType.ALWAYS_DOWN.ordinal())); // pool.addHost(new Host("127.0." + i + ".1", // MockHostType.THRASHING_TIMEOUT.ordinal())); // pool.addHost(new Host("127.0." + i + ".1", // MockHostType.CONNECT_BAD_REQUEST_EXCEPTION.ordinal())); } for (int i = 0; i < 10; i++) { try { OperationResult<String> result = pool.executeWithFailover( dummyOperation, RunOnce.get()); LOG.info(result.getHost()); } catch (OperationException e) { LOG.info(e.getMessage()); Assert.fail(); } catch (ConnectionException e) { LOG.info(e.getCause()); Assert.fail(); } } } @Test public void testRollingRestart() { ConnectionPool<TestClient> pool = createPool(); List<Host> hosts = new ArrayList<Host>(); for (int i = 0; i < 5; i++) { Host host = new Host("127.0." + i + ".0", TestHostType.GOOD_FAST.ordinal()); pool.addHost(host, true); hosts.add(host); } for (int i = 0; i < 5; i++) { try { OperationResult<String> result = pool.executeWithFailover( new TestOperation() { @Override public String execute(TestClient client) throws ConnectionException, OperationException { throw new TransportException("He's dead jim"); } }, RunOnce.get()); Assert.fail(); } catch (Exception e) { } } } @Test public void testAlwaysDown() { ConnectionPool<TestClient> pool = createPool(); pool.addHost(new Host("127.0.0.1", TestHostType.ALWAYS_DOWN.ordinal()), true); try { pool.executeWithFailover(dummyOperation, RunOnce.get()); Assert.fail(); } catch (OperationException e) { LOG.info(e.getMessage()); } catch (ConnectionException e) { LOG.info(e.getMessage()); } } @Test public void testConnectTimeout() { ConnectionPool<TestClient> pool = createPool(); pool.addHost( new Host("127.0.0.1", TestHostType.CONNECT_TIMEOUT.ordinal()), true); try { pool.executeWithFailover(dummyOperation, RunOnce.get()); Assert.fail(); } catch (OperationException e) { LOG.info(e.getMessage()); } catch (ConnectionException e) { LOG.info(e.getMessage()); } } @Test public void testOperationTimeoutTimeout() { ConnectionPool<TestClient> pool = createPool(); pool.addHost( new Host("127.0.0.1", TestHostType.OPERATION_TIMEOUT.ordinal()), true); try { pool.executeWithFailover(dummyOperation, RunOnce.get()); Assert.fail(); } catch (OperationException e) { LOG.info(e.getMessage()); } catch (ConnectionException e) { LOG.info(e.getMessage()); } } @Test public void testTimeoutTimeout() { ConnectionPool<TestClient> pool = createPool(); pool.addHost( new Host("127.0.0.1", TestHostType.SOCKET_TIMEOUT.ordinal()), true); try { pool.executeWithFailover(dummyOperation, RunOnce.get()); Assert.fail(); } catch (OperationException e) { LOG.info(e.getMessage()); } catch (ConnectionException e) { LOG.info(e.getMessage()); } } @Test public void testConnectBadRequest() { ConnectionPool<TestClient> pool = createPool(); pool.addHost(new Host("127.0.0.1", TestHostType.CONNECT_BAD_REQUEST_EXCEPTION.ordinal()), true); try { pool.executeWithFailover(dummyOperation, RunOnce.get()); Assert.fail(); } catch (OperationException e) { LOG.info(e.getMessage()); } catch (ConnectionException e) { LOG.info(e.getMessage()); } } @Test @Ignore public void testThrashingTimeout() { ConnectionPool<TestClient> pool = createPool(); pool.addHost( new Host("127.0.0.1", TestHostType.THRASHING_TIMEOUT.ordinal()), true); for (int i = 0; i < 100; i++) { try { think(1); pool.executeWithFailover(dummyOperation, RunOnce.get()); } catch (OperationException e) { LOG.info(e.getMessage()); } catch (ConnectionException e) { LOG.info(e.getMessage()); } } } @Test public void testGoodFast() { ConnectionPool<TestClient> pool = createPool(); pool.addHost(new Host("127.0.0.1", TestHostType.GOOD_SLOW.ordinal()), true); for (int i = 0; i < 10; i++) { try { pool.executeWithFailover(dummyOperation, RunOnce.get()); LOG.info("Success"); } catch (OperationException e) { LOG.info(e.getMessage()); } catch (ConnectionException e) { LOG.info(e.getMessage()); } } } @Test public void testDefaultConfig() { ConnectionPoolConfiguration config = new ConnectionPoolConfigurationImpl( TestConstants.CLUSTER_NAME + "_" + TestConstants.KEYSPACE_NAME); CountingConnectionPoolMonitor monitor = new CountingConnectionPoolMonitor(); try { ConnectionPool<TestClient> pool = new RoundRobinConnectionPoolImpl<TestClient>( config, new TestConnectionFactory(config, monitor), monitor); } catch (Exception e) { e.printStackTrace(); Assert.fail(); } } @Test public void testRestartedCluster() { ConnectionPool<TestClient> pool = createPool(); Host host1 = new Host("127.0.0.1", TestHostType.GOOD_FAST.ordinal()); Map<BigInteger, List<Host>> ring1 = Maps.newHashMap(); ring1.put(new BigInteger("0"), Lists.newArrayList(host1)); Host host2 = new Host("127.0.0.2", TestHostType.GOOD_FAST.ordinal()); Map<BigInteger, List<Host>> ring2 = Maps.newHashMap(); ring2.put(new BigInteger("0"), Lists.newArrayList(host2)); Map<BigInteger, List<Host>> ring3 = Maps.newHashMap(); pool.setHosts(ring1); Assert.assertTrue(pool.hasHost(host1)); Assert.assertTrue(pool.isHostUp(host1)); Assert.assertFalse(pool.hasHost(host2)); Assert.assertFalse(pool.isHostUp(host2)); try { OperationResult<String> result = pool.executeWithFailover( dummyOperation, RunOnce.get()); Assert.assertEquals(host1, result.getHost()); } catch (Exception e) { Assert.fail(); } pool.setHosts(ring3); Assert.assertFalse(pool.hasHost(host1)); Assert.assertFalse(pool.hasHost(host2)); try { OperationResult<String> result = pool.executeWithFailover( dummyOperation, RunOnce.get()); result = pool.executeWithFailover(dummyOperation, RunOnce.get()); Assert.fail(); } catch (NoAvailableHostsException e) { } catch (Exception e) { LOG.info(e); Assert.fail(); } pool.setHosts(ring2); Assert.assertTrue(pool.hasHost(host2)); Assert.assertTrue(pool.isHostUp(host2)); Assert.assertFalse(pool.hasHost(host1)); Assert.assertFalse(pool.isHostUp(host1)); try { OperationResult<String> result = pool.executeWithFailover( dummyOperation, RunOnce.get()); Assert.assertEquals(host2, result.getHost()); } catch (Exception e) { Assert.fail(); } } @Test @Ignore public void testAddHostThatIsDown() { /* * ConnectionPoolConfigurationImpl config = new * ConnectionPoolConfigurationImpl(MockConstants.CLUSTER_NAME, * MockConstants.KEYSPACE_NAME); config.setRetryBackoffStrategy(new * FixedRetryBackoffStrategy(100, 0)); * * ConnectionPool<MockClient> pool = new * RoundRobinConnectionPoolImpl<MockClient>(config, new * MockConnectionFactory(config)); */ ConnectionPool<TestClient> pool = createPool(); Host host1 = new Host("127.0.0.1", TestHostType.CONNECT_FAIL_FIRST.ordinal()); Map<BigInteger, List<Host>> ring1 = Maps.newHashMap(); ring1.put(new BigInteger("0"), Lists.newArrayList(host1)); OperationResult<String> result; pool.setHosts(ring1); Assert.assertTrue(pool.hasHost(host1)); Assert.assertFalse(pool.isHostUp(host1)); try { pool.executeWithFailover(dummyOperation, RunOnce.get()); Assert.fail(); } catch (NoAvailableHostsException e) { } catch (ConnectionException e) { Assert.fail(); } think(500); Assert.assertTrue(pool.hasHost(host1)); Assert.assertTrue(pool.isHostUp(host1)); try { pool.executeWithFailover(dummyOperation, RunOnce.get()); } catch (Exception e) { LOG.error(e.getMessage()); Assert.fail(); } } @Test @Ignore public void testConnectionAborted() { ConnectionPool<TestClient> pool = createPool(); Host host = new Host("127.0.0.1", TestHostType.ABORTED_CONNECTION.ordinal()); pool.addHost(host, true); OperationResult<String> result; try { result = pool.executeWithFailover(dummyOperation, RunOnce.get()); Assert.assertEquals(2, result.getAttemptsCount()); } catch (ConnectionException e) { LOG.error(e); Assert.fail(); } } @Test public void testRetryEmptyPool() { ConnectionPool<TestClient> pool = createPool(); RetryPolicy retry = new RunOnce(); try { pool.executeWithFailover(dummyOperation, retry); Assert.fail(); } catch (ConnectionException e) { Assert.assertEquals(1, retry.getAttemptCount()); LOG.error(e); } retry = new ConstantBackoff(1, 10); try { pool.executeWithFailover(dummyOperation, retry); Assert.fail(); } catch (ConnectionException e) { Assert.assertEquals(10, retry.getAttemptCount()); LOG.info(e); } } protected void think(long timeout) { try { Thread.sleep(timeout); } catch (InterruptedException e) { } } }