/*
 * public abstract class AbstractThriftAllRowsQueryImpl
 * extends Object
 * implements AllRowsQuery<K, C>
 */
package com.netflix.astyanax.thrift;

import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;

import com.netflix.astyanax.ExceptionCallback;
import com.netflix.astyanax.model.ByteBufferRange;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.ColumnSlice;
import com.netflix.astyanax.query.AllRowsQuery;

/**
 * Skeleton for a Thrift-backed {@link AllRowsQuery}.
 *
 * <p>This class only accumulates query settings: the column selection (kept in
 * {@link #predicate}), the number of rows fetched per block, the optional
 * concurrency level, the repeat-last-token flag and an optional exception
 * callback. Fetching a block of rows is delegated to subclasses through
 * {@link #getNextBlock(KeyRange)}.
 *
 * <p>Column selection is "last call wins": installing a list of column names
 * clears any slice range, and installing a slice range clears any list of
 * column names, so the predicate never carries both at once.
 *
 * @param <K> row key type of the column family
 * @param <C> column name type of the column family
 */
public abstract class AbstractThriftAllRowsQueryImpl<K, C> implements AllRowsQuery<K, C> {

    /** Column selection sent with each request; starts out as {@code ThriftUtils.RANGE_ALL}. */
    protected SlicePredicate predicate = new SlicePredicate().setSlice_range(ThriftUtils.RANGE_ALL);

    /** Number of rows requested per block (see {@link #setRowLimit(int)}). */
    private int blockSize = 100;

    /** Column family being queried; supplies the column name serializer. */
    private final ColumnFamily<K, C> columnFamily;

    private boolean repeatLastToken = true;

    /** Optional callback; {@code null} until {@link #setExceptionCallback} is called. */
    private ExceptionCallback exceptionCallback;

    /** Requested concurrency level; {@code null} until explicitly configured. */
    private Integer nThreads;

    /**
     * @param columnFamily column family whose rows will be queried
     */
    public AbstractThriftAllRowsQueryImpl(ColumnFamily<K, C> columnFamily) {
        this.columnFamily = columnFamily;
    }

    /**
     * Stores the callback made available to subclasses via {@link #getExceptionCallback()}.
     *
     * @param cb callback to store; may be {@code null}
     * @return this query, for chaining
     */
    public AllRowsQuery<K, C> setExceptionCallback(ExceptionCallback cb) {
        exceptionCallback = cb;
        return this;
    }

    /**
     * @return the callback last passed to {@link #setExceptionCallback}, or {@code null} if none
     */
    protected ExceptionCallback getExceptionCallback() {
        return this.exceptionCallback;
    }

