public class

GridFSInputFile

extends GridFSFile
// GridFSInputFile.java

/**
 *      Copyright (C) 2008 10gen Inc.
 *
 *   Licensed under the Apache License, Version 2.0 (the "License");
 *   you may not use this file except in compliance with the License.
 *   You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing, software
 *   distributed under the License is distributed on an "AS IS" BASIS,
 *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 *   See the License for the specific language governing permissions and
 *   limitations under the License.
 */

package com.mongodb.gridfs;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.util.Date;

import org.bson.types.ObjectId;

import com.mongodb.BasicDBObjectBuilder;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.MongoException;
import com.mongodb.util.SimplePool;
import com.mongodb.util.Util;

/**
 * This class represents a GridFS file to be written to the database
 * Operations include:
 * - writing data obtained from an InputStream
 * - getting an OutputStream to stream the data out
 *
 * @author Eliot Horowitz and Guy K. Kloss
 */
public class GridFSInputFile extends GridFSFile {

    /**
     * Default constructor setting the GridFS file name and providing an input
     * stream containing data to be written to the file.
     *
     * @param fs
     *            The GridFS connection handle.
     * @param in
     *            Stream used for reading data from.
     * @param filename
     *            Name of the file to be created.
     * @param closeStreamOnPersist
                  indicate the passed in input stream should be closed once the data chunk persisted
     */
    GridFSInputFile( GridFS fs , InputStream in , String filename, boolean closeStreamOnPersist ) {
        _fs = fs;
        _in = in;
        _filename = filename;
        _closeStreamOnPersist = closeStreamOnPersist;

        _id = new ObjectId();
        _chunkSize = GridFS.DEFAULT_CHUNKSIZE;
        _uploadDate = new Date();
        _messageDigester = _md5Pool.get();
        _messageDigester.reset();
        _buffer = new byte[(int) _chunkSize];
    }

    /**
     * Default constructor setting the GridFS file name and providing an input
     * stream containing data to be written to the file.
     *
     * @param fs
     *            The GridFS connection handle.
     * @param in
     *            Stream used for reading data from.
     * @param filename
     *            Name of the file to be created.
     */
    GridFSInputFile( GridFS fs , InputStream in , String filename ) {
        this( fs, in, filename, false);
    }

    /**
     * Constructor that only provides a file name, but does not rely on the
     * presence of an {@link java.io.InputStream}. An
     * {@link java.io.OutputStream} can later be obtained for writing using the
     * {@link #getOutputStream()} method.
     *
     * @param fs
     *            The GridFS connection handle.
     * @param filename
     *            Name of the file to be created.
     */
    GridFSInputFile( GridFS fs , String filename ) {
        this( fs , null , filename );
    }

    /**
     * Minimal constructor that does not rely on the presence of an
     * {@link java.io.InputStream}. An {@link java.io.OutputStream} can later be
     * obtained for writing using the {@link #getOutputStream()} method.
     *
     * @param fs
     *            The GridFS connection handle.
     */
    GridFSInputFile( GridFS fs ) {
        this( fs , null , null );
    }

    public void setId(Object id) {
        _id = id;
    }

    /**
     * Sets the file name on the GridFS entry.
     *
     * @param fn
     *            File name.
     */
    public void setFilename( String fn ) {
        _filename = fn;
    }

    /**
     * Sets the content type (MIME type) on the GridFS entry.
     *
     * @param ct
     *            Content type.
     */
    public void setContentType( String ct ) {
        _contentType = ct;
    }

    /**
     * Set the chunk size. This must be called before saving any data.
     * @param _chunkSize
     */
    public void setChunkSize(long _chunkSize) {
        if (_outputStream != null || _savedChunks)
            return;
        this._chunkSize = _chunkSize;
    }

    /**
     * calls {@link GridFSInputFile#save(long)} with the existing chunk size
     */
    @Override
    public void save() {
        save( _chunkSize);
    }

