public class

ThriftAllRowsImpl

extends Object
implements Rows<K, C>
package com.netflix.astyanax.thrift;

import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;

import org.apache.cassandra.dht.RandomPartitioner;
import org.apache.cassandra.thrift.KeyRange;

import com.google.common.collect.Iterables;
import com.netflix.astyanax.model.ColumnFamily;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;
import com.netflix.astyanax.thrift.model.*;

public class ThriftAllRowsImpl<K, C> implements Rows<K, C> {
    private ColumnFamily<K, C> columnFamily;
    private AbstractThriftAllRowsQueryImpl<K, C> query;
    private final RandomPartitioner partitioner = new RandomPartitioner();

    public ThriftAllRowsImpl(AbstractThriftAllRowsQueryImpl<K, C> query, ColumnFamily<K, C> columnFamily) {
        this.columnFamily = columnFamily;
        this.query = query;
    }

    /**
     * Each call to .iterator() returns a new context starting at the beginning
     * of the column family.
     */
    @Override
    public Iterator<Row<K, C>> iterator() {
        return new Iterator<Row<K, C>>() {
            private KeyRange range;
            private org.apache.cassandra.thrift.KeySlice lastRow;
            private List<org.apache.cassandra.thrift.KeySlice> list = null;
            private Iterator<org.apache.cassandra.thrift.KeySlice> iter = null;

            {
                range = new KeyRange().setCount(query.getBlockSize()).setStart_token("0").setEnd_token("0");
            }

            @Override
            public boolean hasNext() {
                // Get the next block
                if (iter == null || (!iter.hasNext() && list.size() == query.getBlockSize())) {
                    if (lastRow != null) {
                        // Determine the start token for the next page
                        String token = partitioner.getToken(ByteBuffer.wrap(lastRow.getKey())).toString();
                        if (query.getRepeatLastToken()) {
                            // Start token is non-inclusive
                            BigInteger intToken = new BigInteger(token).subtract(new BigInteger("1"));
                            range.setStart_token(intToken.toString());
                        }
                        else {
                            range.setStart_token(token);
                        }
                    }

                    // Fetch the data
                    list = query.getNextBlock(range);
                    if (list == null) {
                        return false;
                    }
                    iter = list.iterator();
                    if (iter == null || !iter.hasNext()) {
                        return false;
                    }

                    // If repeating last token then skip the first row in the
                    // result
                    if (lastRow != null && query.getRepeatLastToken() && iter.hasNext()) {
                        iter.next();
                    }

                    lastRow = Iterables.getLast(list);
                }
                return iter.hasNext();
            }

            @Override
            public Row<K, C> next() {
                org.apache.cassandra.thrift.KeySlice row = iter.next();
                return new ThriftRowImpl<K, C>(columnFamily.getKeySerializer().fromBytes(row.getKey()),
                        ByteBuffer.wrap(row.getKey()), new ThriftColumnOrSuperColumnListImpl<C>(row.getColumns(),
                                columnFamily.getColumnSerializer()));
            }

            @Override
            public void remove() {
                throw new IllegalStateException();
            }
        };
    }

    @Override
    public Row<K, C> getRow(K key) {
        throw new IllegalStateException();
    }

    @Override
    public int size() {
        throw new IllegalStateException();
    }

    @Override
    public boolean isEmpty() {
        throw new IllegalStateException();
    }

    @Override
    public Row<K, C> getRowByIndex(int i) {
        throw new IllegalStateException();
    }
}