public final class

ThriftKeyspaceImpl

extends Object
implements Keyspace
/*******************************************************************************
 * Copyright 2011 Netflix
 * 
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * 
 *   http://www.apache.org/licenses/LICENSE-2.0
 * 
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ******************************************************************************/
package com.netflix.astyanax.thrift;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.CounterColumn;
import org.apache.cassandra.thrift.Cassandra.Client;

import com.google.common.base.Function;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.collect.Lists;
import com.netflix.astyanax.AstyanaxConfiguration;
import com.netflix.astyanax.ColumnMutation;
import com.netflix.astyanax.Execution;
import com.netflix.astyanax.CassandraOperationType;
import com.netflix.astyanax.KeyspaceTracerFactory;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.WriteAheadEntry;
import com.netflix.astyanax.WriteAheadLog;
import com.netflix.astyanax.SerializerPackage;
import com.netflix.astyanax.connectionpool.ConnectionPool;
import com.netflix.astyanax.connectionpool.Host;
import com.netflix.astyanax.connectionpool.Operation;
import com.netflix.astyanax.connectionpool.OperationResult;
import com.netflix.astyanax.connectionpool.TokenRange;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.exceptions.OperationException;
import com.netflix.astyanax.connectionpool.impl.TokenRangeImpl;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
import com.netflix.astyanax.model.*;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.query.ColumnFamilyQuery;
import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.serializers.SerializerPackageImpl;
import com.netflix.astyanax.serializers.UnknownComparatorException;
import com.netflix.astyanax.thrift.ddl.*;

public final class ThriftKeyspaceImpl implements Keyspace {

    private final ConnectionPool<Cassandra.Client> connectionPool;
    private static final RandomPartitioner partitioner = new RandomPartitioner();
    private final AstyanaxConfiguration config;
    private final String ksName;
    private final ExecutorService executor;
    private final KeyspaceTracerFactory tracerFactory;
    private final Cache<String, Object> cache;

    public ThriftKeyspaceImpl(String ksName, ConnectionPool<Cassandra.Client> pool, AstyanaxConfiguration config,
            final KeyspaceTracerFactory tracerFactory) {
        this.connectionPool = pool;
        this.config = config;
        this.ksName = ksName;
        this.executor = config.getAsyncExecutor();
        this.tracerFactory = tracerFactory;
        this.cache = CacheBuilder.newBuilder().expireAfterWrite(10, TimeUnit.MINUTES).build();
    }

    @Override
    public String getKeyspaceName() {
        return this.ksName;
    }

    @Override
    public MutationBatch prepareMutationBatch() {
        return new AbstractThriftMutationBatchImpl(config.getClock()) {
            private ConsistencyLevel consistencyLevel = config.getDefaultWriteConsistencyLevel();
            private RetryPolicy retry = config.getRetryPolicy().duplicate();
            private Host pinnedHost;
            private WriteAheadLog wal;

            @Override
            public MutationBatch pinToHost(Host host) {
                this.pinnedHost = host;
                return this;
            }

            @Override
            public MutationBatch setConsistencyLevel(ConsistencyLevel consistencyLevel) {
                this.consistencyLevel = consistencyLevel;
                return this;
            }

            @Override
            public OperationResult<Void> execute() throws ConnectionException {
                WriteAheadEntry walEntry = null;
                if (wal != null) {
                    walEntry = wal.createEntry();
                    walEntry.writeMutation(this);
                }
                try {
                    OperationResult<Void> result = executeOperation(
                            new AbstractKeyspaceOperationImpl<Void>(
                                    tracerFactory.newTracer(CassandraOperationType.BATCH_MUTATE), pinnedHost,
                                    getKeyspaceName()) {
                                @Override
                                public Void internalExecute(Client client) throws Exception {
                                    client.batch_mutate(getMutationMap(),
                                            ThriftConverter.ToThriftConsistencyLevel(consistencyLevel));
                                    discardMutations();
                                    return null;
                                }

                                @Override
                                public BigInteger getToken() {
                                    if (getMutationMap().size() == 1)
                                        return partitioner.getToken(getMutationMap().keySet().iterator().next()).token;
                                    else
                                        return null;
                                }
                            }, retry);

                    if (walEntry != null) {
                        wal.removeEntry(walEntry);
                    }
                    return result;
                }
                catch (ConnectionException exception) {
                    throw exception;
                }
            }

            @Override
            public Future<OperationResult<Void>> executeAsync() throws ConnectionException {
                return executor.submit(new Callable<OperationResult<Void>>() {
                    @Override
                    public OperationResult<Void> call() throws Exception {
                        return execute();
                    }
                });
            }

            @Override
            public MutationBatch withRetryPolicy(RetryPolicy retry) {
                this.retry = retry;
                return this;
            }

            @Override
            public MutationBatch usingWriteAheadLog(WriteAheadLog manager) {
                this.wal = manager;
                return this;
            }
        };
    }