    /**
     * This method first calls saveChunks(long) if the file data has not been saved yet.
     * Then it persists the file entry to GridFS.
     *
     * @param chunkSize
     *            Size of chunks for file in bytes.
     */
    public void save( long chunkSize ) {
        if (_outputStream != null)
            throw new MongoException( "cannot mix OutputStream and regular save()" );

        // note that chunkSize only changes _chunkSize in case we actually save chunks
        // otherwise there is a risk file and chunks are not compatible
        if ( ! _savedChunks ) {
            try {
                saveChunks( chunkSize );
            } catch ( IOException ioe ) {
                throw new MongoException( "couldn't save chunks" , ioe );
            }
        }

        super.save();
    }

    /**
     * @see com.mongodb.gridfs.GridFSInputFile#saveChunks(long)
     *
     * @return Number of the next chunk.
     * @throws IOException
     *             on problems reading the new entry's
     *             {@link java.io.InputStream}.
     */
    public int saveChunks() throws IOException {
        return saveChunks( _chunkSize );
    }

    /**
     * Saves all data into chunks from configured {@link java.io.InputStream} input stream
     * to GridFS. A non-default chunk size can be specified.
     * This method does NOT save the file object itself, one must call save() to do so.
     *
     * @param chunkSize
     *            Size of chunks for file in bytes.
     * @return Number of the next chunk.
     * @throws IOException
     *             on problems reading the new entry's
     *             {@link java.io.InputStream}.
     */
    public int saveChunks( long chunkSize ) throws IOException {
        if (_outputStream != null)
            throw new MongoException( "cannot mix OutputStream and regular save()" );
        if ( _savedChunks )
            throw new MongoException( "chunks already saved!" );

        if ( chunkSize <= 0 || chunkSize > GridFS.MAX_CHUNKSIZE ) {
            throw new MongoException( "chunkSize must be greater than zero and less than or equal to GridFS.MAX_CHUNKSIZE");
        }

        if ( _chunkSize != chunkSize ) {
            _chunkSize = chunkSize;
            _buffer = new byte[(int) _chunkSize];
        }

        int bytesRead = 0;
        while ( bytesRead >= 0 ) {
            _currentBufferPosition = 0;
            bytesRead = _readStream2Buffer();
            _dumpBuffer( true );
        }

        // only finish data, do not write file, in case one wants to change metadata
        _finishData();
        return _currentChunkNumber;
    }

    /**
     * After retrieving this {@link java.io.OutputStream}, this object will be
     * capable of accepting successively written data to the output stream.
     * To completely persist this GridFS object, you must finally call the {@link java.io.OutputStream#close()}
     * method on the output stream. Note that calling the save() and saveChunks()
     * methods will throw Exceptions once you obtained the OutputStream.
     *
     * @return Writable stream object.
     */
    public OutputStream getOutputStream() {
        if ( _outputStream == null ) {
            _outputStream = new MyOutputStream();
        }
        return _outputStream;
    }

    /**
     * Dumps a new chunk into the chunks collection. Depending on the flag, also
     * partial buffers (at the end) are going to be written immediately.
     *
     * @param data
     *            Data for chunk.
     * @param writePartial
     *            Write also partial buffers full.
     */
    private void _dumpBuffer( boolean writePartial ) {
        if ( ( _currentBufferPosition < _chunkSize ) && !writePartial ) {
            // Bail out, chunk not complete yet
            return;
        }
        if (_currentBufferPosition == 0) {
            // chunk is empty, may be last chunk
            return;
        }

        byte[] writeBuffer = _buffer;
        if ( _currentBufferPosition != _chunkSize ) {
            writeBuffer = new byte[_currentBufferPosition];
            System.arraycopy( _buffer, 0, writeBuffer, 0, _currentBufferPosition );
        }

        DBObject chunk = BasicDBObjectBuilder.start()
                .add( "files_id", _id )
                .add( "n", _currentChunkNumber )
                .add( "data", writeBuffer ).get();

        _fs._chunkCollection.save( chunk );

        _currentChunkNumber++;
        _totalBytes += writeBuffer.length;
        _messageDigester.update( writeBuffer );
        _currentBufferPosition = 0;
    }

