public abstract class

ParallelFlowable

extends Object
/**
 * Copyright (c) 2016-present, RxJava Contributors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.
 */

package io.reactivex.parallel;

import java.util.*;
import java.util.concurrent.Callable;

import io.reactivex.*;
import io.reactivex.annotations.*;
import io.reactivex.exceptions.Exceptions;
import io.reactivex.functions.*;
import io.reactivex.internal.functions.*;
import io.reactivex.internal.operators.parallel.*;
import io.reactivex.internal.subscriptions.EmptySubscription;
import io.reactivex.internal.util.*;
import io.reactivex.plugins.RxJavaPlugins;
import org.reactivestreams.*;

/**
 * Abstract base class for Parallel publishers that take an array of Subscribers.
 * <p>
 * Use {@code from()} to start processing a regular Publisher in 'rails'.
 * Use {@code runOn()} to introduce where each 'rail' should run on thread-vise.
 * Use {@code sequential()} to merge the sources back into a single Flowable.
 *
 * <p>History: 2.0.5 - experimental
 * @param <T> the value type
 * @since 2.1 - beta
 */
@Beta
public abstract class ParallelFlowable<T> {

    /**
     * Subscribes an array of Subscribers to this ParallelFlowable and triggers
     * the execution chain for all 'rails'.
     *
     * @param subscribers the subscribers array to run in parallel, the number
     * of items must be equal to the parallelism level of this ParallelFlowable
     * @see #parallelism()
     */
    public abstract void subscribe(@NonNull Subscriber<? super T>[] subscribers);

    /**
     * Returns the number of expected parallel Subscribers.
     * @return the number of expected parallel Subscribers
     */
    public abstract int parallelism();

    /**
     * Validates the number of subscribers and returns true if their number
     * matches the parallelism level of this ParallelFlowable.
     *
     * @param subscribers the array of Subscribers
     * @return true if the number of subscribers equals to the parallelism level
     */
    protected final boolean validate(@NonNull Subscriber<?>[] subscribers) {
        int p = parallelism();
        if (subscribers.length != p) {
            Throwable iae = new IllegalArgumentException("parallelism = " + p + ", subscribers = " + subscribers.length);
            for (Subscriber<?> s : subscribers) {
                EmptySubscription.error(iae, s);
            }
            return false;
        }
        return true;
    }