    @Override
    public List<TokenRange> describeRing() throws ConnectionException {
        return executeOperation(
                new AbstractKeyspaceOperationImpl<List<TokenRange>>(tracerFactory
                        .newTracer(CassandraOperationType.DESCRIBE_RING), getKeyspaceName()) {
                    @Override
                    public List<TokenRange> internalExecute(Cassandra.Client client)
                            throws Exception {
                        return Lists.transform(client.describe_ring(getKeyspaceName()),
                                new Function<org.apache.cassandra.thrift.TokenRange, TokenRange>() {
                                    @Override
                                    public TokenRange apply(
                                            org.apache.cassandra.thrift.TokenRange tr) {
                                        return new TokenRangeImpl(tr.getStart_token(), tr.getEnd_token(), tr.getEndpoints());
                                    }
                                });
                    }
                }, getConfig().getRetryPolicy().duplicate()).getResult();
    }

    @SuppressWarnings("unchecked")
    @Override
    public List<TokenRange> describeRing(boolean cached) throws ConnectionException {
        if (cached) {
          try {
            return (List<TokenRange>) this.cache.get(CassandraOperationType.DESCRIBE_RING.name(),
                  new Callable<Object>() {
                      @Override
                      public Object call() throws Exception {
                          return describeRing();
                      }
                  });
            }
            catch (ExecutionException e) {
                throw ThriftConverter.ToConnectionPoolException(e);
            }
        }
        else {
            return describeRing();
        }
    }
    
    @Override
    public KeyspaceDefinition describeKeyspace() throws ConnectionException {
        return executeOperation(
                new AbstractKeyspaceOperationImpl<KeyspaceDefinition>(
                        tracerFactory.newTracer(CassandraOperationType.DESCRIBE_KEYSPACE), getKeyspaceName()) {
                    @Override
                    public KeyspaceDefinition internalExecute(Cassandra.Client client) throws Exception {
                        return new ThriftKeyspaceDefinitionImpl(client.describe_keyspace(getKeyspaceName()));
                    }
                }, getConfig().getRetryPolicy().duplicate()).getResult();
    }

    public <K, C> ColumnFamilyQuery<K, C> prepareQuery(ColumnFamily<K, C> cf) {
        return new ThriftColumnFamilyQueryImpl<K, C>(executor, tracerFactory, this, connectionPool, cf,
                config.getDefaultReadConsistencyLevel(), config.getRetryPolicy());
    }

