package com.netflix.astyanax.recipes.storage; import java.nio.ByteBuffer; import java.util.Map; import com.google.common.collect.Maps; import com.netflix.astyanax.ColumnListMutation; import com.netflix.astyanax.Keyspace; import com.netflix.astyanax.MutationBatch; import com.netflix.astyanax.connectionpool.exceptions.NotFoundException; import com.netflix.astyanax.model.ColumnFamily; import com.netflix.astyanax.model.ColumnList; import com.netflix.astyanax.model.ConsistencyLevel; import com.netflix.astyanax.retry.BoundedExponentialBackoff; import com.netflix.astyanax.retry.RetryPolicy; import com.netflix.astyanax.serializers.StringSerializer; /** * ChunkProvider responsible for reading and writing chunks to cassandra. Chunks * are written to different row keys with the row key name having the format * <chunknumber>$<objectname> * * @author elandau * */ public class CassandraChunkedStorageProvider implements ChunkedStorageProvider { private static final RetryPolicy DEFAULT_RETRY_POLICY = new BoundedExponentialBackoff(1000, 10000, 5); private static final ConsistencyLevel DEFAULT_CONSISTENCY_LEVEL = ConsistencyLevel.CL_QUORUM; private static final int DEFAULT_CHUNKSIZE = 0x4000; private static final String DEFAULT_ROW_KEY_FORMAT = "%s$%d"; public enum Columns { DATA, OBJECTSIZE, CHUNKSIZE, CHUNKCOUNT, EXPIRES } private final ColumnFamily<String, String> cf; private final Keyspace keyspace; private final Map<Columns, String> names = Maps.newHashMap(); private RetryPolicy retryPolicy = DEFAULT_RETRY_POLICY; private ConsistencyLevel consistencyLevel = DEFAULT_CONSISTENCY_LEVEL; private String rowKeyFormat = DEFAULT_ROW_KEY_FORMAT; public CassandraChunkedStorageProvider(Keyspace keyspace, String cfName) { this.keyspace = keyspace; this.cf = ColumnFamily.newColumnFamily(cfName, StringSerializer.get(), StringSerializer.get()); } public CassandraChunkedStorageProvider(Keyspace keyspace, ColumnFamily<String, String> cf) { this.keyspace = keyspace; this.cf = cf; } public CassandraChunkedStorageProvider withColumnName(Columns column, String name) { names.put(column, name); return this; } public CassandraChunkedStorageProvider withRowKeyFormat(String format) { this.rowKeyFormat = format; return this; } private String getColumnName(Columns column) { if (names.containsKey(column)) return names.get(column); return column.name(); } @Override public int writeChunk(String objectName, int chunkId, ByteBuffer data, Integer ttl) throws Exception { MutationBatch m = keyspace.prepareMutationBatch().withRetryPolicy(retryPolicy); m.withRow(cf, getRowKey(objectName, chunkId)).putColumn(getColumnName(Columns.DATA), data, ttl) .putColumn(getColumnName(Columns.CHUNKSIZE), data.limit(), ttl); if (chunkId == 0) { m.withRow(cf, objectName).putColumn(getColumnName(Columns.CHUNKSIZE), data.limit(), ttl); } m.execute(); return data.limit(); } @Override public ByteBuffer readChunk(String objectName, int chunkId) throws Exception { return keyspace.prepareQuery(cf).setConsistencyLevel(ConsistencyLevel.CL_ONE).withRetryPolicy(retryPolicy) .getKey(getRowKey(objectName, chunkId)).getColumn(getColumnName(Columns.DATA)).execute().getResult() .getByteBufferValue(); } private String getRowKey(String objectName, int chunkId) { return new String(rowKeyFormat).replace("%s", objectName).replace("%d", Integer.toString(chunkId)); } public CassandraChunkedStorageProvider setConsistencyLevel(ConsistencyLevel consistencyLevel) { this.consistencyLevel = consistencyLevel; return this; } public ConsistencyLevel getConsistencyLevel() { return this.consistencyLevel; } @Override public void writeMetadata(String objectName, ObjectMetadata attr) throws Exception { MutationBatch m = keyspace.prepareMutationBatch().withRetryPolicy(retryPolicy); ColumnListMutation<String> row = m.withRow(cf, objectName); if (attr.getChunkSize() != null) row.putColumn(getColumnName(Columns.CHUNKSIZE), attr.getChunkSize(), attr.getTtl()); if (attr.getChunkCount() != null) row.putColumn(getColumnName(Columns.CHUNKCOUNT), attr.getChunkCount(), attr.getTtl()); if (attr.getObjectSize() != null) row.putColumn(getColumnName(Columns.OBJECTSIZE), attr.getObjectSize(), attr.getTtl()); m.execute(); } @Override public ObjectMetadata readMetadata(String objectName) throws Exception, NotFoundException { ColumnList<String> columns = keyspace.prepareQuery(cf).getKey(objectName).execute().getResult(); if (columns.isEmpty()) { throw new NotFoundException(objectName); } return new ObjectMetadata().setObjectSize(columns.getLongValue(getColumnName(Columns.OBJECTSIZE), null)) .setChunkSize(columns.getIntegerValue(getColumnName(Columns.CHUNKSIZE), null)) .setChunkCount(columns.getIntegerValue(getColumnName(Columns.CHUNKCOUNT), null)); } @Override public void deleteObject(String objectName, Integer chunkCount) throws Exception, NotFoundException { if (chunkCount == null) { ObjectMetadata attr = readMetadata(objectName); if (attr.getChunkCount() == null) throw new NotFoundException("Object not found :" + objectName); chunkCount = attr.getChunkCount(); } MutationBatch m = keyspace.prepareMutationBatch().withRetryPolicy(retryPolicy); for (int i = 0; i < chunkCount; i++) { m.withRow(cf, getRowKey(objectName, i)).delete(); } m.withRow(cf, objectName).delete(); m.execute(); } @Override public int getDefaultChunkSize() { return DEFAULT_CHUNKSIZE; } }