public class

ParallelReader

extends IndexReader
package org.apache.lucene.index;

/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.lucene.document.Document;
import org.apache.lucene.document.FieldSelector;
import org.apache.lucene.document.FieldSelectorResult;
import org.apache.lucene.document.Fieldable;
import org.apache.lucene.search.FieldCache; // not great (circular); used only to purge FieldCache entry on close

import java.io.IOException;
import java.util.*;


/** An IndexReader which reads multiple, parallel indexes.  Each index added
 * must have the same number of documents, but typically each contains
 * different fields.  Each document contains the union of the fields of all
 * documents with the same document number.  When searching, matches for a
 * query term are from the first index added that has the field.
 *
 * <p>This is useful, e.g., with collections that have large fields which
 * change rarely and small fields that change more frequently.  The smaller
 * fields may be re-indexed in a new index and both indexes may be searched
 * together.
 *
 * <p><strong>Warning:</strong> It is up to you to make sure all indexes
 * are created and modified the same way. For example, if you add
 * documents to one index, you need to add the same documents in the
 * same order to the other indexes. <em>Failure to do so will result in
 * undefined behavior</em>.
 */
public class ParallelReader extends IndexReader {
  private List<IndexReader> readers = new ArrayList<IndexReader>();
  private List<Boolean> decrefOnClose = new ArrayList<Boolean>(); // remember which subreaders to decRef on close
  boolean incRefReaders = false;
  private SortedMap<String,IndexReader> fieldToReader = new TreeMap<String,IndexReader>();
  private Map<IndexReader,Collection<String>> readerToFields = new HashMap<IndexReader,Collection<String>>();
  private List<IndexReader> storedFieldReaders = new ArrayList<IndexReader>();

  private int maxDoc;
  private int numDocs;
  private boolean hasDeletions;

 /** Construct a ParallelReader. 
  * <p>Note that all subreaders are closed if this ParallelReader is closed.</p>
  */
  public ParallelReader() throws IOException { this(true); }
   
 /** Construct a ParallelReader. 
  * @param closeSubReaders indicates whether the subreaders should be closed
  * when this ParallelReader is closed
  */
  public ParallelReader(boolean closeSubReaders) throws IOException {
    super();
    this.incRefReaders = !closeSubReaders;
  }

 /** Add an IndexReader.
  * @throws IOException if there is a low-level IO error
  */
  public void add(IndexReader reader) throws IOException {
    ensureOpen();
    add(reader, false);
  }

 /** Add an IndexReader whose stored fields will not be returned.  This can
  * accelerate search when stored fields are only needed from a subset of
  * the IndexReaders.
  *
  * @throws IllegalArgumentException if not all indexes contain the same number
  *     of documents
  * @throws IllegalArgumentException if not all indexes have the same value
  *     of {@link IndexReader#maxDoc()}
  * @throws IOException if there is a low-level IO error
  */
  public void add(IndexReader reader, boolean ignoreStoredFields)
    throws IOException {

    ensureOpen();
    if (readers.size() == 0) {
      this.maxDoc = reader.maxDoc();
      this.numDocs = reader.numDocs();
      this.hasDeletions = reader.hasDeletions();
    }

    if (reader.maxDoc() != maxDoc)                // check compatibility
      throw new IllegalArgumentException
        ("All readers must have same maxDoc: "+maxDoc+"!="+reader.maxDoc());
    if (reader.numDocs() != numDocs)
      throw new IllegalArgumentException
        ("All readers must have same numDocs: "+numDocs+"!="+reader.numDocs());

    Collection<String> fields = reader.getFieldNames(IndexReader.FieldOption.ALL);
    readerToFields.put(reader, fields);
    for (final String field : fields) {                         // update fieldToReader map
      if (fieldToReader.get(field) == null)
        fieldToReader.put(field, reader);
    }

    if (!ignoreStoredFields)
      storedFieldReaders.add(reader);             // add to storedFieldReaders
    readers.add(reader);
    
    if (incRefReaders) {
      reader.incRef();
    }
    decrefOnClose.add(Boolean.valueOf(incRefReaders));
  }
  
  @Override
  public synchronized Object clone() {
    try {
      return doReopen(true);
    } catch (Exception ex) {
      throw new RuntimeException(ex);
    }
  }
  
