// JsonRowsWriter.ErrorHandler (public static interface)

package com.netflix.astyanax.util;

import java.io.PrintWriter;
import java.nio.ByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.codehaus.jettison.json.JSONObject;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.netflix.astyanax.ExceptionCallback;
import com.netflix.astyanax.SerializerPackage;
import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
import com.netflix.astyanax.model.Column;
import com.netflix.astyanax.model.ColumnList;
import com.netflix.astyanax.model.Row;
import com.netflix.astyanax.model.Rows;

/**
 * Writes a {@link Rows} result set to a {@link PrintWriter} as a single JSON object.
 *
 * <p>
 * The emitted object contains, in order: an optional {@code names} array (written before the
 * rows for fixed/metadata names, after the rows for dynamic names), the {@code rows} value
 * (a JSON array or a JSON object keyed by row key, see {@link #setRowsAsArray(boolean)}), any
 * extra members, and a {@code count} member. All member names can be overridden through the
 * {@code setXxxName} methods.
 *
 * <p>
 * In "columns as rows" mode every column becomes its own entry carrying the column name, value
 * and timestamp. In object form these entries are keyed by
 * {@code rowKey + "$" + columnName}.
 *
 * <p>
 * Instances hold mutable per-write state and are configured through chained setters.
 */
public class JsonRowsWriter implements RowsWriter {
    /**
     * Callback type for exceptions. It is declared here for callers; this class does not invoke
     * it.
     */
    public interface ErrorHandler {
        boolean onException(Exception e);
    }

    /** Logical JSON member names whose emitted text is looked up in {@link #fieldNames}. */
    enum Field {
        ROW_KEY, COUNT, NAMES, ROWS, COLUMN, TIMESTAMP, VALUE,
    }

    /** Pattern used to render column timestamps in columns-as-rows mode. */
    private static final String TIME_FORMAT_STRING = "yyyy-MM-dd'T'HH:mm:ss.SSSZ";

    private final Map<Field, String> fieldNames = Maps.newHashMap();
    private final PrintWriter out;
    private final SerializerPackage serializers;
    private String extra = null;
    private String errorValueText = "???";
    private boolean rowsAsArray = true;
    private boolean ignoreExceptions = true;
    private boolean ignoreUndefinedColumns = false;
    private Set<String> dynamicNames;
    private Set<String> fixedNames;
    private List<String> fixedNamesList;
    private Set<String> ignoreNames = Sets.newHashSet();
    private Set<String> metadataNames;
    private ExceptionCallback exceptionCallback;
    private boolean columnsAsRows = false;
    private int rowCount = 0;
    private int columnCount = 0;
    private int maxStringLength = 256;
    private String rowColumnDelimiter = "$";

    /**
     * Creates a writer with the default member names ({@code names}, {@code rows},
     * {@code count}, {@code _key}, {@code column}, {@code timestamp}, {@code value}).
     *
     * @param out         destination for the JSON text
     * @param serializers used to turn raw keys, column names and values into strings
     */
    public JsonRowsWriter(PrintWriter out, SerializerPackage serializers) throws ConnectionException {
        this.out = out;
        this.serializers = serializers;

        fieldNames.put(Field.NAMES, "names");
        fieldNames.put(Field.ROWS, "rows");
        fieldNames.put(Field.COUNT, "count");
        fieldNames.put(Field.ROW_KEY, "_key");
        fieldNames.put(Field.COLUMN, "column");
        fieldNames.put(Field.TIMESTAMP, "timestamp");
        fieldNames.put(Field.VALUE, "value");
    }

    /** Overrides the name of the member holding the rows. */
    public JsonRowsWriter setRowsName(String fieldName) {
        fieldNames.put(Field.ROWS, fieldName);
        return this;
    }

    /** Overrides the name of the member holding the column-name list. */
    public JsonRowsWriter setNamesName(String fieldName) {
        fieldNames.put(Field.NAMES, fieldName);
        return this;
    }

    /** Overrides the name of the member holding the count. */
    public JsonRowsWriter setCountName(String fieldName) {
        fieldNames.put(Field.COUNT, fieldName);
        return this;
    }

    /** Overrides the name of the member holding the row key inside each array entry. */
    public JsonRowsWriter setRowKeyName(String fieldName) {
        fieldNames.put(Field.ROW_KEY, fieldName);
        return this;
    }

    /**
     * Replaces the extra text emitted between the rows and the count. The text is written
     * verbatim, so it must already be a comma-separated list of JSON members.
     */
    public JsonRowsWriter setExtra(String extra) {
        this.extra = extra;
        return this;
    }

