/******************************************************************************* * Copyright 2011 Netflix * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************/ package com.netflix.astyanax.connectionpool.impl; import java.util.List; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.astyanax.connectionpool.BadHostDetector; import com.netflix.astyanax.connectionpool.Connection; import com.netflix.astyanax.connectionpool.ConnectionFactory; import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration; import com.netflix.astyanax.connectionpool.ConnectionPoolMonitor; import com.netflix.astyanax.connectionpool.Host; import com.netflix.astyanax.connectionpool.HostConnectionPool; import com.netflix.astyanax.connectionpool.LatencyScoreStrategy; import com.netflix.astyanax.connectionpool.RetryBackoffStrategy; import com.netflix.astyanax.connectionpool.exceptions.ConnectionAbortedException; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.connectionpool.exceptions.HostDownException; import com.netflix.astyanax.connectionpool.exceptions.IsDeadConnectionException; import com.netflix.astyanax.connectionpool.exceptions.IsTimeoutException; import com.netflix.astyanax.connectionpool.exceptions.MaxConnsPerHostReachedException; import com.netflix.astyanax.connectionpool.exceptions.PoolTimeoutException; import com.netflix.astyanax.connectionpool.exceptions.ThrottledException; import com.netflix.astyanax.connectionpool.exceptions.TimeoutException; import com.netflix.astyanax.connectionpool.exceptions.UnknownException; /** * Pool of connections for a single host. * * Features 1. Async open connection 2. * * @author elandau * */ public class SimpleHostConnectionPool<CL> implements HostConnectionPool<CL> { /** * Interface to notify the owning connection pool of up/down state changes. * This give the owning connection pool a chance to remove a downed host * from its internal state until it's back up. * * @author elandau * * @param <CL> */ public interface Listener<CL> { void onHostDown(HostConnectionPool<CL> pool); void onHostUp(HostConnectionPool<CL> pool); } private static final AtomicLong poolIdCounter = new AtomicLong(0); private final long id = poolIdCounter.incrementAndGet(); private final BlockingQueue<Connection<CL>> availableConnections; private final AtomicInteger activeCount = new AtomicInteger(0); private final AtomicInteger pendingConnections = new AtomicInteger(0); private final AtomicInteger blockedThreads = new AtomicInteger(0); private final ConnectionFactory<CL> factory; private final Host host; private final AtomicBoolean isShutdown = new AtomicBoolean(false); private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, new ThreadFactoryBuilder() .setDaemon(true).build()); private final RetryBackoffStrategy.Instance retryContext; private final BadHostDetector.Instance badHostDetector; private final LatencyScoreStrategy.Instance latencyStrategy; private final Listener<CL> listener; private final ConnectionPoolMonitor monitor; protected final ConnectionPoolConfiguration config; public SimpleHostConnectionPool(Host host, ConnectionFactory<CL> factory, ConnectionPoolMonitor monitor, ConnectionPoolConfiguration config, Listener<CL> listener) { this.host = host; this.config = config; this.factory = factory; this.listener = listener; this.availableConnections = new LinkedBlockingQueue<Connection<CL>>(); this.retryContext = config.getRetryBackoffStrategy().createInstance(); this.latencyStrategy = config.getLatencyScoreStrategy().createInstance(); this.badHostDetector = config.getBadHostDetector().createInstance(); this.monitor = monitor; } @Override public int growConnections(int numConnections) throws ConnectionException, InterruptedException { int count = Math.min(numConnections, config.getMaxConnsPerHost()); for (int i = 0, attemptCount = 0; i < count && attemptCount < 100; i++, attemptCount++) { try { availableConnections.add(openConnection()); } catch (MaxConnsPerHostReachedException e) { return i; } catch (ThrottledException e) { Thread.sleep(50); i--; } } return count; } /** * Create a connection as long the max hasn't been reached * * @param timeout * - Max wait timeout if max connections have been allocated and * pool is empty. 0 to throw a MaxConnsPerHostReachedException. * @return * @throws TimeoutException * if timeout specified and no new connection is available * MaxConnsPerHostReachedException if max connections created * and no timeout was specified */ @Override public Connection<CL> borrowConnection(int timeout) throws ConnectionException { if (isShutdown()) { throw new HostDownException("Can't borrow connection. Host is down."); } Connection<CL> connection = null; long startTime = System.currentTimeMillis(); try { // Try to get a free connection without blocking. connection = availableConnections.poll(); if (connection != null) { return connection; } boolean isOpenning = tryOpenAsync(); // Wait for a connection to free up or a new one to be opened if (timeout > 0) { connection = waitForConnection(isOpenning ? config.getConnectTimeout() : timeout); return connection; } else throw new PoolTimeoutException("Fast fail waiting for connection from pool").setHost(getHost()) .setLatency(System.currentTimeMillis() - startTime); } finally { if (connection != null) { monitor.incConnectionBorrowed(host, System.currentTimeMillis() - startTime); } } } /** * Internal method to wait for a connection from the available connection * pool. * * @param timeout * @return * @throws ConnectionException */ private Connection<CL> waitForConnection(int timeout) throws ConnectionException { Connection<CL> connection = null; long startTime = System.currentTimeMillis(); try { if (blockedThreads.incrementAndGet() <= config.getMaxBlockedThreadsPerHost()) { connection = availableConnections.poll(timeout, TimeUnit.MILLISECONDS); if (connection != null) return connection; } else { throw new PoolTimeoutException("Too many clients blocked on this pool " + blockedThreads.get()) .setHost(getHost()); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } finally { blockedThreads.decrementAndGet(); } throw new PoolTimeoutException("Timed out waiting for connection").setHost(getHost()).setLatency( System.currentTimeMillis() - startTime); } /** * Return a connection to this host * * @param connection */ @Override public boolean returnConnection(Connection<CL> connection) { monitor.incConnectionReturned(host); ConnectionException ce = connection.getLastException(); if (ce != null) { if (ce instanceof TimeoutException) { if (badHostDetector.addTimeoutSample()) { internalCloseConnection(connection); retryContext.suspend(); markAsDown(ce); return true; } } if (ce instanceof IsDeadConnectionException) { internalCloseConnection(connection); if (!(ce instanceof ConnectionAbortedException)) { markAsDown(ce); return true; } return false; } } // Still within the number of max active connection if (activeCount.get() <= config.getMaxConnsPerHost()) { availableConnections.add(connection); if (isShutdown()) { discardIdleConnections(); return true; } } else { // maxConnsPerHost was reduced. This may end up closing too many // connections, // but that's ok. We'll open them later. internalCloseConnection(connection); return true; } return false; } @Override public boolean closeConnection(Connection<CL> connection) { monitor.incConnectionReturned(host); internalCloseConnection(connection); return true; } private void internalCloseConnection(Connection<CL> connection) { connection.close(); activeCount.decrementAndGet(); } /** * Mark the host as down. No new connections will be created from this host. * Connections currently in use will be allowed to continue processing. */ @Override public void markAsDown(ConnectionException reason) { final HostConnectionPool<CL> pool = this; // Start the reconnect thread if (isShutdown.compareAndSet(false, true)) { discardIdleConnections(); listener.onHostDown(this); monitor.onHostDown(pool.getHost(), reason); try { executor.schedule(new Runnable() { @Override public void run() { Thread.currentThread().setName("RetryService : " + host.toString()); if (reconnect()) { // Created a new connection successfully. Update // internal state. retryContext.success(); isShutdown.set(false); monitor.onHostReactivated(host, pool); listener.onHostUp(SimpleHostConnectionPool.this); } else { executor.schedule(this, retryContext.getNextDelay(), TimeUnit.MILLISECONDS); } } }, retryContext.getNextDelay(), TimeUnit.MILLISECONDS); } catch (RejectedExecutionException e) { // Ignore } } else { discardIdleConnections(); } } private boolean reconnect() { Connection<CL> connection = null; try { activeCount.incrementAndGet(); connection = factory.createConnection(SimpleHostConnectionPool.this); connection.open(); availableConnections.add(connection); return true; } catch (Exception e) { activeCount.decrementAndGet(); } return false; } @Override public void shutdown() { executor.shutdown(); markAsDown(null); } /** * Open a new connection synchronously * * @return * @throws ConnectionException */ @Override public Connection<CL> openConnection() throws ConnectionException { if (isShutdown()) { throw new HostDownException("Can't open new connection. Host is down."); } Connection<CL> connection = null; try { if (activeCount.incrementAndGet() <= config.getMaxConnsPerHost()) { connection = factory.createConnection(this); connection.open(); if (isShutdown()) { connection.close(); connection = null; discardIdleConnections(); throw new HostDownException("Host marked down after connection was created."); } return connection; } else { throw new PoolTimeoutException("Pool exhausted").setHost(getHost()); } } catch (Exception e) { connection = null; ConnectionException ce = (e instanceof ConnectionException) ? (ConnectionException) e : new UnknownException(e); if (ce instanceof IsDeadConnectionException) { markAsDown(ce); } throw ce; } finally { if (connection == null) { activeCount.decrementAndGet(); } } } /** * Try to open a new connection asynchronously. We don't actually return a * connection here. Instead, the connection will be added to idle queue when * it's ready. */ private boolean tryOpenAsync() { Connection<CL> connection = null; // Try to open a new connection, as long as we haven't reached the max try { if (activeCount.incrementAndGet() <= config.getMaxConnsPerHost()) { // Don't try to open too many connections at the same time. if (pendingConnections.incrementAndGet() > config.getMaxPendingConnectionsPerHost()) { pendingConnections.decrementAndGet(); } else { try { connection = factory.createConnection(this); connection.openAsync(new Connection.AsyncOpenCallback<CL>() { @Override public void success(Connection<CL> connection) { pendingConnections.decrementAndGet(); availableConnections.add(connection); // Sanity check in case the connection // pool was closed if (isShutdown()) { discardIdleConnections(); } } @Override public void failure(Connection<CL> conn, ConnectionException e) { pendingConnections.decrementAndGet(); activeCount.decrementAndGet(); if (e instanceof IsDeadConnectionException) { markAsDown(e); } } }); return true; } catch (ThrottledException e) { // Trying to open way too many connections here } finally { if (connection == null) { pendingConnections.decrementAndGet(); } } } } } finally { if (connection == null) { activeCount.decrementAndGet(); } } return false; } @Override public boolean isShutdown() { return isShutdown.get(); } @Override public Host getHost() { return host; } @Override public int getActiveConnectionCount() { return activeCount.get(); } @Override public int getIdleConnectionCount() { return availableConnections.size(); } @Override public int getPendingConnectionCount() { return pendingConnections.get(); } @Override public int getBlockedThreadCount() { return blockedThreads.get(); } @Override public int getBusyConnectionCount() { return getActiveConnectionCount() - getIdleConnectionCount() - getPendingConnectionCount(); } @Override public double getScore() { return latencyStrategy.getScore(); } @Override public double getMeanLatency() { return latencyStrategy.getMean(); } @Override public void addLatencySample(long latency, long now) { latencyStrategy.addSample(latency); } private void discardIdleConnections() { List<Connection<CL>> connections = Lists.newArrayList(); availableConnections.drainTo(connections); activeCount.addAndGet(-connections.size()); for (Connection<CL> connection : connections) { connection.close(); // This is usually an async operation } } public String toString() { int idle = getIdleConnectionCount(); int open = getActiveConnectionCount(); return new StringBuilder().append("SimpleHostConnectionPool[").append("host=").append(host).append("-") .append(id).append(",active=").append(!isShutdown()).append(",open=").append(open).append(",busy=") .append(open - idle).append(",idle=").append(idle).append(",blocked=").append(getBlockedThreadCount()) .append(",pending=").append(getPendingConnectionCount()).append(",score=").append(getScore()) .append("]").toString(); } }