/******************************************************************************* * Copyright 2011 Netflix * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. ******************************************************************************/ package com.netflix.astyanax.thrift; import com.google.common.collect.Maps; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.netflix.astyanax.AstyanaxConfiguration; import com.netflix.astyanax.AuthenticationCredentials; import com.netflix.astyanax.CassandraOperationTracer; import com.netflix.astyanax.CassandraOperationType; import com.netflix.astyanax.KeyspaceTracerFactory; import com.netflix.astyanax.connectionpool.Connection; import com.netflix.astyanax.connectionpool.ConnectionFactory; import com.netflix.astyanax.connectionpool.ConnectionPoolConfiguration; import com.netflix.astyanax.connectionpool.ConnectionPoolMonitor; import com.netflix.astyanax.connectionpool.Host; import com.netflix.astyanax.connectionpool.HostConnectionPool; import com.netflix.astyanax.connectionpool.Operation; import com.netflix.astyanax.connectionpool.OperationResult; import com.netflix.astyanax.connectionpool.RateLimiter; import com.netflix.astyanax.connectionpool.exceptions.ConnectionException; import com.netflix.astyanax.connectionpool.exceptions.IsTimeoutException; import com.netflix.astyanax.connectionpool.exceptions.ThrottledException; import com.netflix.astyanax.connectionpool.impl.OperationResultImpl; import com.netflix.astyanax.connectionpool.impl.SimpleRateLimiterImpl; import org.apache.cassandra.thrift.AuthenticationRequest; import org.apache.cassandra.thrift.Cassandra; import org.apache.cassandra.thrift.TBinaryProtocol; import org.apache.thrift.transport.TFramedTransport; import org.apache.thrift.transport.TSocket; import org.apache.thrift.transport.TTransportException; import java.util.Map; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; public class ThriftSyncConnectionFactoryImpl implements ConnectionFactory<Cassandra.Client> { private static final String NAME_FORMAT = "ThriftConnection<%s-%d>"; private final AtomicLong idCounter = new AtomicLong(0); private final ExecutorService executor = Executors.newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) .build()); private final RateLimiter limiter; private final ConnectionPoolConfiguration cpConfig; private final KeyspaceTracerFactory tracerFactory; private final ConnectionPoolMonitor monitor; private final AstyanaxConfiguration asConfig; public ThriftSyncConnectionFactoryImpl(AstyanaxConfiguration asConfig, ConnectionPoolConfiguration cpConfig, KeyspaceTracerFactory tracerFactory, ConnectionPoolMonitor monitor) { this.cpConfig = cpConfig; this.asConfig = asConfig; this.limiter = new SimpleRateLimiterImpl(cpConfig); this.tracerFactory = tracerFactory; this.monitor = monitor; } @Override public Connection<Cassandra.Client> createConnection(final HostConnectionPool<Cassandra.Client> pool) throws ThrottledException { if (limiter.check() == false) { throw new ThrottledException("Too many connection attempts"); } return new Connection<Cassandra.Client>() { private final long id = idCounter.incrementAndGet(); private Cassandra.Client cassandraClient; private TFramedTransport transport; private TSocket socket; private int timeout = 0; private AtomicLong operationCounter = new AtomicLong(); private AtomicBoolean closed = new AtomicBoolean(false); private volatile ConnectionException lastException = null; private volatile String keyspaceName; @Override public <R> OperationResult<R> execute(Operation<Cassandra.Client, R> op) throws ConnectionException { long startTime = System.nanoTime(); long latency = 0; setTimeout(cpConfig.getSocketTimeout()); // In case the configurationchanged operationCounter.incrementAndGet(); // Set a new keyspace, if it changed lastException = null; if (op.getKeyspace() != null && (keyspaceName == null || !op.getKeyspace().equals(keyspaceName))) { CassandraOperationTracer tracer = tracerFactory.newTracer(CassandraOperationType.SET_KEYSPACE) .start(); try { cassandraClient.set_keyspace(op.getKeyspace()); if (asConfig.getCqlVersion() != null) cassandraClient.set_cql_version(asConfig.getCqlVersion()); keyspaceName = op.getKeyspace(); long now = System.nanoTime(); latency = now - startTime; pool.addLatencySample(latency, now); tracer.success(); } catch (Exception e) { long now = System.nanoTime(); latency = now - startTime; lastException = ThriftConverter.ToConnectionPoolException(e).setLatency(latency); if (e instanceof IsTimeoutException) pool.addLatencySample(latency, now); tracer.failure(lastException); throw lastException; } startTime = System.nanoTime(); // We don't want to include // the set_keyspace in our // latency calculation } // Execute the operation try { R result = op.execute(cassandraClient); long now = System.nanoTime(); latency = now - startTime; pool.addLatencySample(latency, now); return new OperationResultImpl<R>(getHost(), result, latency); } catch (Exception e) { long now = System.nanoTime(); latency = now - startTime; lastException = ThriftConverter.ToConnectionPoolException(e).setLatency(latency); if (e instanceof IsTimeoutException) pool.addLatencySample(latency, now); throw lastException; } } @Override public void open() throws ConnectionException { if (cassandraClient != null) { throw new IllegalStateException("Open called on already open connection"); } long startTime = System.currentTimeMillis(); try { socket = new TSocket(getHost().getIpAddress(), getHost().getPort(), cpConfig.getConnectTimeout()); socket.getSocket().setTcpNoDelay(true); socket.getSocket().setKeepAlive(true); socket.getSocket().setSoLinger(false, 0); setTimeout(cpConfig.getSocketTimeout()); transport = new TFramedTransport(socket); transport.open(); cassandraClient = new Cassandra.Client(new TBinaryProtocol(transport)); monitor.incConnectionCreated(getHost()); AuthenticationCredentials credentials = cpConfig.getAuthenticationCredentials(); if (credentials != null) { Map<String, String> thriftCredentials = Maps.newHashMapWithExpectedSize(2); thriftCredentials.put("username", credentials.getUsername()); thriftCredentials.put("password", credentials.getPassword()); cassandraClient.login(new AuthenticationRequest(thriftCredentials)); } } catch (Exception e) { closeClient(); ConnectionException ce = ThriftConverter.ToConnectionPoolException(e).setHost(getHost()) .setLatency(System.currentTimeMillis() - startTime); monitor.incConnectionCreateFailed(getHost(), ce); throw ce; } } @Override public void openAsync(final AsyncOpenCallback<Cassandra.Client> callback) { final Connection<Cassandra.Client> This = this; executor.submit(new Runnable() { @Override public void run() { try { open(); callback.success(This); } catch (Exception e) { callback.failure(This, ThriftConverter.ToConnectionPoolException(e)); } } }); } @Override public void close() { if (closed.compareAndSet(false, true)) { monitor.incConnectionClosed(getHost(), lastException); executor.submit(new Runnable() { @Override public void run() { try { closeClient(); } catch (Exception e) { } } }); } } private void closeClient() { if (transport != null) { try { transport.flush(); } catch (TTransportException e) { } finally { try { transport.close(); } catch (Exception e) { } finally { transport = null; } } } if (socket != null) { try { socket.close(); } catch (Exception e) { } finally { socket = null; } } } @Override public HostConnectionPool<Cassandra.Client> getHostConnectionPool() { return pool; } @Override public ConnectionException getLastException() { return lastException; } @Override public String toString() { return String.format(NAME_FORMAT, getHost().getHostName(), id); } /** * Compares the toString of these clients */ @Override public boolean equals(Object obj) { return toString().equals(obj.toString()); } @Override public long getOperationCount() { return operationCounter.get(); } @Override public Host getHost() { return pool.getHost(); } public void setTimeout(int timeout) { if (this.timeout != timeout) { socket.setTimeout(timeout); this.timeout = timeout; } } }; } }