    /**
     * Take a Publisher and prepare to consume it on multiple 'rails' (number of CPUs)
     * in a round-robin fashion.
     * @param <T> the value type
     * @param source the source Publisher
     * @return the ParallelFlowable instance
     */
    @CheckReturnValue
    public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source) {
        return from(source, Runtime.getRuntime().availableProcessors(), Flowable.bufferSize());
    }

    /**
     * Take a Publisher and prepare to consume it on parallelism number of 'rails' in a round-robin fashion.
     * @param <T> the value type
     * @param source the source Publisher
     * @param parallelism the number of parallel rails
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source, int parallelism) {
        return from(source, parallelism, Flowable.bufferSize());
    }

    /**
     * Take a Publisher and prepare to consume it on parallelism number of 'rails' ,
     * possibly ordered and round-robin fashion and use custom prefetch amount and queue
     * for dealing with the source Publisher's values.
     * @param <T> the value type
     * @param source the source Publisher
     * @param parallelism the number of parallel rails
     * @param prefetch the number of values to prefetch from the source
     * the source until there is a rail ready to process it.
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public static <T> ParallelFlowable<T> from(@NonNull Publisher<? extends T> source,
            int parallelism, int prefetch) {
        ObjectHelper.requireNonNull(source, "source");
        ObjectHelper.verifyPositive(parallelism, "parallelism");
        ObjectHelper.verifyPositive(prefetch, "prefetch");

        return RxJavaPlugins.onAssembly(new ParallelFromPublisher<T>(source, parallelism, prefetch));
    }

    /**
     * Maps the source values on each 'rail' to another value.
     * <p>
     * Note that the same mapper function may be called from multiple threads concurrently.
     * @param <R> the output value type
     * @param mapper the mapper function turning Ts into Us.
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper");
        return RxJavaPlugins.onAssembly(new ParallelMap<T, R>(this, mapper));
    }

    /**
     * Maps the source values on each 'rail' to another value and
     * handles errors based on the given {@link ParallelFailureHandling} enumeration value.
     * <p>
     * Note that the same mapper function may be called from multiple threads concurrently.
     * @param <R> the output value type
     * @param mapper the mapper function turning Ts into Us.
     * @param errorHandler the enumeration that defines how to handle errors thrown
     *                     from the mapper function
     * @return the new ParallelFlowable instance
     * @since 2.0.8 - experimental
     */
    @CheckReturnValue
    @Experimental
    @NonNull
    public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper, @NonNull ParallelFailureHandling errorHandler) {
        ObjectHelper.requireNonNull(mapper, "mapper");
        ObjectHelper.requireNonNull(errorHandler, "errorHandler is null");
        return RxJavaPlugins.onAssembly(new ParallelMapTry<T, R>(this, mapper, errorHandler));
    }

    /**
     * Maps the source values on each 'rail' to another value and
     * handles errors based on the returned value by the handler function.
     * <p>
     * Note that the same mapper function may be called from multiple threads concurrently.
     * @param <R> the output value type
     * @param mapper the mapper function turning Ts into Us.
     * @param errorHandler the function called with the current repeat count and
     *                     failure Throwable and should return one of the {@link ParallelFailureHandling}
     *                     enumeration values to indicate how to proceed.
     * @return the new ParallelFlowable instance
     * @since 2.0.8 - experimental
     */
    @CheckReturnValue
    @Experimental
    @NonNull
    public final <R> ParallelFlowable<R> map(@NonNull Function<? super T, ? extends R> mapper, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
        ObjectHelper.requireNonNull(mapper, "mapper");
        ObjectHelper.requireNonNull(errorHandler, "errorHandler is null");
        return RxJavaPlugins.onAssembly(new ParallelMapTry<T, R>(this, mapper, errorHandler));
    }

    /**
     * Filters the source values on each 'rail'.
     * <p>
     * Note that the same predicate may be called from multiple threads concurrently.
     * @param predicate the function returning true to keep a value or false to drop a value
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate");
        return RxJavaPlugins.onAssembly(new ParallelFilter<T>(this, predicate));
    }

    /**
     * Filters the source values on each 'rail' and
     * handles errors based on the given {@link ParallelFailureHandling} enumeration value.
     * <p>
     * Note that the same predicate may be called from multiple threads concurrently.
     * @param predicate the function returning true to keep a value or false to drop a value
     * @param errorHandler the enumeration that defines how to handle errors thrown
     *                     from the predicate
     * @return the new ParallelFlowable instance
     * @since 2.0.8 - experimental
     */
    @CheckReturnValue
    @Experimental
    public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull ParallelFailureHandling errorHandler) {
        ObjectHelper.requireNonNull(predicate, "predicate");
        ObjectHelper.requireNonNull(errorHandler, "errorHandler is null");
        return RxJavaPlugins.onAssembly(new ParallelFilterTry<T>(this, predicate, errorHandler));
    }


    /**
     * Filters the source values on each 'rail' and
     * handles errors based on the returned value by the handler function.
     * <p>
     * Note that the same predicate may be called from multiple threads concurrently.
     * @param predicate the function returning true to keep a value or false to drop a value
     * @param errorHandler the function called with the current repeat count and
     *                     failure Throwable and should return one of the {@link ParallelFailureHandling}
     *                     enumeration values to indicate how to proceed.
     * @return the new ParallelFlowable instance
     * @since 2.0.8 - experimental
     */
    @CheckReturnValue
    @Experimental
    public final ParallelFlowable<T> filter(@NonNull Predicate<? super T> predicate, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
        ObjectHelper.requireNonNull(predicate, "predicate");
        ObjectHelper.requireNonNull(errorHandler, "errorHandler is null");
        return RxJavaPlugins.onAssembly(new ParallelFilterTry<T>(this, predicate, errorHandler));
    }

    /**
     * Specifies where each 'rail' will observe its incoming values with
     * no work-stealing and default prefetch amount.
     * <p>
     * This operator uses the default prefetch size returned by {@code Flowable.bufferSize()}.
     * <p>
     * The operator will call {@code Scheduler.createWorker()} as many
     * times as this ParallelFlowable's parallelism level is.
     * <p>
     * No assumptions are made about the Scheduler's parallelism level,
     * if the Scheduler's parallelism level is lower than the ParallelFlowable's,
     * some rails may end up on the same thread/worker.
     * <p>
     * This operator doesn't require the Scheduler to be trampolining as it
     * does its own built-in trampolining logic.
     *
     * @param scheduler the scheduler to use
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler) {
        return runOn(scheduler, Flowable.bufferSize());
    }

    /**
     * Specifies where each 'rail' will observe its incoming values with
     * possibly work-stealing and a given prefetch amount.
     * <p>
     * This operator uses the default prefetch size returned by {@code Flowable.bufferSize()}.
     * <p>
     * The operator will call {@code Scheduler.createWorker()} as many
     * times as this ParallelFlowable's parallelism level is.
     * <p>
     * No assumptions are made about the Scheduler's parallelism level,
     * if the Scheduler's parallelism level is lower than the ParallelFlowable's,
     * some rails may end up on the same thread/worker.
     * <p>
     * This operator doesn't require the Scheduler to be trampolining as it
     * does its own built-in trampolining logic.
     *
     * @param scheduler the scheduler to use
     * that rail's worker has run out of work.
     * @param prefetch the number of values to request on each 'rail' from the source
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> runOn(@NonNull Scheduler scheduler, int prefetch) {
        ObjectHelper.requireNonNull(scheduler, "scheduler");
        ObjectHelper.verifyPositive(prefetch, "prefetch");
        return RxJavaPlugins.onAssembly(new ParallelRunOn<T>(this, scheduler, prefetch));
    }

    /**
     * Reduces all values within a 'rail' and across 'rails' with a reducer function into a single
     * sequential value.
     * <p>
     * Note that the same reducer function may be called from multiple threads concurrently.
     * @param reducer the function to reduce two values into one.
     * @return the new Flowable instance emitting the reduced value or empty if the ParallelFlowable was empty
     */
    @CheckReturnValue
    @NonNull
    public final Flowable<T> reduce(@NonNull BiFunction<T, T, T> reducer) {
        ObjectHelper.requireNonNull(reducer, "reducer");
        return RxJavaPlugins.onAssembly(new ParallelReduceFull<T>(this, reducer));
    }

    /**
     * Reduces all values within a 'rail' to a single value (with a possibly different type) via
     * a reducer function that is initialized on each rail from an initialSupplier value.
     * <p>
     * Note that the same mapper function may be called from multiple threads concurrently.
     * @param <R> the reduced output type
     * @param initialSupplier the supplier for the initial value
     * @param reducer the function to reduce a previous output of reduce (or the initial value supplied)
     * with a current source value.
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> reduce(@NonNull Callable<R> initialSupplier, @NonNull BiFunction<R, ? super T, R> reducer) {
        ObjectHelper.requireNonNull(initialSupplier, "initialSupplier");
        ObjectHelper.requireNonNull(reducer, "reducer");
        return RxJavaPlugins.onAssembly(new ParallelReduce<T, R>(this, initialSupplier, reducer));
    }

    /**
     * Merges the values from each 'rail' in a round-robin or same-order fashion and
     * exposes it as a regular Publisher sequence, running with a default prefetch value
     * for the rails.
     * <p>
     * This operator uses the default prefetch size returned by {@code Flowable.bufferSize()}.
     * <img width="640" height="602" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/parallelflowable.sequential.png" alt="">
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>The operator honors backpressure.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code sequential} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @return the new Flowable instance
     * @see ParallelFlowable#sequential(int)
     * @see ParallelFlowable#sequentialDelayError()
     */
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @CheckReturnValue
    public final Flowable<T> sequential() {
        return sequential(Flowable.bufferSize());
    }

    /**
     * Merges the values from each 'rail' in a round-robin or same-order fashion and
     * exposes it as a regular Publisher sequence, running with a give prefetch value
     * for the rails.
     * <img width="640" height="602" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/parallelflowable.sequential.png" alt="">
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>The operator honors backpressure.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code sequential} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param prefetch the prefetch amount to use for each rail
     * @return the new Flowable instance
     * @see ParallelFlowable#sequential()
     * @see ParallelFlowable#sequentialDelayError(int)
     */
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @CheckReturnValue
    @NonNull
    public final Flowable<T> sequential(int prefetch) {
        ObjectHelper.verifyPositive(prefetch, "prefetch");
        return RxJavaPlugins.onAssembly(new ParallelJoin<T>(this, prefetch, false));
    }

    /**
     * Merges the values from each 'rail' in a round-robin or same-order fashion and
     * exposes it as a regular Flowable sequence, running with a default prefetch value
     * for the rails and delaying errors from all rails till all terminate.
     * <p>
     * This operator uses the default prefetch size returned by {@code Flowable.bufferSize()}.
     * <img width="640" height="602" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/parallelflowable.sequential.png" alt="">
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>The operator honors backpressure.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code sequentialDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @return the new Flowable instance
     * @see ParallelFlowable#sequentialDelayError(int)
     * @see ParallelFlowable#sequential()
     * @since 2.0.7 - experimental
     */
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @CheckReturnValue
    @Experimental
    @NonNull
    public final Flowable<T> sequentialDelayError() {
        return sequentialDelayError(Flowable.bufferSize());
    }

    /**
     * Merges the values from each 'rail' in a round-robin or same-order fashion and
     * exposes it as a regular Publisher sequence, running with a give prefetch value
     * for the rails and delaying errors from all rails till all terminate.
     * <img width="640" height="602" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/parallelflowable.sequential.png" alt="">
     * <dl>
     *  <dt><b>Backpressure:</b></dt>
     *  <dd>The operator honors backpressure.</dd>
     *  <dt><b>Scheduler:</b></dt>
     *  <dd>{@code sequentialDelayError} does not operate by default on a particular {@link Scheduler}.</dd>
     * </dl>
     * @param prefetch the prefetch amount to use for each rail
     * @return the new Flowable instance
     * @see ParallelFlowable#sequential()
     * @see ParallelFlowable#sequentialDelayError()
     * @since 2.0.7 - experimental
     */
    @BackpressureSupport(BackpressureKind.FULL)
    @SchedulerSupport(SchedulerSupport.NONE)
    @CheckReturnValue
    @NonNull
    public final Flowable<T> sequentialDelayError(int prefetch) {
        ObjectHelper.verifyPositive(prefetch, "prefetch");
        return RxJavaPlugins.onAssembly(new ParallelJoin<T>(this, prefetch, true));
    }

    /**
     * Sorts the 'rails' of this ParallelFlowable and returns a Publisher that sequentially
     * picks the smallest next value from the rails.
     * <p>
     * This operator requires a finite source ParallelFlowable.
     *
     * @param comparator the comparator to use
     * @return the new Flowable instance
     */
    @CheckReturnValue
    @NonNull
    public final Flowable<T> sorted(@NonNull Comparator<? super T> comparator) {
        return sorted(comparator, 16);
    }

    /**
     * Sorts the 'rails' of this ParallelFlowable and returns a Publisher that sequentially
     * picks the smallest next value from the rails.
     * <p>
     * This operator requires a finite source ParallelFlowable.
     *
     * @param comparator the comparator to use
     * @param capacityHint the expected number of total elements
     * @return the new Flowable instance
     */
    @CheckReturnValue
    @NonNull
    public final Flowable<T> sorted(@NonNull Comparator<? super T> comparator, int capacityHint) {
        ObjectHelper.requireNonNull(comparator, "comparator is null");
        ObjectHelper.verifyPositive(capacityHint, "capacityHint");
        int ch = capacityHint / parallelism() + 1;
        ParallelFlowable<List<T>> railReduced = reduce(Functions.<T>createArrayList(ch), ListAddBiConsumer.<T>instance());
        ParallelFlowable<List<T>> railSorted = railReduced.map(new SorterFunction<T>(comparator));

        return RxJavaPlugins.onAssembly(new ParallelSortedJoin<T>(railSorted, comparator));
    }

    /**
     * Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.
     * <p>
     * This operator requires a finite source ParallelFlowable.
     *
     * @param comparator the comparator to compare elements
     * @return the new Flowable instance
     */
    @CheckReturnValue
    @NonNull
    public final Flowable<List<T>> toSortedList(@NonNull Comparator<? super T> comparator) {
        return toSortedList(comparator, 16);
    }
    /**
     * Sorts the 'rails' according to the comparator and returns a full sorted list as a Publisher.
     * <p>
     * This operator requires a finite source ParallelFlowable.
     *
     * @param comparator the comparator to compare elements
     * @param capacityHint the expected number of total elements
     * @return the new Flowable instance
     */
    @CheckReturnValue
    @NonNull
    public final Flowable<List<T>> toSortedList(@NonNull Comparator<? super T> comparator, int capacityHint) {
        ObjectHelper.requireNonNull(comparator, "comparator is null");
        ObjectHelper.verifyPositive(capacityHint, "capacityHint");

        int ch = capacityHint / parallelism() + 1;
        ParallelFlowable<List<T>> railReduced = reduce(Functions.<T>createArrayList(ch), ListAddBiConsumer.<T>instance());
        ParallelFlowable<List<T>> railSorted = railReduced.map(new SorterFunction<T>(comparator));

        Flowable<List<T>> merged = railSorted.reduce(new MergerBiFunction<T>(comparator));

        return RxJavaPlugins.onAssembly(merged);
    }

    /**
     * Call the specified consumer with the current element passing through any 'rail'.
     *
     * @param onNext the callback
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
                onNext,
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.EMPTY_ACTION,
                Functions.EMPTY_ACTION,
                Functions.emptyConsumer(),
                Functions.EMPTY_LONG_CONSUMER,
                Functions.EMPTY_ACTION
                ));
    }


    /**
     * Call the specified consumer with the current element passing through any 'rail' and
     * handles errors based on the given {@link ParallelFailureHandling} enumeration value.
     *
     * @param onNext the callback
     * @param errorHandler the enumeration that defines how to handle errors thrown
     *                     from the onNext consumer
     * @return the new ParallelFlowable instance
     * @since 2.0.8 - experimental
     */
    @CheckReturnValue
    @Experimental
    @NonNull
    public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext, @NonNull ParallelFailureHandling errorHandler) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(errorHandler, "errorHandler is null");
        return RxJavaPlugins.onAssembly(new ParallelDoOnNextTry<T>(this, onNext, errorHandler));
    }

    /**
     * Call the specified consumer with the current element passing through any 'rail' and
     * handles errors based on the returned value by the handler function.
     *
     * @param onNext the callback
     * @param errorHandler the function called with the current repeat count and
     *                     failure Throwable and should return one of the {@link ParallelFailureHandling}
     *                     enumeration values to indicate how to proceed.
     * @return the new ParallelFlowable instance
     * @since 2.0.8 - experimental
     */
    @CheckReturnValue
    @Experimental
    @NonNull
    public final ParallelFlowable<T> doOnNext(@NonNull Consumer<? super T> onNext, @NonNull BiFunction<? super Long, ? super Throwable, ParallelFailureHandling> errorHandler) {
        ObjectHelper.requireNonNull(onNext, "onNext is null");
        ObjectHelper.requireNonNull(errorHandler, "errorHandler is null");
        return RxJavaPlugins.onAssembly(new ParallelDoOnNextTry<T>(this, onNext, errorHandler));
    }

    /**
     * Call the specified consumer with the current element passing through any 'rail'
     * after it has been delivered to downstream within the rail.
     *
     * @param onAfterNext the callback
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> doAfterNext(@NonNull Consumer<? super T> onAfterNext) {
        ObjectHelper.requireNonNull(onAfterNext, "onAfterNext is null");
        return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
                Functions.emptyConsumer(),
                onAfterNext,
                Functions.emptyConsumer(),
                Functions.EMPTY_ACTION,
                Functions.EMPTY_ACTION,
                Functions.emptyConsumer(),
                Functions.EMPTY_LONG_CONSUMER,
                Functions.EMPTY_ACTION
                ));
    }

    /**
     * Call the specified consumer with the exception passing through any 'rail'.
     *
     * @param onError the callback
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> doOnError(@NonNull Consumer<Throwable> onError) {
        ObjectHelper.requireNonNull(onError, "onError is null");
        return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                onError,
                Functions.EMPTY_ACTION,
                Functions.EMPTY_ACTION,
                Functions.emptyConsumer(),
                Functions.EMPTY_LONG_CONSUMER,
                Functions.EMPTY_ACTION
                ));
    }

    /**
     * Run the specified Action when a 'rail' completes.
     *
     * @param onComplete the callback
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> doOnComplete(@NonNull Action onComplete) {
        ObjectHelper.requireNonNull(onComplete, "onComplete is null");
        return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                onComplete,
                Functions.EMPTY_ACTION,
                Functions.emptyConsumer(),
                Functions.EMPTY_LONG_CONSUMER,
                Functions.EMPTY_ACTION
                ));
    }

    /**
     * Run the specified Action when a 'rail' completes or signals an error.
     *
     * @param onAfterTerminate the callback
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> doAfterTerminated(@NonNull Action onAfterTerminate) {
        ObjectHelper.requireNonNull(onAfterTerminate, "onAfterTerminate is null");
        return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.EMPTY_ACTION,
                onAfterTerminate,
                Functions.emptyConsumer(),
                Functions.EMPTY_LONG_CONSUMER,
                Functions.EMPTY_ACTION
                ));
    }

    /**
     * Call the specified callback when a 'rail' receives a Subscription from its upstream.
     *
     * @param onSubscribe the callback
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> doOnSubscribe(@NonNull Consumer<? super Subscription> onSubscribe) {
        ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
        return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.EMPTY_ACTION,
                Functions.EMPTY_ACTION,
                onSubscribe,
                Functions.EMPTY_LONG_CONSUMER,
                Functions.EMPTY_ACTION
                ));
    }

    /**
     * Call the specified consumer with the request amount if any rail receives a request.
     *
     * @param onRequest the callback
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> doOnRequest(@NonNull LongConsumer onRequest) {
        ObjectHelper.requireNonNull(onRequest, "onRequest is null");
        return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.EMPTY_ACTION,
                Functions.EMPTY_ACTION,
                Functions.emptyConsumer(),
                onRequest,
                Functions.EMPTY_ACTION
                ));
    }

    /**
     * Run the specified Action when a 'rail' receives a cancellation.
     *
     * @param onCancel the callback
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final ParallelFlowable<T> doOnCancel(@NonNull Action onCancel) {
        ObjectHelper.requireNonNull(onCancel, "onCancel is null");
        return RxJavaPlugins.onAssembly(new ParallelPeek<T>(this,
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.emptyConsumer(),
                Functions.EMPTY_ACTION,
                Functions.EMPTY_ACTION,
                Functions.emptyConsumer(),
                Functions.EMPTY_LONG_CONSUMER,
                onCancel
                ));
    }

    /**
     * Collect the elements in each rail into a collection supplied via a collectionSupplier
     * and collected into with a collector action, emitting the collection at the end.
     *
     * @param <C> the collection type
     * @param collectionSupplier the supplier of the collection in each rail
     * @param collector the collector, taking the per-rail collection and the current item
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <C> ParallelFlowable<C> collect(@NonNull Callable<? extends C> collectionSupplier, @NonNull BiConsumer<? super C, ? super T> collector) {
        ObjectHelper.requireNonNull(collectionSupplier, "collectionSupplier is null");
        ObjectHelper.requireNonNull(collector, "collector is null");
        return RxJavaPlugins.onAssembly(new ParallelCollect<T, C>(this, collectionSupplier, collector));
    }

    /**
     * Wraps multiple Publishers into a ParallelFlowable which runs them
     * in parallel and unordered.
     *
     * @param <T> the value type
     * @param publishers the array of publishers
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public static <T> ParallelFlowable<T> fromArray(@NonNull Publisher<T>... publishers) {
        if (publishers.length == 0) {
            throw new IllegalArgumentException("Zero publishers not supported");
        }
        return RxJavaPlugins.onAssembly(new ParallelFromArray<T>(publishers));
    }

    /**
     * Perform a fluent transformation to a value via a converter function which
     * receives this ParallelFlowable.
     *
     * @param <U> the output value type
     * @param converter the converter function from ParallelFlowable to some type
     * @return the value returned by the converter function
     */
    @CheckReturnValue
    @NonNull
    public final <U> U to(@NonNull Function<? super ParallelFlowable<T>, U> converter) {
        try {
            return ObjectHelper.requireNonNull(converter, "converter is null").apply(this);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            throw ExceptionHelper.wrapOrThrow(ex);
        }
    }

    /**
     * Allows composing operators, in assembly time, on top of this ParallelFlowable
     * and returns another ParallelFlowable with composed features.
     *
     * @param <U> the output value type
     * @param composer the composer function from ParallelFlowable (this) to another ParallelFlowable
     * @return the ParallelFlowable returned by the function
     */
    @CheckReturnValue
    @NonNull
    public final <U> ParallelFlowable<U> compose(@NonNull ParallelTransformer<T, U> composer) {
        return RxJavaPlugins.onAssembly(ObjectHelper.requireNonNull(composer, "composer is null").apply(this));
    }

    /**
     * Generates and flattens Publishers on each 'rail'.
     * <p>
     * Errors are not delayed and uses unbounded concurrency along with default inner prefetch.
     *
     * @param <R> the result type
     * @param mapper the function to map each rail's value into a Publisher
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> flatMap(@NonNull Function<? super T, ? extends Publisher<? extends R>> mapper) {
        return flatMap(mapper, false, Integer.MAX_VALUE, Flowable.bufferSize());
    }

    /**
     * Generates and flattens Publishers on each 'rail', optionally delaying errors.
     * <p>
     * It uses unbounded concurrency along with default inner prefetch.
     *
     * @param <R> the result type
     * @param mapper the function to map each rail's value into a Publisher
     * @param delayError should the errors from the main and the inner sources delayed till everybody terminates?
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> flatMap(
            @NonNull Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayError) {
        return flatMap(mapper, delayError, Integer.MAX_VALUE, Flowable.bufferSize());
    }

    /**
     * Generates and flattens Publishers on each 'rail', optionally delaying errors
     * and having a total number of simultaneous subscriptions to the inner Publishers.
     * <p>
     * It uses a default inner prefetch.
     *
     * @param <R> the result type
     * @param mapper the function to map each rail's value into a Publisher
     * @param delayError should the errors from the main and the inner sources delayed till everybody terminates?
     * @param maxConcurrency the maximum number of simultaneous subscriptions to the generated inner Publishers
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> flatMap(
            @NonNull Function<? super T, ? extends Publisher<? extends R>> mapper, boolean delayError, int maxConcurrency) {
        return flatMap(mapper, delayError, maxConcurrency, Flowable.bufferSize());
    }

    /**
     * Generates and flattens Publishers on each 'rail', optionally delaying errors,
     * having a total number of simultaneous subscriptions to the inner Publishers
     * and using the given prefetch amount for the inner Publishers.
     *
     * @param <R> the result type
     * @param mapper the function to map each rail's value into a Publisher
     * @param delayError should the errors from the main and the inner sources delayed till everybody terminates?
     * @param maxConcurrency the maximum number of simultaneous subscriptions to the generated inner Publishers
     * @param prefetch the number of items to prefetch from each inner Publisher
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> flatMap(
            @NonNull Function<? super T, ? extends Publisher<? extends R>> mapper,
            boolean delayError, int maxConcurrency, int prefetch) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(maxConcurrency, "maxConcurrency");
        ObjectHelper.verifyPositive(prefetch, "prefetch");
        return RxJavaPlugins.onAssembly(new ParallelFlatMap<T, R>(this, mapper, delayError, maxConcurrency, prefetch));
    }

    /**
     * Generates and concatenates Publishers on each 'rail', signalling errors immediately
     * and generating 2 publishers upfront.
     *
     * @param <R> the result type
     * @param mapper the function to map each rail's value into a Publisher
     * source and the inner Publishers (immediate, boundary, end)
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> concatMap(
            @NonNull Function<? super T, ? extends Publisher<? extends R>> mapper) {
        return concatMap(mapper, 2);
    }

    /**
     * Generates and concatenates Publishers on each 'rail', signalling errors immediately
     * and using the given prefetch amount for generating Publishers upfront.
     *
     * @param <R> the result type
     * @param mapper the function to map each rail's value into a Publisher
     * @param prefetch the number of items to prefetch from each inner Publisher
     * source and the inner Publishers (immediate, boundary, end)
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> concatMap(
            @NonNull Function<? super T, ? extends Publisher<? extends R>> mapper,
                    int prefetch) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(prefetch, "prefetch");
        return RxJavaPlugins.onAssembly(new ParallelConcatMap<T, R>(this, mapper, prefetch, ErrorMode.IMMEDIATE));
    }

    /**
     * Generates and concatenates Publishers on each 'rail', optionally delaying errors
     * and generating 2 publishers upfront.
     *
     * @param <R> the result type
     * @param mapper the function to map each rail's value into a Publisher
     * @param tillTheEnd if true all errors from the upstream and inner Publishers are delayed
     * till all of them terminate, if false, the error is emitted when an inner Publisher terminates.
     * source and the inner Publishers (immediate, boundary, end)
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> concatMapDelayError(
            @NonNull Function<? super T, ? extends Publisher<? extends R>> mapper,
                    boolean tillTheEnd) {
        return concatMapDelayError(mapper, 2, tillTheEnd);
    }

    /**
     * Generates and concatenates Publishers on each 'rail', optionally delaying errors
     * and using the given prefetch amount for generating Publishers upfront.
     *
     * @param <R> the result type
     * @param mapper the function to map each rail's value into a Publisher
     * @param prefetch the number of items to prefetch from each inner Publisher
     * @param tillTheEnd if true all errors from the upstream and inner Publishers are delayed
     * till all of them terminate, if false, the error is emitted when an inner Publisher terminates.
     * @return the new ParallelFlowable instance
     */
    @CheckReturnValue
    @NonNull
    public final <R> ParallelFlowable<R> concatMapDelayError(
            @NonNull Function<? super T, ? extends Publisher<? extends R>> mapper,
                    int prefetch, boolean tillTheEnd) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        ObjectHelper.verifyPositive(prefetch, "prefetch");
        return RxJavaPlugins.onAssembly(new ParallelConcatMap<T, R>(
                this, mapper, prefetch, tillTheEnd ? ErrorMode.END : ErrorMode.BOUNDARY));
    }
}