public class

RpcServer

extends Object
//  The contents of this file are subject to the Mozilla Public License
//  Version 1.1 (the "License"); you may not use this file except in
//  compliance with the License. You may obtain a copy of the License
//  at http://www.mozilla.org/MPL/
//
//  Software distributed under the License is distributed on an "AS IS"
//  basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
//  the License for the specific language governing rights and
//  limitations under the License.
//
//  The Original Code is RabbitMQ.
//
//  The Initial Developer of the Original Code is VMware, Inc.
//  Copyright (c) 2007-2011 VMware, Inc.  All rights reserved.
//


package com.rabbitmq.client;

import java.io.IOException;

/**
 * Class which manages a request queue for a simple RPC-style service.
 * The class is agnostic about the format of RPC arguments / return values.
*/
public class RpcServer {
    /** Channel we are communicating on */
    private final Channel _channel;
    /** Queue to receive requests from */
    private final String _queueName;
    /** Boolean controlling the exit from the mainloop. */
    private boolean _mainloopRunning = true;

    /** Consumer attached to our request queue */
    private QueueingConsumer _consumer;

    /**
     * Creates an RpcServer listening on a temporary exclusive
     * autodelete queue.
     */
    public RpcServer(Channel channel)
        throws IOException
    {
        this(channel, null);
    }

    /**
     * If the passed-in queue name is null, creates a server-named
     * temporary exclusive autodelete queue to use; otherwise expects
     * the queue to have already been declared.
     */
    public RpcServer(Channel channel, String queueName)
        throws IOException
    {
        _channel = channel;
        if (queueName == null || queueName.equals("")) {
            _queueName = _channel.queueDeclare().getQueue();
        } else {
            _queueName = queueName;
        }
        _consumer = setupConsumer();
    }

    /**
     * Public API - cancels the consumer, thus deleting the queue, if
     * it was a temporary queue, and marks the RpcServer as closed.
     * @throws IOException if an error is encountered
     */
    public void close()
        throws IOException
    {
        if (_consumer != null) {
            _channel.basicCancel(_consumer.getConsumerTag());
            _consumer = null;
        }
        terminateMainloop();
    }

    /**
     * Registers a consumer on the reply queue.
     * @throws IOException if an error is encountered
     * @return the newly created and registered consumer
     */
    protected QueueingConsumer setupConsumer()
        throws IOException
    {
        QueueingConsumer consumer = new QueueingConsumer(_channel);
        _channel.basicConsume(_queueName, consumer);
        return consumer;
    }

    /**
     * Public API - main server loop. Call this to begin processing
     * requests. Request processing will continue until the Channel
     * (or its underlying Connection) is shut down, or until
     * terminateMainloop() is called.
     *
     * Note that if the mainloop is blocked waiting for a request, the
     * termination flag is not checked until a request is received, so
     * a good time to call terminateMainloop() is during a request
     * handler.
     *
     * @return the exception that signalled the Channel shutdown, or null for orderly shutdown
     */
    public ShutdownSignalException mainloop()
        throws IOException
    {
        try {
            while (_mainloopRunning) {
                QueueingConsumer.Delivery request;
                try {
                    request = _consumer.nextDelivery();
                } catch (InterruptedException ie) {
                    continue;
                }
                processRequest(request);
                _channel.basicAck(request.getEnvelope().getDeliveryTag(), false);
            }
            return null;
        } catch (ShutdownSignalException sse) {
            return sse;
        }
    }

    /**
     * Call this method to terminate the mainloop.
     *
     * Note that if the mainloop is blocked waiting for a request, the
     * termination flag is not checked until a request is received, so
     * a good time to call terminateMainloop() is during a request
     * handler.
     */
    public void terminateMainloop() {
        _mainloopRunning = false;
    }

    /**
     * Private API - Process a single request. Called from mainloop().
     */
    public void processRequest(QueueingConsumer.Delivery request)
        throws IOException
    {
        AMQP.BasicProperties requestProperties = request.getProperties();
        String correlationId = requestProperties.getCorrelationId();
        String replyTo = requestProperties.getReplyTo();
        if (correlationId != null && replyTo != null)
        {
            AMQP.BasicProperties replyProperties
                = new AMQP.BasicProperties.Builder().correlationId(correlationId).build();
            byte[] replyBody = handleCall(request, replyProperties);
            _channel.basicPublish("", replyTo, replyProperties, replyBody);
        } else {
            handleCast(request);
        }
    }

    /**
     * Lowest-level response method. Calls
     * handleCall(AMQP.BasicProperties,byte[],AMQP.BasicProperties).
     */
    public byte[] handleCall(QueueingConsumer.Delivery request,
                             AMQP.BasicProperties replyProperties)
    {
        return handleCall(request.getProperties(),
                          request.getBody(),
                          replyProperties);
    }

    /**
     * Mid-level response method. Calls
     * handleCall(byte[],AMQP.BasicProperties).
     */
    public byte[] handleCall(AMQP.BasicProperties requestProperties,
                             byte[] requestBody,
                             AMQP.BasicProperties replyProperties)
    {
        return handleCall(requestBody, replyProperties);
    }

    /**
     * High-level response method. Returns an empty response by
     * default - override this (or other handleCall and handleCast
     * methods) in subclasses.
     */
    public byte[] handleCall(byte[] requestBody,
                             AMQP.BasicProperties replyProperties)
    {
        return new byte[0];
    }

    /**
     * Lowest-level handler method. Calls
     * handleCast(AMQP.BasicProperties,byte[]).
     */
    public void handleCast(QueueingConsumer.Delivery request)
    {
        handleCast(request.getProperties(), request.getBody());
    }

    /**
     * Mid-level handler method. Calls
     * handleCast(byte[]).
     */
    public void handleCast(AMQP.BasicProperties requestProperties, byte[] requestBody)
    {
        handleCast(requestBody);
    }

    /**
     * High-level handler method. Does nothing by default - override
     * this (or other handleCast and handleCast methods) in
     * subclasses.
     */
    public void handleCast(byte[] requestBody)
    {
        // Does nothing.
    }

    /**
     * Retrieve the channel.
     * @return the channel to which this server is connected
     */
    public Channel getChannel() {
        return _channel;
    }

    /**
     * Retrieve the queue name.
     * @return the queue which this server is consuming from
     */
    public String getQueueName() {
        return _queueName;
    }
}