    /** Chooses between a JSON array of rows ({@code true}) and an object keyed by row key. */
    public JsonRowsWriter setRowsAsArray(boolean flag) {
        this.rowsAsArray = flag;
        return this;
    }

    /** When set, columns missing from the fixed/metadata name sets are not written. */
    public JsonRowsWriter setIgnoreUndefinedColumns(boolean flag) {
        this.ignoreUndefinedColumns = flag;
        return this;
    }

    /** Sets the placeholder used when a decoding exception carries no message. */
    public JsonRowsWriter setErrorValueText(String text) {
        this.errorValueText = text;
        return this;
    }

    /**
     * Enables or disables collecting column names while rows are written; collected names are
     * emitted after the rows.
     */
    public JsonRowsWriter setDynamicColumnNames(boolean flag) {
        if (flag) {
            dynamicNames = Sets.newLinkedHashSet();
        }
        else {
            dynamicNames = null;
        }
        return this;
    }

    /** Sets the column names emitted in the {@code names} list, in the given order. */
    public JsonRowsWriter setFixedColumnNames(String... columns) {
        // Copy so later changes to the caller's array cannot alter the configured names.
        List<String> copy = Lists.newArrayList(Arrays.asList(columns));
        this.fixedNames = Sets.newLinkedHashSet(copy);
        this.fixedNamesList = copy;
        return this;
    }

    /** Sets the column names that are never written. */
    public JsonRowsWriter setIgnoreColumnNames(String... columns) {
        this.ignoreNames = Sets.newHashSet(Arrays.asList(columns));
        return this;
    }

    /** Stores an exception callback; the visible code of this class does not invoke it. */
    public JsonRowsWriter setExceptionCallback(ExceptionCallback exceptionCallback) {
        this.exceptionCallback = exceptionCallback;
        return this;
    }

    /**
     * Switches "columns as rows" mode on or off. Turning it on also fixes the name list to the
     * column/value/timestamp member names; turning it off leaves the configured names alone.
     */
    public JsonRowsWriter setColumnsAsRows(boolean columnsAsRows) {
        this.columnsAsRows = columnsAsRows;
        if (columnsAsRows) {
            setFixedColumnNames(fieldNames.get(Field.COLUMN), fieldNames.get(Field.VALUE),
                    fieldNames.get(Field.TIMESTAMP));
        }
        return this;
    }

    /** Appends one quoted {@code name:value} member to the extra text. */
    public JsonRowsWriter addExtra(String name, String value) {
        if (extra == null) {
            extra = new String();
        }

        if (!extra.isEmpty()) {
            extra += ",";
        }
        extra += jsonifyString(name) + ":" + jsonifyString(value);
        return this;
    }

    /** Sets the length beyond which emitted strings are truncated and suffixed with "...". */
    public JsonRowsWriter setMaxLength(int maxStringLength) {
        this.maxStringLength = maxStringLength;
        return this;
    }

    /**
     * @return number of column entries written by the last {@link #write(Rows)} in
     *         columns-as-rows mode; it is not incremented in the other modes
     */
    int getColumnCount() {
        return this.columnCount;
    }

    /** @return number of non-empty rows visited by the last {@link #write(Rows)} */
    int getRowCount() {
        return this.rowCount;
    }

    /**
     * Writes the given rows as one JSON object. Rows without columns are skipped.
     *
     * @param rows rows to serialize
     * @throws Exception if decoding fails and exceptions are not being ignored
     */
    @Override
    public void write(Rows<?, ?> rows) throws Exception {
        this.rowCount = 0;
        this.columnCount = 0;
        if (this.dynamicNames != null) {
            // Names are reported per document, so do not carry them over from an earlier write.
            this.dynamicNames.clear();
        }

        out.println("{");

        if (this.fixedNamesList != null) {
            writeColumnNames(this.fixedNamesList);
        }
        else if (this.dynamicNames == null) {
            metadataNames = this.getColumnNamesFromMetadata();
            List<String> names = Lists.newArrayList(metadataNames);
            Collections.sort(names);
            writeColumnNames(names);
        }

        out.append(jsonifyString(this.fieldNames.get(Field.ROWS))).append(":");

        int count = columnsAsRows ? writeAllColumnsAsRows(rows) : writeAllRows(rows);
        out.println(",");

        if (this.dynamicNames != null) {
            List<String> names = Lists.newArrayList(this.dynamicNames);
            Collections.sort(names);
            writeColumnNames(names);
        }

        String extraMembers = extraMembers();
        if (!extraMembers.isEmpty()) {
            out.append(extraMembers).println(",");
        }
        out.append(jsonifyString(this.fieldNames.get(Field.COUNT))).append(":").append(Integer.toString(count))
                .println();
        out.println("}");
    }