    /**
     * Reads a buffer full from the {@link java.io.InputStream}.
     *
     * @return Number of bytes read from stream.
     * @throws IOException
     *             if the reading from the stream fails.
     */
    private int _readStream2Buffer() throws IOException {
        int bytesRead = 0;
        while ( _currentBufferPosition < _chunkSize && bytesRead >= 0 ) {
            bytesRead = _in.read( _buffer, _currentBufferPosition,
                                 (int) _chunkSize - _currentBufferPosition );
            if ( bytesRead > 0 ) {
                _currentBufferPosition += bytesRead;
            } else if ( bytesRead == 0 ) {
                throw new RuntimeException( "i'm doing something wrong" );
            }
        }
        return bytesRead;
    }

    /**
     * Marks the data as fully written. This needs to be called before super.save()
     */
    private void _finishData() {
        if (!_savedChunks) {
            _md5 = Util.toHex( _messageDigester.digest() );
            _md5Pool.done( _messageDigester );
            _messageDigester = null;
            _length = _totalBytes;
            _savedChunks = true;
            try {
                if ( _in != null && _closeStreamOnPersist )
                    _in.close();
            } catch (IOException e) {
                //ignore
            }
        }
    }

    private final InputStream _in;
    private boolean _closeStreamOnPersist;
    private boolean _savedChunks = false;
    private byte[] _buffer = null;
    private int _currentChunkNumber = 0;
    private int _currentBufferPosition = 0;
    private long _totalBytes = 0;
    private MessageDigest _messageDigester = null;
    private OutputStream _outputStream = null;

    /**
     * A pool of {@link java.security.MessageDigest} objects.
     */
    static SimplePool<MessageDigest> _md5Pool
            = new SimplePool<MessageDigest>( "md5" , 10 , -1 , false , false ) {
        /**
         * {@inheritDoc}
         *
         * @see com.mongodb.util.SimplePool#createNew()
         */
        protected MessageDigest createNew() {
            try {
                return MessageDigest.getInstance( "MD5" );
            } catch ( java.security.NoSuchAlgorithmException e ) {
                throw new RuntimeException( "your system doesn't have md5!" );
            }
        }
    };

    /**
     * An output stream implementation that can be used to successively write to
     * a GridFS file.
     *
     * @author Guy K. Kloss
     */
    class MyOutputStream extends OutputStream {

        /**
         * {@inheritDoc}
         *
         * @see java.io.OutputStream#write(int)
         */
        @Override
        public void write( int b ) throws IOException {
            byte[] byteArray = new byte[1];
            byteArray[0] = (byte) (b & 0xff);
            write( byteArray, 0, 1 );
        }

        /**
         * {@inheritDoc}
         *
         * @see java.io.OutputStream#write(byte[], int, int)
         */
        @Override
        public void write( byte[] b , int off , int len ) throws IOException {
            int offset = off;
            int length = len;
            int toCopy = 0;
            while ( length > 0 ) {
                toCopy = length;
                if ( toCopy > _chunkSize - _currentBufferPosition ) {
                    toCopy = (int) _chunkSize - _currentBufferPosition;
                }
                System.arraycopy( b, offset, _buffer, _currentBufferPosition, toCopy );
                _currentBufferPosition += toCopy;
                offset += toCopy;
                length -= toCopy;
                if ( _currentBufferPosition == _chunkSize ) {
                    _dumpBuffer( false );
                }
            }
        }

        /**
         * Processes/saves all data from {@link java.io.InputStream} and closes
         * the potentially present {@link java.io.OutputStream}. The GridFS file
         * will be persisted afterwards.
         */
        @Override
        public void close() {
            // write last buffer if needed
            _dumpBuffer( true );
            // finish stream
            _finishData();
            // save file obj
            GridFSInputFile.super.save();
        }
    }
}