    @Override
    public <K, C> ColumnMutation prepareColumnMutation(final ColumnFamily<K, C> columnFamily, final K rowKey, C column) {
        return new AbstractThriftColumnMutationImpl(columnFamily.getKeySerializer().toByteBuffer(rowKey), columnFamily
                .getColumnSerializer().toByteBuffer(column), config.getClock()) {

            private RetryPolicy retry = config.getRetryPolicy().duplicate();
            private ConsistencyLevel writeConsistencyLevel = config.getDefaultWriteConsistencyLevel();

            @Override
            public ColumnMutation setConsistencyLevel(ConsistencyLevel consistencyLevel) {
                writeConsistencyLevel = consistencyLevel;
                return this;
            }

            @Override
            public Execution<Void> incrementCounterColumn(final long amount) {
                return new Execution<Void>() {
                    @Override
                    public OperationResult<Void> execute() throws ConnectionException {
                        return executeOperation(
                                new AbstractKeyspaceOperationImpl<Void>(
                                        tracerFactory.newTracer(CassandraOperationType.COUNTER_MUTATE),
                                        getKeyspaceName()) {
                                    @Override
                                    public Void internalExecute(Client client) throws Exception {
                                        client.add(key, ThriftConverter.getColumnParent(columnFamily, null),
                                                new CounterColumn().setValue(amount).setName(column),
                                                ThriftConverter.ToThriftConsistencyLevel(writeConsistencyLevel));
                                        return null;
                                    }

                                    @Override
                                    public BigInteger getToken() {
                                        return partitioner.getToken(columnFamily.getKeySerializer()
                                                .toByteBuffer(rowKey)).token;
                                    }
                                }, retry);
                    }

                    @Override
                    public Future<OperationResult<Void>> executeAsync() throws ConnectionException {
                        return executor.submit(new Callable<OperationResult<Void>>() {
                            @Override
                            public OperationResult<Void> call() throws Exception {
                                return execute();
                            }
                        });
                    }
                };
            }

            @Override
            public Execution<Void> deleteColumn() {
                return new Execution<Void>() {
                    @Override
                    public OperationResult<Void> execute() throws ConnectionException {
                        return executeOperation(
                                new AbstractKeyspaceOperationImpl<Void>(
                                        tracerFactory.newTracer(CassandraOperationType.COLUMN_DELETE),
                                        getKeyspaceName()) {
                                    @Override
                                    public Void internalExecute(Client client) throws Exception {
                                        client.remove(key, new org.apache.cassandra.thrift.ColumnPath()
                                                .setColumn_family(columnFamily.getName()).setColumn(column), config
                                                .getClock().getCurrentTime(), ThriftConverter
                                                .ToThriftConsistencyLevel(writeConsistencyLevel));
                                        return null;
                                    }

                                    @Override
                                    public BigInteger getToken() {
                                        return partitioner.getToken(columnFamily.getKeySerializer()
                                                .toByteBuffer(rowKey)).token;
                                    }
                                }, retry);
                    }

                    @Override
                    public Future<OperationResult<Void>> executeAsync() throws ConnectionException {
                        return executor.submit(new Callable<OperationResult<Void>>() {
                            @Override
                            public OperationResult<Void> call() throws Exception {
                                return execute();
                            }
                        });
                    }
                };
            }

            @Override
            public Execution<Void> insertValue(final ByteBuffer value, final Integer ttl) {
                return new Execution<Void>() {
                    @Override
                    public OperationResult<Void> execute() throws ConnectionException {
                        return executeOperation(
                                new AbstractKeyspaceOperationImpl<Void>(
                                        tracerFactory.newTracer(CassandraOperationType.COLUMN_INSERT),
                                        getKeyspaceName()) {
                                    @Override
                                    public Void internalExecute(Client client) throws Exception {
                                        org.apache.cassandra.thrift.Column c = new org.apache.cassandra.thrift.Column();
                                        c.setName(column).setValue(value).setTimestamp(clock.getCurrentTime());
                                        if (ttl != null) {
                                            c.setTtl(ttl);
                                        }

                                        client.insert(key, ThriftConverter.getColumnParent(columnFamily, null), c,
                                                ThriftConverter.ToThriftConsistencyLevel(writeConsistencyLevel));
                                        return null;
                                    }

                                    @Override
                                    public BigInteger getToken() {
                                        return partitioner.getToken(columnFamily.getKeySerializer()
                                                .toByteBuffer(rowKey)).token;
                                    }
                                }, retry);
                    }

                    @Override
                    public Future<OperationResult<Void>> executeAsync() throws ConnectionException {
                        return executor.submit(new Callable<OperationResult<Void>>() {
                            @Override
                            public OperationResult<Void> call() throws Exception {
                                return execute();
                            }
                        });
                    }
                };
            }

            @Override
            public Execution<Void> deleteCounterColumn() {
                return new Execution<Void>() {
                    @Override
                    public OperationResult<Void> execute() throws ConnectionException {
                        return executeOperation(
                                new AbstractKeyspaceOperationImpl<Void>(
                                        tracerFactory.newTracer(CassandraOperationType.COLUMN_DELETE),
                                        getKeyspaceName()) {
                                    @Override
                                    public Void internalExecute(Client client) throws Exception {
                                        client.remove_counter(key, new org.apache.cassandra.thrift.ColumnPath()
                                                .setColumn_family(columnFamily.getName()).setColumn(column),
                                                ThriftConverter.ToThriftConsistencyLevel(writeConsistencyLevel));
                                        return null;
                                    }

                                    @Override
                                    public BigInteger getToken() {
                                        return partitioner.getToken(columnFamily.getKeySerializer()
                                                .toByteBuffer(rowKey)).token;
                                    }
                                }, retry);
                    }

                    @Override
                    public Future<OperationResult<Void>> executeAsync() throws ConnectionException {
                        return executor.submit(new Callable<OperationResult<Void>>() {
                            @Override
                            public OperationResult<Void> call() throws Exception {
                                return execute();
                            }
                        });
                    }
                };
            }

            @Override
            public ColumnMutation withRetryPolicy(RetryPolicy retry) {
                this.retry = retry;
                return this;
            }
        };
    }