    /**
     * Builds the extra members for the current write without modifying {@link #extra}, so that
     * repeated writes do not accumulate duplicate members.
     */
    private String extraMembers() {
        String members = (extra == null) ? "" : extra;
        if (columnsAsRows) {
            String marker = jsonifyString("columnsAsRows") + ":" + jsonifyString("true");
            members = members.isEmpty() ? marker : members + "," + marker;
        }
        return members;
    }

    /**
     * Writes the rows value in columns-as-rows mode, as an array or an object depending on
     * {@link #rowsAsArray}.
     *
     * @return number of column entries written
     */
    private int writeAllColumnsAsRows(Rows<?, ?> rows) throws Exception {
        out.append(rowsAsArray ? "[" : "{");

        int written = 0;
        for (Row<?, ?> row : rows) {
            ColumnList<?> columns = row.getColumns();
            if (columns.isEmpty()) {
                continue;
            }
            rowCount++;

            String idString;
            try {
                idString = serializers.keyAsString(row.getRawKey());
            }
            catch (Exception e) {
                if (!ignoreExceptions) {
                    throw e;
                }
                // Without a key the entries cannot be labelled; skip the row.
                continue;
            }
            // "first" is derived from what was really written so separators stay correct even
            // when an earlier row produced no output.
            written += writeColumnsAsRows(idString, columns, written == 0);
        }

        out.append(rowsAsArray ? "]" : "}");
        out.println();
        return written;
    }

    /**
     * Writes the rows value in the regular (one entry per row) mode, as an array or an object
     * depending on {@link #rowsAsArray}.
     *
     * @return number of rows written
     */
    private int writeAllRows(Rows<?, ?> rows) throws Exception {
        out.append(rowsAsArray ? "[" : "{").println();

        int written = 0;
        boolean firstRow = true;
        for (Row<?, ?> row : rows) {
            ColumnList<?> columns = row.getColumns();
            if (columns.isEmpty()) {
                continue;
            }
            rowCount++;

            // Decode the key before emitting anything for this row so that a failure cannot
            // leave a dangling separator or an empty entry behind.
            String idString;
            try {
                idString = serializers.keyAsString(row.getRawKey());
            }
            catch (Exception e) {
                if (!ignoreExceptions) {
                    throw e;
                }
                continue;
            }

            if (!firstRow) {
                out.println(",");
            }
            else {
                firstRow = false;
            }

            if (rowsAsArray) {
                out.append("{").append(jsonifyString(this.fieldNames.get(Field.ROW_KEY))).append(":")
                        .append(jsonifyString(idString));
            }
            else {
                out.append(jsonifyString(idString)).append(":{");
            }

            try {
                // In array form the row key member is already present, so the first column
                // needs a leading comma; in object form it does not.
                writeColumns(columns, !rowsAsArray);
                written++;
            }
            catch (Exception e) {
                if (!ignoreExceptions) {
                    throw e;
                }
            }
            // Always close the entry that was opened above.
            out.print("}");
        }

        out.println();
        out.append(rowsAsArray ? "]" : "}");
        return written;
    }

    /**
     * Writes every column of one row as a separate entry.
     *
     * @param rowKey  decoded row key
     * @param columns columns of the row
     * @param first   {@code true} if no entry has been written yet (suppresses the separator)
     * @return number of entries actually written
     */
    private int writeColumnsAsRows(String rowKey, ColumnList<?> columns, boolean first) throws Exception {
        // One formatter per call: SimpleDateFormat is not thread-safe, but a local is cheap
        // compared to one per column.
        SimpleDateFormat timestampFormat = new SimpleDateFormat(TIME_FORMAT_STRING);

        int written = 0;
        for (Column<?> column : columns) {
            try {
                String columnString = columnNameAsString(column);
                String valueString = columnValueAsString(column);

                String timestampString;
                try {
                    // NOTE(review): the division suggests microsecond timestamps - confirm.
                    timestampString = timestampFormat.format(new Date(column.getTimestamp() / 1000));
                }
                catch (Exception e) {
                    timestampString = "none";
                }

                // Assemble the complete entry first so a failure never leaves partial output.
                StringBuilder entry = new StringBuilder();
                if (rowsAsArray) {
                    entry.append("{").append(jsonifyString(this.fieldNames.get(Field.ROW_KEY))).append(":")
                            .append(jsonifyString(rowKey)).append(",");
                }
                else {
                    entry.append(jsonifyString(rowKey + rowColumnDelimiter + columnString)).append(":{");
                }
                entry.append(jsonifyString(this.fieldNames.get(Field.COLUMN))).append(":")
                        .append(jsonifyString(columnString)).append(",")
                        .append(jsonifyString(this.fieldNames.get(Field.VALUE))).append(":")
                        .append(jsonifyString(valueString)).append(",")
                        .append(jsonifyString(this.fieldNames.get(Field.TIMESTAMP))).append(":")
                        .append(jsonifyString(timestampString)).append("}");

                if (!first) {
                    out.println(",");
                }
                else {
                    first = false;
                }
                out.append(entry);
                columnCount++;
                written++;
            }
            catch (Exception e) {
                if (!ignoreExceptions) {
                    throw e;
                }
            }
        }

        return written;
    }