    /**
     * Alias for {@link #setConcurrencyLevel(int)}.
     *
     * @param numberOfThreads concurrency level to record
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> setThreadCount(int numberOfThreads) {
        setConcurrencyLevel(numberOfThreads);
        return this;
    }

    /**
     * Records the concurrency level returned by {@link #getConcurrencyLevel()}.
     *
     * @param numberOfThreads concurrency level to record
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> setConcurrencyLevel(int numberOfThreads) {
        this.nThreads = numberOfThreads;
        return this;
    }

    /**
     * Restricts the query to the given column names and drops any slice range.
     *
     * @param columns column names to select; a {@code null} array leaves the selection unchanged
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> withColumnSlice(C... columns) {
        if (columns != null) {
            predicate.setColumn_names(columnFamily.getColumnSerializer().toBytesList(Arrays.asList(columns)))
                    .setSlice_rangeIsSet(false);
        }
        return this;
    }

    /**
     * Restricts the query to the given column names and drops any slice range.
     *
     * @param columns column names to select; {@code null} leaves the selection unchanged
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> withColumnSlice(Collection<C> columns) {
        if (columns != null) {
            predicate.setColumn_names(columnFamily.getColumnSerializer().toBytesList(columns))
                    .setSlice_rangeIsSet(false);
        }
        return this;
    }

    /**
     * Selects a range of columns and drops any previously selected column names.
     *
     * @param startColumn first column of the range
     * @param endColumn   last column of the range
     * @param reversed    whether the range is read in reverse order
     * @param count       maximum number of columns per row
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> withColumnRange(C startColumn, C endColumn, boolean reversed, int count) {
        selectRange(ThriftUtils.createSliceRange(columnFamily.getColumnSerializer(), startColumn,
                endColumn, reversed, count));
        return this;
    }

    /**
     * Selects a range of columns given as already-serialized bounds and drops any
     * previously selected column names.
     *
     * @param startColumn serialized first column of the range
     * @param endColumn   serialized last column of the range
     * @param reversed    whether the range is read in reverse order
     * @param count       maximum number of columns per row
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> withColumnRange(ByteBuffer startColumn, ByteBuffer endColumn, boolean reversed, int count) {
        selectRange(new SliceRange(startColumn, endColumn, reversed, count));
        return this;
    }

    /**
     * Applies a {@link ColumnSlice}: when it lists explicit columns those names are
     * selected (and any slice range dropped); otherwise its start/end/reversed/limit
     * are installed as a slice range (and any column names dropped).
     *
     * @param slice slice to apply; {@code null} leaves the selection unchanged, matching
     *              the other {@code withColumnSlice} overloads
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> withColumnSlice(ColumnSlice<C> slice) {
        if (slice == null) {
            return this;
        }
        if (slice.getColumns() != null) {
            predicate.setColumn_names(columnFamily.getColumnSerializer().toBytesList(slice.getColumns()))
                    .setSlice_rangeIsSet(false);
        }
        else {
            selectRange(ThriftUtils.createSliceRange(columnFamily.getColumnSerializer(),
                    slice.getStartColumn(), slice.getEndColumn(), slice.getReversed(), slice.getLimit()));
        }
        return this;
    }

    /**
     * Selects the columns described by {@code range} and drops any previously
     * selected column names.
     *
     * @param range source of the start, end, limit and reversed settings
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> withColumnRange(ByteBufferRange range) {
        selectRange(new SliceRange().setStart(range.getStart()).setFinish(range.getEnd())
                .setCount(range.getLimit()).setReversed(range.isReversed()));
        return this;
    }

    /**
     * Alias for {@link #setRowLimit(int)}.
     *
     * @param blockSize number of rows per block
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> setBlockSize(int blockSize) {
        return setRowLimit(blockSize);
    }

    /**
     * Sets the value returned by {@link #getBlockSize()}.
     *
     * @param rowLimit number of rows per block
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> setRowLimit(int rowLimit) {
        this.blockSize = rowLimit;
        return this;
    }

    /**
     * @return the configured rows-per-block value (100 unless changed)
     */
    public int getBlockSize() {
        return blockSize;
    }

    /**
     * Stores the flag returned by {@link #getRepeatLastToken()}.
     *
     * @param repeatLastToken new flag value
     * @return this query, for chaining
     */
    @Override
    public AllRowsQuery<K, C> setRepeatLastToken(boolean repeatLastToken) {
        this.repeatLastToken = repeatLastToken;
        return this;
    }

    /**
     * @return the repeat-last-token flag ({@code true} unless changed)
     */
    public boolean getRepeatLastToken() {
        return this.repeatLastToken;
    }

    /**
     * @return the configured concurrency level, or {@code null} if none was set
     */
    protected Integer getConcurrencyLevel() {
        return this.nThreads;
    }

    /**
     * Fetches the rows for the given key range.
     *
     * @param range key range to fetch
     * @return the key slices for that range
     */
    protected abstract List<org.apache.cassandra.thrift.KeySlice> getNextBlock(KeyRange range);

    /**
     * Installs {@code range} as the active column selection and clears column names.
     *
     * Without the clear, a predicate previously configured with explicit column
     * names would carry both settings; per the Cassandra Thrift interface
     * description the slice range is ignored when column names are present, so
     * the range requested here would silently have no effect.
     */
    private void selectRange(SliceRange range) {
        predicate.setSlice_range(range).setColumn_namesIsSet(false);
    }
}