// public class AnnotatedCompositeSerializer extends AbstractSerializer<T>
package com.netflix.astyanax.serializers;

import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import com.netflix.astyanax.Serializer;
import com.netflix.astyanax.annotations.Component;
import com.netflix.astyanax.model.Equality;
import com.netflix.astyanax.model.RangeEndpoint;

/**
 * Serializer for a Pojo annotated with Component field annotations
 * 
 * Serialized data is formatted as a list of components with each component
 * having the format: <2 byte length><data><0>
 * 
 * @author elandau
 * 
 * @param <T>
 */
public class AnnotatedCompositeSerializer<T> extends AbstractSerializer<T> {
    private static byte END_OF_COMPONENT = 0;
    private static ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);

    /**
     * Serializer for a single component within the Pojo
     * 
     * @author elandau
     * 
     * @param <P>
     */
    public static class ComponentSerializer<P> implements Comparable<ComponentSerializer<?>> {
        private Field field;
        private Serializer<P> serializer;
        private int ordinal;

        public ComponentSerializer(Field field, Serializer<P> serializer, int ordinal) {
            this.field = field;
            this.field.setAccessible(true);
            this.serializer = serializer;
            this.ordinal = ordinal;
        }

        public Field getField() {
            return this.field;
        }

        public ByteBuffer serialize(Object obj) throws IllegalArgumentException, IllegalAccessException {
            Object value = field.get(obj);
            ByteBuffer buf = serializer.toByteBuffer((P) value);
            return buf;
        }

        public void deserialize(Object obj, ByteBuffer value) throws IllegalArgumentException, IllegalAccessException {
            field.set(obj, serializer.fromByteBuffer(value));
        }

        public ByteBuffer serializeValue(Object value) {
            ByteBuffer buf = serializer.toByteBuffer((P) value);
            return buf;
        }

        @Override
        public int compareTo(ComponentSerializer<?> other) {
            return this.ordinal - other.ordinal;
        }
    }

    private List<ComponentSerializer<?>> components;
    private Class<T> clazz;

    public AnnotatedCompositeSerializer(Class<T> clazz) {
        this.clazz = clazz;
        this.components = new ArrayList<ComponentSerializer<?>>();

        for (Field field : clazz.getDeclaredFields()) {
            Component annotation = field.getAnnotation(Component.class);
            if (annotation != null) {
                components.add(makeComponent(field, SerializerTypeInferer.getSerializer(field.getType()),
                        annotation.ordinal()));
            }
        }

        Collections.sort(this.components);
    }

    @Override
    public ByteBuffer toByteBuffer(T obj) {
        ByteBufferOutputStream out = new ByteBufferOutputStream();
        for (ComponentSerializer<?> serializer : components) {
            try {
                // First, serialize the ByteBuffer for this component
                ByteBuffer cb = serializer.serialize(obj);
                if (cb == null) {
                    cb = ByteBuffer.allocate(0);
                }

                // Write the data: <length><data><0>
                out.writeShort((short) cb.remaining());
                out.write(cb.slice());
                out.write(END_OF_COMPONENT);
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return out.getByteBuffer();
    }

    @Override
    public T fromByteBuffer(ByteBuffer byteBuffer) {
        byteBuffer = byteBuffer.duplicate();
        try {
            T obj = createContents(clazz);
            for (ComponentSerializer<?> serializer : components) {
                ByteBuffer data = getWithShortLength(byteBuffer);
                if (data != null) {
                    if (data.remaining() > 0) {
                        serializer.deserialize(obj, data);
                    }
                    byte end_of_component = byteBuffer.get();
                    if (end_of_component != END_OF_COMPONENT) {
                        throw new RuntimeException("Invalid composite column.  Expected END_OF_COMPONENT.");
                    }
                }
                else {
                    throw new RuntimeException("Missing component data in composite type");
                }
            }
            return obj;
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public ComparatorType getComparatorType() {
        return ComparatorType.COMPOSITETYPE;
    }

    private static int getShortLength(ByteBuffer bb) {
        int length = (bb.get() & 0xFF) << 8;
        return length | (bb.get() & 0xFF);
    }

    private static ByteBuffer getWithShortLength(ByteBuffer bb) {
        int length = getShortLength(bb);
        return getBytes(bb, length);
    }

    private static ByteBuffer getBytes(ByteBuffer bb, int length) {
        ByteBuffer copy = bb.duplicate();
        copy.limit(copy.position() + length);
        bb.position(bb.position() + length);
        return copy;
    }

    private static <P> ComponentSerializer<P> makeComponent(Field field, Serializer<P> serializer, int ordinal) {
        return new ComponentSerializer<P>(field, serializer, ordinal);
    }

    private T createContents(Class<T> clazz) throws InstantiationException, IllegalAccessException {
        return clazz.newInstance();
    }

    public CompositeRangeBuilder buildRange() {
        return new CompositeRangeBuilder() {
            private int position = 0;

            public void nextComponent() {
                position++;
            }

            public void append(ByteBufferOutputStream out, Object value, Equality equality) {
                ComponentSerializer<?> serializer = components.get(position);
                // First, serialize the ByteBuffer for this component
                ByteBuffer cb;
                try {
                    cb = serializer.serializeValue(value);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }

                if (cb == null) {
                    cb = EMPTY_BYTE_BUFFER;
                }

                // Write the data: <length><data><0>
                out.writeShort((short) cb.remaining());
                out.write(cb.slice());
                out.write(equality.toByte());
            }
        };
    }

    public <T1> RangeEndpoint makeEndpoint(T1 value, Equality equality) {
        RangeEndpoint endpoint = new RangeEndpoint() {
            private ByteBufferOutputStream out = new ByteBufferOutputStream();
            private int position = 0;

            @Override
            public RangeEndpoint append(Object value, Equality equality) {
                ComponentSerializer<?> serializer = components.get(position);
                position++;
                // First, serialize the ByteBuffer for this component
                ByteBuffer cb;
                try {
                    cb = serializer.serializeValue(value);
                }
                catch (Exception e) {
                    throw new RuntimeException(e);
                }

                if (cb == null) {
                    cb = EMPTY_BYTE_BUFFER;
                }

                // Write the data: <length><data><0>
                out.writeShort((short) cb.remaining());
                out.write(cb.slice());
                out.write(equality.toByte());
                return this;
            }

            @Override
            public ByteBuffer toBytes() {
                return out.getByteBuffer();
            }
        };
        endpoint.append(value, equality);
        return endpoint;
    }
}