    /**
     * Writes the columns of one row as {@code "name":"value"} members, honouring the ignore
     * list and, if enabled, the undefined-column filter.
     *
     * @param columns columns of the row
     * @param first   {@code true} if no member precedes the first column in the enclosing object
     */
    private void writeColumns(ColumnList<?> columns, boolean first) throws Exception {
        for (Column<?> column : columns) {
            try {
                String columnString = columnNameAsString(column);
                if (this.ignoreNames.contains(columnString)) {
                    continue;
                }

                if (this.ignoreUndefinedColumns) {
                    if (this.fixedNames != null && !this.fixedNames.contains(columnString)) {
                        continue;
                    }
                    if (this.metadataNames != null && !this.metadataNames.contains(columnString)) {
                        continue;
                    }
                }

                if (this.dynamicNames != null) {
                    this.dynamicNames.add(columnString);
                }

                String valueString = columnValueAsString(column);
                String member = jsonifyString(columnString) + ":" + jsonifyString(valueString);

                if (!first) {
                    out.append(",");
                }
                else {
                    first = false;
                }
                out.append(member);
            }
            catch (Exception e) {
                if (!ignoreExceptions) {
                    throw e;
                }
            }
        }
    }

    /** Decodes a column name, substituting error text when decoding fails and is ignored. */
    private String columnNameAsString(Column<?> column) throws Exception {
        try {
            return serializers.columnAsString(column.getRawName());
        }
        catch (Exception e) {
            e.printStackTrace();
            if (!ignoreExceptions) {
                throw e;
            }
            return errorText(e);
        }
    }

    /** Decodes a column value, substituting error text when decoding fails and is ignored. */
    private String columnValueAsString(Column<?> column) throws Exception {
        try {
            return serializers.valueAsString(column.getRawName(), column.getByteBufferValue());
        }
        catch (Exception e) {
            e.printStackTrace();
            if (!ignoreExceptions) {
                throw e;
            }
            return errorText(e);
        }
    }

    /** Returns the exception message, or the configured placeholder when there is none. */
    private String errorText(Exception e) {
        String message = e.getMessage();
        return (message != null) ? message : errorValueText;
    }

    /**
     * Collects the column names known to the serializer package, in iteration order. Names
     * that cannot be decoded are skipped while exceptions are being ignored.
     */
    private Set<String> getColumnNamesFromMetadata() throws Exception {
        Set<String> set = Sets.newLinkedHashSet();
        try {
            for (ByteBuffer name : this.serializers.getColumnNames()) {
                try {
                    set.add(this.serializers.columnAsString(name));
                }
                catch (Exception e) {
                    if (!ignoreExceptions) {
                        throw e;
                    }
                }
            }
        }
        catch (Exception e) {
            if (!ignoreExceptions) {
                throw e;
            }
        }
        return set;
    }

    /**
     * Writes the {@code names} member followed by a comma. In array mode the row key name is
     * listed first; ignored names are left out.
     */
    private void writeColumnNames(List<String> names) {
        out.append(jsonifyString(this.fieldNames.get(Field.NAMES))).append(":[");

        boolean first = true;
        if (this.rowsAsArray) {
            out.append(jsonifyString(this.fieldNames.get(Field.ROW_KEY)));
            first = false;
        }

        for (String name : names) {
            if (this.ignoreNames.contains(name)) {
                continue;
            }
            if (!first) {
                out.append(",");
            }
            else {
                first = false;
            }
            out.append(jsonifyString(name));
        }
        out.println("],");
    }

    /**
     * Quotes a string for JSON output. {@code null} is rendered as the text {@code "null"}, and
     * strings longer than {@link #maxStringLength} are cut and suffixed with {@code "..."}.
     */
    private String jsonifyString(String str) {
        if (str == null) {
            str = "null";
        }
        if (str.length() > maxStringLength) {
            return JSONObject.quote(str.substring(0, maxStringLength) + "...");
        }
        else {
            return JSONObject.quote(str);
        }
    }
}