    @Override
    public AstyanaxConfiguration getConfig() {
        return this.config;
    }

    @Override
    public SerializerPackage getSerializerPackage(String cfName, boolean ignoreErrors) throws ConnectionException,
            UnknownComparatorException {
        return new SerializerPackageImpl(describeKeyspace().getColumnFamily(cfName), ignoreErrors);
    }

    @Override
    public OperationResult<Void> testOperation(final Operation<?, ?> operation) throws ConnectionException {
        return testOperation(operation, config.getRetryPolicy().duplicate());
    }

    @Override
    public OperationResult<Void> testOperation(final Operation<?, ?> operation, RetryPolicy retry)
            throws ConnectionException {
        return executeOperation(
                new AbstractKeyspaceOperationImpl<Void>(tracerFactory.newTracer(CassandraOperationType.TEST),
                        operation.getPinnedHost(), getKeyspaceName()) {
                    @Override
                    public Void internalExecute(Client client) throws Exception {
                        operation.execute(null);
                        return null;
                    }
                }, retry);
    }

    ConnectionPool<Cassandra.Client> getConnectionPool() {
        return connectionPool;
    }

    @Override
    public <K, C> OperationResult<Void> truncateColumnFamily(final ColumnFamily<K, C> columnFamily)
            throws OperationException, ConnectionException {
        return executeOperation(
                new AbstractKeyspaceOperationImpl<Void>(tracerFactory.newTracer(CassandraOperationType.TRUNCATE),
                        getKeyspaceName()) {
                    @Override
                    public Void internalExecute(Cassandra.Client client) throws Exception {
                        client.truncate(columnFamily.getName());
                        return null;
                    }
                }, config.getRetryPolicy().duplicate());
    }

    private <R> OperationResult<R> executeOperation(Operation<Cassandra.Client, R> operation, RetryPolicy retry)
            throws OperationException, ConnectionException {
        return connectionPool.executeWithFailover(operation, retry);
    }

}