public class

ObjectReader

extends Object
implements Callable<V>
package com.netflix.astyanax.recipes.storage;

import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceArray;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.astyanax.connectionpool.exceptions.NotFoundException;
import com.netflix.astyanax.retry.RetryPolicy;
import com.netflix.astyanax.retry.RunOnce;

public class ObjectReader implements Callable<ObjectMetadata> {
    private static final Logger LOG = LoggerFactory.getLogger(ObjectReader.class);

    private static final int DEFAULT_CONCURRENCY_LEVEL = 4;
    private static final int MAX_WAIT_TIME_TO_FINISH = 60;
    private static final int DEFAULT_BATCH_SIZE = 11;

    private final ChunkedStorageProvider provider;
    private final String objectName;
    private final OutputStream os;

    private int concurrencyLevel = DEFAULT_CONCURRENCY_LEVEL;
    private int maxWaitTimeInSeconds = MAX_WAIT_TIME_TO_FINISH;
    private int batchSize = DEFAULT_BATCH_SIZE;
    private RetryPolicy retryPolicy;
    private ObjectReadCallback callback = new NoOpObjectReadCallback();

    public ObjectReader(ChunkedStorageProvider provider, String objectName, OutputStream os) {
        this.provider = provider;
        this.objectName = objectName;
        this.os = os;
        this.retryPolicy = new RunOnce();
    }

    public ObjectReader withBatchSize(int size) {
        this.batchSize = size;
        return this;
    }

    public ObjectReader withConcurrencyLevel(int level) {
        this.concurrencyLevel = level;
        return this;
    }

    public ObjectReader withRetryPolicy(RetryPolicy retryPolicy) {
        this.retryPolicy = retryPolicy;
        return this;
    }

    public ObjectReader withMaxWaitTime(int maxWaitTimeInSeconds) {
        this.maxWaitTimeInSeconds = maxWaitTimeInSeconds;
        return this;
    }

    public ObjectReader withCallback(ObjectReadCallback callback) {
        this.callback = callback;
        return this;
    }

    @Override
    public ObjectMetadata call() throws Exception {
        LOG.info("Reading: " + objectName);

        Preconditions.checkNotNull(objectName);
        Preconditions.checkNotNull(os);

        try {
            // Try to get the file metadata first. The entire file must be
            // available before it can be downloaded.
            // If not available then we back off and retry using the provided
            // retry policy.
            ObjectMetadata attributes;
            RetryPolicy retry = retryPolicy.duplicate();
            do {
                try {
                    attributes = provider.readMetadata(objectName);
                    if (attributes.isValidForRead())
                        break;
                    if (!retry.allowRetry())
                        throw new NotFoundException("File doesn't exists or isn't ready to be read: " + objectName);
                }
                catch (Exception e) {
                    LOG.warn(e.getMessage());
                    if (!retry.allowRetry())
                        throw e;
                }
            } while (true);

            final AtomicReference<Exception> exception = new AtomicReference<Exception>();
            final AtomicLong totalBytesRead = new AtomicLong();

            // Iterate sequentially building up the batches. Once a complete
            // batch of ids is ready
            // randomize fetching the chunks and then download them in parallel
            List<Integer> idsToRead = Lists.newArrayList();
            for (int block = 0; block < attributes.getChunkCount(); block++) {
                idsToRead.add(block);

                // Got a batch, or reached the end
                if (idsToRead.size() == batchSize || block == attributes.getChunkCount() - 1) {

                    // Read blocks in random order
                    final int firstBlockId = idsToRead.get(0);
                    Collections.shuffle(idsToRead);
                    final AtomicReferenceArray<ByteBuffer> chunks = new AtomicReferenceArray<ByteBuffer>(
                            idsToRead.size());
                    ExecutorService executor = Executors.newFixedThreadPool(
                            concurrencyLevel,
                            new ThreadFactoryBuilder().setDaemon(true)
                                    .setNameFormat("ChunkReader-" + objectName + "-%d").build());
                    try {
                        for (final int chunkId : idsToRead) {
                            executor.submit(new Runnable() {
                                @Override
                                public void run() {
                                    // Do the fetch
                                    RetryPolicy retry = retryPolicy.duplicate();
                                    while (exception.get() == null) {
                                        try {
                                            ByteBuffer chunk = provider.readChunk(objectName, chunkId);
                                            totalBytesRead.addAndGet(chunk.limit());
                                            chunks.set(chunkId - firstBlockId, chunk);
                                            callback.onChunk(chunkId, chunk);
                                            break;
                                        }
                                        catch (Exception e) {
                                            callback.onChunkException(chunkId, e);
                                            if (retry.allowRetry())
                                                continue;
                                            exception.compareAndSet(null, e);
                                        }
                                    }
                                }
                            });
                        }
                    }
                    finally {
                        executor.shutdown();
                        if (!executor.awaitTermination(maxWaitTimeInSeconds, TimeUnit.SECONDS)) {
                            throw new Exception("Took too long to fetch object: " + objectName);
                        }
                    }

                    if (exception.get() != null)
                        throw exception.get();

                    for (int i = 0; i < chunks.length(); i++) {
                        os.write(chunks.get(i).array());
                        os.flush();
                    }
                    idsToRead.clear();
                }
            }

            if (totalBytesRead.get() != attributes.getObjectSize()) {
                throw new Exception("Bytes read (" + totalBytesRead.get() + ") does not match object size ("
                        + attributes.getObjectSize() + ") for object " + objectName);
            }
            callback.onSuccess();
            return attributes;
        }
        catch (Exception e) {
            callback.onFailure(e);
            throw e;
        }
    }
}