  /**
   * Tries to reopen the subreaders.
   * <br>
   * If one or more subreaders could be re-opened (i. e. subReader.reopen() 
   * returned a new instance != subReader), then a new ParallelReader instance 
   * is returned, otherwise this instance is returned.
   * <p>
   * A re-opened instance might share one or more subreaders with the old 
   * instance. Index modification operations result in undefined behavior
   * when performed before the old instance is closed.
   * (see {@link IndexReader#reopen()}).
   * <p>
   * If subreaders are shared, then the reference count of those
   * readers is increased to ensure that the subreaders remain open
   * until the last referring reader is closed.
   * 
   * @throws CorruptIndexException if the index is corrupt
   * @throws IOException if there is a low-level IO error 
   */
  @Override
  public synchronized IndexReader reopen() throws CorruptIndexException, IOException {
    return doReopen(false);
  }
    
  protected IndexReader doReopen(boolean doClone) throws CorruptIndexException, IOException {
    ensureOpen();
    
    boolean reopened = false;
    List<IndexReader> newReaders = new ArrayList<IndexReader>();
    
    boolean success = false;
    
    try {
      for (final IndexReader oldReader : readers) {
        IndexReader newReader = null;
        if (doClone) {
          newReader = (IndexReader) oldReader.clone();
        } else {
          newReader = oldReader.reopen();
        }
        newReaders.add(newReader);
        // if at least one of the subreaders was updated we remember that
        // and return a new ParallelReader
        if (newReader != oldReader) {
          reopened = true;
        }
      }
      success = true;
    } finally {
      if (!success && reopened) {
        for (int i = 0; i < newReaders.size(); i++) {
          IndexReader r = newReaders.get(i);
          if (r != readers.get(i)) {
            try {
              r.close();
            } catch (IOException ignore) {
              // keep going - we want to clean up as much as possible
            }
          }
        }
      }
    }

    if (reopened) {
      List<Boolean> newDecrefOnClose = new ArrayList<Boolean>();
      ParallelReader pr = new ParallelReader();
      for (int i = 0; i < readers.size(); i++) {
        IndexReader oldReader = readers.get(i);
        IndexReader newReader = newReaders.get(i);
        if (newReader == oldReader) {
          newDecrefOnClose.add(Boolean.TRUE);
          newReader.incRef();
        } else {
          // this is a new subreader instance, so on close() we don't
          // decRef but close it 
          newDecrefOnClose.add(Boolean.FALSE);
        }
        pr.add(newReader, !storedFieldReaders.contains(oldReader));
      }
      pr.decrefOnClose = newDecrefOnClose;
      pr.incRefReaders = incRefReaders;
      return pr;
    } else {
      // No subreader was refreshed
      return this;
    }
  }


  @Override
  public int numDocs() {
    // Don't call ensureOpen() here (it could affect performance)
    return numDocs;
  }

  @Override
  public int maxDoc() {
    // Don't call ensureOpen() here (it could affect performance)
    return maxDoc;
  }

  @Override
  public boolean hasDeletions() {
    // Don't call ensureOpen() here (it could affect performance)
    return hasDeletions;
  }

  // check first reader
  @Override
  public boolean isDeleted(int n) {
    // Don't call ensureOpen() here (it could affect performance)
    if (readers.size() > 0)
      return readers.get(0).isDeleted(n);
    return false;
  }

  // delete in all readers
  @Override
  protected void doDelete(int n) throws CorruptIndexException, IOException {
    for (final IndexReader reader : readers) {
      reader.deleteDocument(n);
    }
    hasDeletions = true;
  }

  // undeleteAll in all readers
  @Override
  protected void doUndeleteAll() throws CorruptIndexException, IOException {
    for (final IndexReader reader : readers) {
      reader.undeleteAll();
    }
    hasDeletions = false;
  }

  // append fields from storedFieldReaders
  @Override
  public Document document(int n, FieldSelector fieldSelector) throws CorruptIndexException, IOException {
    ensureOpen();
    Document result = new Document();
    for (final IndexReader reader: storedFieldReaders) {

      boolean include = (fieldSelector==null);
      if (!include) {
        Collection<String> fields = readerToFields.get(reader);
        for (final String field : fields)
          if (fieldSelector.accept(field) != FieldSelectorResult.NO_LOAD) {
            include = true;
            break;
          }
      }
      if (include) {
        List<Fieldable> fields = reader.document(n, fieldSelector).getFields();
        for (Fieldable field : fields) {
          result.add(field);
        }
      }
    }
    return result;
  }

