public final class


extends Subject<T>
 * Copyright (c) 2016-present, RxJava Contributors.
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 * Unless required by applicable law or agreed to in writing, software distributed under the License is
 * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
 * the License for the specific language governing permissions and limitations under the License.

package io.reactivex.subjects;

import io.reactivex.annotations.Experimental;
import io.reactivex.annotations.Nullable;
import io.reactivex.plugins.RxJavaPlugins;
import java.util.concurrent.atomic.*;

import io.reactivex.Observer;
import io.reactivex.annotations.CheckReturnValue;
import io.reactivex.disposables.Disposable;
import io.reactivex.internal.disposables.EmptyDisposable;
import io.reactivex.internal.functions.ObjectHelper;
import io.reactivex.internal.fuseable.SimpleQueue;
import io.reactivex.internal.observers.BasicIntQueueDisposable;
import io.reactivex.internal.queue.SpscLinkedArrayQueue;

 * Subject that allows only a single Subscriber to subscribe to it during its lifetime.
 * <p>This subject buffers notifications and replays them to the Subscriber as requested.
 * <p>This subject holds an unbounded internal buffer.
 * <p>If more than one Subscriber attempts to subscribe to this Subject, they
 * will receive an IllegalStateException if this Subject hasn't terminated yet,
 * or the Subscribers receive the terminal event (error or completion) if this
 * Subject has terminated.
 * <p>
 * <img width="640" height="370" src="" alt="">
 * @param <T> the value type received and emitted by this Subject subclass
 * @since 2.0
public final class UnicastSubject<T> extends Subject<T> {
    /** The queue that buffers the source events. */
    final SpscLinkedArrayQueue<T> queue;

    /** The single Observer. */
    final AtomicReference<Observer<? super T>> actual;

    /** The optional callback when the Subject gets cancelled or terminates. */
    final AtomicReference<Runnable> onTerminate;

    /** deliver onNext events before error event. */
    final boolean delayError;

    /** Indicates the single observer has cancelled. */
    volatile boolean disposed;

    /** Indicates the source has terminated. */
    volatile boolean done;
     * The terminal error if not null.
     * Must be set before writing to done and read after done == true.
    Throwable error;

    /** Set to 1 atomically for the first and only Subscriber. */
    final AtomicBoolean once;

    /** The wip counter and QueueDisposable surface. */
    final BasicIntQueueDisposable<T> wip;

    boolean enableOperatorFusion;

