public class

ReverseIndexQueryTest

extends Object
package com.netflix.astyanax.recipes;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import junit.framework.Assert;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Function;
import com.netflix.astyanax.AstyanaxContext;
import com.netflix.astyanax.Cluster;
import com.netflix.astyanax.Keyspace;
import com.netflix.astyanax.MutationBatch;
import com.netflix.astyanax.Serializer;
import com.netflix.astyanax.annotations.Component;
import com.netflix.astyanax.connectionpool.NodeDiscoveryType;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.connectionpool.impl.BagConnectionPoolImplTest;
import com.netflix.astyanax.connectionpool.impl.ConnectionPoolConfigurationImpl;
import com.netflix.astyanax.connectionpool.impl.CountingConnectionPoolMonitor;
import com.netflix.astyanax.ddl.KeyspaceDefinition;
import com.netflix.astyanax.impl.AstyanaxConfigurationImpl;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.recipes.ReverseIndexQuery.IndexEntryCallback;
import com.netflix.astyanax.serializers.AnnotatedCompositeSerializer;
import com.netflix.astyanax.serializers.LongSerializer;
import com.netflix.astyanax.serializers.StringSerializer;
import com.netflix.astyanax.thrift.ThriftFamilyFactory;

public class ReverseIndexQueryTest {

    private static Logger LOG = LoggerFactory
            .getLogger(BagConnectionPoolImplTest.class);

    private static AstyanaxContext<Cluster> clusterContext;

    private static final String TEST_CLUSTER_NAME = "TestCluster";
    private static final String TEST_KEYSPACE_NAME = "ReverseIndexTest";
    private static final String TEST_DATA_CF = "Data";
    private static final String TEST_INDEX_CF = "Index";

    private static final boolean TEST_INIT_KEYSPACE = true;
    private static final long ROW_COUNT = 1000;
    private static final int SHARD_COUNT = 11;

    public static final String SEEDS = "localhost:7102";

    private static ColumnFamily<Long, String> CF_DATA = ColumnFamily
            .newColumnFamily(TEST_DATA_CF, LongSerializer.get(),
                    StringSerializer.get());

    private static class IndexEntry {
        @Component(ordinal = 0)
        Long value;
        @Component(ordinal = 1)
        Long key;

        public IndexEntry(Long value, Long key) {
            this.value = value;
            this.key = key;
        }
    }

    private static Serializer<IndexEntry> indexEntitySerializer = new AnnotatedCompositeSerializer<IndexEntry>(
            IndexEntry.class);

    private static ColumnFamily<String, IndexEntry> CF_INDEX = ColumnFamily
            .newColumnFamily(TEST_INDEX_CF, StringSerializer.get(),
                    indexEntitySerializer);

    @BeforeClass
    public static void setup() throws Exception {
        clusterContext = new AstyanaxContext.Builder()
                .forCluster(TEST_CLUSTER_NAME)
                .withAstyanaxConfiguration(
                        new AstyanaxConfigurationImpl()
                                .setDiscoveryType(NodeDiscoveryType.NONE))
                .withConnectionPoolConfiguration(
                        new ConnectionPoolConfigurationImpl(TEST_CLUSTER_NAME)
                                .setMaxConnsPerHost(1).setSeeds(SEEDS))
                .withConnectionPoolMonitor(new CountingConnectionPoolMonitor())
                .buildCluster(ThriftFamilyFactory.getInstance());

        clusterContext.start();

        if (TEST_INIT_KEYSPACE) {
            Cluster cluster = clusterContext.getEntity();
            try {
                LOG.info("Dropping keyspace: " + TEST_KEYSPACE_NAME);
                cluster.dropKeyspace(TEST_KEYSPACE_NAME);
                Thread.sleep(10000);
            } catch (ConnectionException e) {
                LOG.warn(e.getMessage());
            }

            Map<String, String> stratOptions = new HashMap<String, String>();
            stratOptions.put("replication_factor", "3");

            try {
                LOG.info("Creating keyspace: " + TEST_KEYSPACE_NAME);

                KeyspaceDefinition ksDef = cluster.makeKeyspaceDefinition();

                ksDef.setName(TEST_KEYSPACE_NAME)
                        .setStrategyOptions(stratOptions)
                        .setStrategyClass("SimpleStrategy")
                        .addColumnFamily(
                                cluster.makeColumnFamilyDefinition()
                                        .setName(CF_DATA.getName())
                                        .setComparatorType("UTF8Type")
                        // .setKeyValidationClass("LongType")
                        // .setDefaultValidationClass("BytesType")
                        )
                        .addColumnFamily(
                                cluster.makeColumnFamilyDefinition()
                                        .setName(CF_INDEX.getName())
                                        .setComparatorType(
                                                "CompositeType(LongType, LongType)")
                                        .setDefaultValidationClass("BytesType"));
                cluster.addKeyspace(ksDef);
                Thread.sleep(2000);
                populateKeyspace();
            } catch (ConnectionException e) {
                LOG.error(e.getMessage());
            }
        }
    }