  // get all vectors
  @Override
  public TermFreqVector[] getTermFreqVectors(int n) throws IOException {
    ensureOpen();
    ArrayList<TermFreqVector> results = new ArrayList<TermFreqVector>();
    for (final Map.Entry<String,IndexReader> e: fieldToReader.entrySet()) {

      String field = e.getKey();
      IndexReader reader = e.getValue();
      TermFreqVector vector = reader.getTermFreqVector(n, field);
      if (vector != null)
        results.add(vector);
    }
    return results.toArray(new TermFreqVector[results.size()]);
  }

  @Override
  public TermFreqVector getTermFreqVector(int n, String field)
    throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    return reader==null ? null : reader.getTermFreqVector(n, field);
  }


  @Override
  public void getTermFreqVector(int docNumber, String field, TermVectorMapper mapper) throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    if (reader != null) {
      reader.getTermFreqVector(docNumber, field, mapper); 
    }
  }

  @Override
  public void getTermFreqVector(int docNumber, TermVectorMapper mapper) throws IOException {
    ensureOpen();

    for (final Map.Entry<String,IndexReader> e : fieldToReader.entrySet()) {

      String field = e.getKey();
      IndexReader reader = e.getValue();
      reader.getTermFreqVector(docNumber, field, mapper);
    }

  }

  @Override
  public boolean hasNorms(String field) throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    return reader==null ? false : reader.hasNorms(field);
  }

  @Override
  public byte[] norms(String field) throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    return reader==null ? null : reader.norms(field);
  }

  @Override
  public void norms(String field, byte[] result, int offset)
    throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(field);
    if (reader!=null)
      reader.norms(field, result, offset);
  }

  @Override
  protected void doSetNorm(int n, String field, byte value)
    throws CorruptIndexException, IOException {
    IndexReader reader = fieldToReader.get(field);
    if (reader!=null)
      reader.doSetNorm(n, field, value);
  }

  @Override
  public TermEnum terms() throws IOException {
    ensureOpen();
    return new ParallelTermEnum();
  }

  @Override
  public TermEnum terms(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermEnum(term);
  }

  @Override
  public int docFreq(Term term) throws IOException {
    ensureOpen();
    IndexReader reader = fieldToReader.get(term.field());
    return reader==null ? 0 : reader.docFreq(term);
  }

  @Override
  public TermDocs termDocs(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermDocs(term);
  }

  @Override
  public TermDocs termDocs() throws IOException {
    ensureOpen();
    return new ParallelTermDocs();
  }

  @Override
  public TermPositions termPositions(Term term) throws IOException {
    ensureOpen();
    return new ParallelTermPositions(term);
  }

  @Override
  public TermPositions termPositions() throws IOException {
    ensureOpen();
    return new ParallelTermPositions();
  }
  
  /**
   * Checks recursively if all subreaders are up to date. 
   */
  @Override
  public boolean isCurrent() throws CorruptIndexException, IOException {
    for (final IndexReader reader : readers) {
      if (!reader.isCurrent()) {
        return false;
      }
    }
    
    // all subreaders are up to date
    return true;
  }

  /**
   * Checks recursively if all subindexes are optimized 
   */
  @Override
  public boolean isOptimized() {
    for (final IndexReader reader : readers) {
      if (!reader.isOptimized()) {
        return false;
      }
    }
    
    // all subindexes are optimized
    return true;
  }

  
  /** Not implemented.
   * @throws UnsupportedOperationException
   */
  @Override
  public long getVersion() {
    throw new UnsupportedOperationException("ParallelReader does not support this method.");
  }

  // for testing
  IndexReader[] getSubReaders() {
    return readers.toArray(new IndexReader[readers.size()]);
  }

  @Override
  protected void doCommit(Map<String,String> commitUserData) throws IOException {
    for (final IndexReader reader : readers)
      reader.commit(commitUserData);
  }

  @Override
  protected synchronized void doClose() throws IOException {
    for (int i = 0; i < readers.size(); i++) {
      if (decrefOnClose.get(i).booleanValue()) {
        readers.get(i).decRef();
      } else {
        readers.get(i).close();
      }
    }

    FieldCache.DEFAULT.purge(this);
  }

  @Override
  public Collection<String> getFieldNames (IndexReader.FieldOption fieldNames) {
    ensureOpen();
    Set<String> fieldSet = new HashSet<String>();
    for (final IndexReader reader : readers) {
      Collection<String> names = reader.getFieldNames(fieldNames);
      fieldSet.addAll(names);
    }
    return fieldSet;
  }

  private class ParallelTermEnum extends TermEnum {
    private String field;
    private Iterator<String> fieldIterator;
    private TermEnum termEnum;

    public ParallelTermEnum() throws IOException {
      try {
        field = fieldToReader.firstKey();
      } catch(NoSuchElementException e) {
        // No fields, so keep field == null, termEnum == null
        return;
      }
      if (field != null)
        termEnum = fieldToReader.get(field).terms();
    }

    public ParallelTermEnum(Term term) throws IOException {
      field = term.field();
      IndexReader reader = fieldToReader.get(field);
      if (reader!=null)
        termEnum = reader.terms(term);
    }

    @Override
    public boolean next() throws IOException {
      if (termEnum==null)
        return false;

      // another term in this field?
      if (termEnum.next() && termEnum.term().field()==field)
        return true;                              // yes, keep going

      termEnum.close();                           // close old termEnum

      // find the next field with terms, if any
      if (fieldIterator==null) {
        fieldIterator = fieldToReader.tailMap(field).keySet().iterator();
        fieldIterator.next();                     // Skip field to get next one
      }
      while (fieldIterator.hasNext()) {
        field = fieldIterator.next();
        termEnum = fieldToReader.get(field).terms(new Term(field));
        Term term = termEnum.term();
        if (term!=null && term.field()==field)
          return true;
        else
          termEnum.close();
      }
 
      return false;                               // no more fields
    }

    @Override
    public Term term() {
      if (termEnum==null)
        return null;

      return termEnum.term();
    }

    @Override
    public int docFreq() {
      if (termEnum==null)
        return 0;

      return termEnum.docFreq();
    }

    @Override
    public void close() throws IOException {
      if (termEnum!=null)
        termEnum.close();
    }

  }

  // wrap a TermDocs in order to support seek(Term)
  private class ParallelTermDocs implements TermDocs {
    protected TermDocs termDocs;

    public ParallelTermDocs() {}
    public ParallelTermDocs(Term term) throws IOException {
      if (term == null)
        termDocs = readers.isEmpty() ? null : readers.get(0).termDocs(null);
      else
        seek(term);
    }

    public int doc() { return termDocs.doc(); }
    public int freq() { return termDocs.freq(); }

    public void seek(Term term) throws IOException {
      IndexReader reader = fieldToReader.get(term.field());
      termDocs = reader!=null ? reader.termDocs(term) : null;
    }

    public void seek(TermEnum termEnum) throws IOException {
      seek(termEnum.term());
    }

    public boolean next() throws IOException {
      if (termDocs==null)
        return false;

      return termDocs.next();
    }

    public int read(final int[] docs, final int[] freqs) throws IOException {
      if (termDocs==null)
        return 0;

      return termDocs.read(docs, freqs);
    }

    public boolean skipTo(int target) throws IOException {
      if (termDocs==null)
        return false;

      return termDocs.skipTo(target);
    }

    public void close() throws IOException {
      if (termDocs!=null)
        termDocs.close();
    }

  }

  private class ParallelTermPositions
    extends ParallelTermDocs implements TermPositions {

    public ParallelTermPositions() {}
    public ParallelTermPositions(Term term) throws IOException { seek(term); }

    @Override
    public void seek(Term term) throws IOException {
      IndexReader reader = fieldToReader.get(term.field());
      termDocs = reader!=null ? reader.termPositions(term) : null;
    }

    public int nextPosition() throws IOException {
      // It is an error to call this if there is no next position, e.g. if termDocs==null
      return ((TermPositions)termDocs).nextPosition();
    }

    public int getPayloadLength() {
      return ((TermPositions)termDocs).getPayloadLength();
    }

    public byte[] getPayload(byte[] data, int offset) throws IOException {
      return ((TermPositions)termDocs).getPayload(data, offset);
    }


    // TODO: Remove warning after API has been finalized
    public boolean isPayloadAvailable() {
      return ((TermPositions) termDocs).isPayloadAvailable();
    }
  }

}