     * Creates an UnicastSubject with an internal buffer capacity hint 16.
     * @param <T> the value type
     * @return an UnicastSubject instance
    public static <T> UnicastSubject<T> create() {
        return new UnicastSubject<T>(bufferSize(), true);

     * Creates an UnicastSubject with the given internal buffer capacity hint.
     * @param <T> the value type
     * @param capacityHint the hint to size the internal unbounded buffer
     * @return an UnicastSubject instance
    public static <T> UnicastSubject<T> create(int capacityHint) {
        return new UnicastSubject<T>(capacityHint, true);

     * Creates an UnicastSubject with the given internal buffer capacity hint and a callback for
     * the case when the single Subscriber cancels its subscription.
     * <p>The callback, if not null, is called exactly once and
     * non-overlapped with any active replay.
     * @param <T> the value type
     * @param capacityHint the hint to size the internal unbounded buffer
     * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
     * @return an UnicastSubject instance
    public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate) {
        return new UnicastSubject<T>(capacityHint, onTerminate, true);

     * Creates an UnicastSubject with the given internal buffer capacity hint, delay error flag and
     * a callback for the case when the single Subscriber cancels its subscription.
     * <p>The callback, if not null, is called exactly once and
     * non-overlapped with any active replay.
     * @param <T> the value type
     * @param capacityHint the hint to size the internal unbounded buffer
     * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
     * @param delayError deliver pending onNext events before onError
     * @return an UnicastSubject instance
     * @since 2.0.8 - experimental
    public static <T> UnicastSubject<T> create(int capacityHint, Runnable onTerminate, boolean delayError) {
        return new UnicastSubject<T>(capacityHint, onTerminate, delayError);

     * Creates an UnicastSubject with an internal buffer capacity hint 16 and given delay error flag.
     * <p>The callback, if not null, is called exactly once and
     * non-overlapped with any active replay.
     * @param <T> the value type
     * @param delayError deliver pending onNext events before onError
     * @return an UnicastSubject instance
     * @since 2.0.8 - experimental
    public static <T> UnicastSubject<T> create(boolean delayError) {
        return new UnicastSubject<T>(bufferSize(), delayError);

     * Creates an UnicastSubject with the given capacity hint and delay error flag.
     * @param capacityHint the capacity hint for the internal, unbounded queue
     * @param delayError deliver pending onNext events before onError
     * @since 2.0.8 - experimental
    UnicastSubject(int capacityHint, boolean delayError) {
        this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
        this.onTerminate = new AtomicReference<Runnable>();
        this.delayError = delayError;
        this.actual = new AtomicReference<Observer<? super T>>();
        this.once = new AtomicBoolean();
        this.wip = new UnicastQueueDisposable();

     * Creates an UnicastSubject with the given capacity hint and callback
     * for when the Subject is terminated normally or its single Subscriber cancels.
     * @param capacityHint the capacity hint for the internal, unbounded queue
     * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
     * @since 2.0
     * */
    UnicastSubject(int capacityHint, Runnable onTerminate) {
        this(capacityHint, onTerminate, true);

     * Creates an UnicastSubject with the given capacity hint, delay error flag and callback
     * for when the Subject is terminated normally or its single Subscriber cancels.
     * @param capacityHint the capacity hint for the internal, unbounded queue
     * @param onTerminate the callback to run when the Subject is terminated or cancelled, null not allowed
     * @param delayError deliver pending onNext events before onError
     * @since 2.0.8 - experimental
    UnicastSubject(int capacityHint, Runnable onTerminate, boolean delayError) {
        this.queue = new SpscLinkedArrayQueue<T>(ObjectHelper.verifyPositive(capacityHint, "capacityHint"));
        this.onTerminate = new AtomicReference<Runnable>(ObjectHelper.requireNonNull(onTerminate, "onTerminate"));
        this.delayError = delayError;
        this.actual = new AtomicReference<Observer<? super T>>();
        this.once = new AtomicBoolean();
        this.wip = new UnicastQueueDisposable();

    protected void subscribeActual(Observer<? super T> observer) {
        if (!once.get() && once.compareAndSet(false, true)) {
            actual.lazySet(observer); // full barrier in drain
            if (disposed) {
        } else {
            EmptyDisposable.error(new IllegalStateException("Only a single observer allowed."), observer);

    void doTerminate() {
        Runnable r = onTerminate.get();
        if (r != null && onTerminate.compareAndSet(r, null)) {

    public void onSubscribe(Disposable s) {
        if (done || disposed) {

    public void onNext(T t) {
        if (done || disposed) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));

    public void onError(Throwable t) {
        if (done || disposed) {
        if (t == null) {
            t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
        error = t;
        done = true;



    public void onComplete() {
        if (done || disposed) {
        done = true;



    void drainNormal(Observer<? super T> a) {
        int missed = 1;
        SimpleQueue<T> q = queue;
        boolean failFast = !this.delayError;
        boolean canBeError = true;
        for (;;) {
            for (;;) {

                if (disposed) {

                boolean d = this.done;
                T v = queue.poll();
                boolean empty = v == null;

                if (d) {
                    if (failFast && canBeError) {
                        if (failedFast(q, a)) {
                        } else {
                            canBeError = false;

                    if (empty) {

                if (empty) {


            missed = wip.addAndGet(-missed);
            if (missed == 0) {

    void drainFused(Observer<? super T> a) {
        int missed = 1;

        final SpscLinkedArrayQueue<T> q = queue;
        final boolean failFast = !delayError;

        for (;;) {

            if (disposed) {
            boolean d = done;

            if (failFast && d) {
                if (failedFast(q, a)) {


            if (d) {

            missed = wip.addAndGet(-missed);
            if (missed == 0) {

    void errorOrComplete(Observer<? super T> a) {
        Throwable ex = error;
        if (ex != null) {
        } else {

    boolean failedFast(final SimpleQueue<T> q, Observer<? super T> a) {
        Throwable ex = error;
        if (ex != null) {
            return true;
        } else {
            return false;

    void drain() {
        if (wip.getAndIncrement() != 0) {

        Observer<? super T> a = actual.get();
        int missed = 1;

        for (;;) {

            if (a != null) {
                if (enableOperatorFusion) {
                } else {

            missed = wip.addAndGet(-missed);
            if (missed == 0) {

            a = actual.get();

    public boolean hasObservers() {
        return actual.get() != null;

    public Throwable getThrowable() {
        if (done) {
            return error;
        return null;

    public boolean hasThrowable() {
        return done && error != null;

    public boolean hasComplete() {
        return done && error == null;

    final class UnicastQueueDisposable extends BasicIntQueueDisposable<T> {

        private static final long serialVersionUID = 7926949470189395511L;

        public int requestFusion(int mode) {
            if ((mode & ASYNC) != 0) {
                enableOperatorFusion = true;
                return ASYNC;
            return NONE;

        public T poll() throws Exception {
            return queue.poll();

        public boolean isEmpty() {
            return queue.isEmpty();

        public void clear() {

        public void dispose() {
            if (!disposed) {
                disposed = true;


                if (wip.getAndIncrement() == 0) {

        public boolean isDisposed() {
            return disposed;