    @AfterClass
    public static void teardown() {
        if (clusterContext != null)
            clusterContext.shutdown();
    }

    public static void populateKeyspace() throws Exception {
        LOG.info("Ppoulating keyspace: " + TEST_KEYSPACE_NAME);

        Keyspace keyspace = clusterContext.getEntity().getKeyspace(
                TEST_KEYSPACE_NAME);

        try {
            // CF_Users :
            // 1 :
            // 'A' : 1,
            // 'B' : 2,
            //
            // CF_Index :
            // 'B_Shard1':
            // 2:1 : null
            // 3:2 : null
            //

            MutationBatch m = keyspace.prepareMutationBatch();

            for (long row = 0; row < ROW_COUNT; row++) {
                long value = row * 100;
                m.withRow(CF_DATA, row).putColumn("A", "ABC", null)
                        .putColumn("B", "DEF", null);
                m.withRow(CF_INDEX, "B_" + (row % SHARD_COUNT)).putColumn(
                        new IndexEntry(value, row), row, null);
            }

            // System.out.println(m);
            m.execute();
        } catch (Exception e) {
            LOG.error(e.getMessage());
            Assert.fail();
        }
    }

    @Test
    public void testReverseIndex() {
        LOG.info("Starting");
        final AtomicLong counter = new AtomicLong();

        Keyspace keyspace = clusterContext.getEntity().getKeyspace(
                TEST_KEYSPACE_NAME);
        ReverseIndexQuery
                .newQuery(keyspace, CF_DATA, CF_INDEX.getName(),
                        LongSerializer.get())
                .fromIndexValue(100L)
                .toIndexValue(10000L)
                .withIndexShards(
                        new Shards.StringShardBuilder().setPrefix("B_")
                                .setShardCount(SHARD_COUNT).build())
                .withColumnSlice(Arrays.asList("A"))
                .forEach(new Function<Row<Long, String>, Void>() {
                    @Override
                    public Void apply(Row<Long, String> row) {
                        StringBuilder sb = new StringBuilder();
                        for (Column<String> column : row.getColumns()) {
                            sb.append(column.getName()).append(", ");
                        }
                        counter.incrementAndGet();
                        LOG.info("Row: " + row.getKey() + " Columns: "
                                + sb.toString());
                        return null;
                    }
                }).forEachIndexEntry(new IndexEntryCallback<Long, Long>() {
                    @Override
                    public boolean handleEntry(Long key, Long value,
                            ByteBuffer meta) {
                        LOG.info("Row : " + key + " IndexValue: " + value
                                + " Meta: "
                                + LongSerializer.get().fromByteBuffer(meta));
                        if (key % 2 == 1)
                            return false;
                        return true;
                    }
                }).execute();

        LOG.info("Read " + counter.get() + " rows");
    }

}