public class QueueingConsumer extends DefaultConsumer
java.lang.Object
   ↳ com.rabbitmq.client.DefaultConsumer
     ↳ com.rabbitmq.client.QueueingConsumer

Class Overview

Convenience class: an implementation of Consumer with straightforward blocking semantics. The general pattern for using QueueingConsumer is as follows:

 // Create connection and channel.
 ConnectionFactory factory = new ConnectionFactory();
 Connection conn = factory.newConnection();
 Channel ch1 = conn.createChannel();

 // Declare a queue and bind it to an exchange.
 String queueName = ch1.queueDeclare().getQueue();
 ch1.queueBind(queueName, exchangeName, queueName);

 // Create the QueueingConsumer and have it consume from the queue
 QueueingConsumer consumer = new QueueingConsumer(ch1);
 ch1.basicConsume(queueName, false, consumer);

 // Process deliveries
 while (/* some condition */) {
     QueueingConsumer.Delivery delivery = consumer.nextDelivery();
     // process delivery
     ch1.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
 }
 

For a more complete example, see LogTail in the test/src/com/rabbitmq/examples directory of the source distribution.

Deprecated. QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers were made on the Connection's thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel the client would deadlock.

QueueingConsumer provided client code with an easy way to obviate this problem by queueing incoming messages and processing them on a separate, application-managed thread.
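The hand-off described above can be modelled with nothing but the standard library. The following is a minimal sketch, not the QueueingConsumer implementation: the class HandOffSketch, its methods, and the "dispatcher" thread are hypothetical stand-ins, and no RabbitMQ classes are involved. One thread only enqueues messages, while the application thread blocks on the queue and processes them at its own pace.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Stand-alone model of the hand-off; it does not use the RabbitMQ client.
final class HandOffSketch {

    // Hypothetical stand-in for the library's dispatch thread: it only enqueues.
    static Thread startDispatcher(BlockingQueue<String> queue, int count) {
        Thread dispatcher = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                queue.add("msg-" + i); // unbounded queue, so add does not block
            }
        }, "dispatcher");
        dispatcher.start();
        return dispatcher;
    }

    // Application side: block until each message arrives, then process it.
    static List<String> consume(int count) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        startDispatcher(queue, count);
        List<String> processed = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                processed.add(queue.take()); // plays the role of nextDelivery()
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(consume(3)); // prints [msg-0, msg-1, msg-2]
    }
}
```

Because the enqueueing thread never waits for processing to finish, slow application code cannot stall it, which is the property the paragraph above attributes to QueueingConsumer.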

The threading behaviour of Connection and Channel has been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel holding up Consumers on another and it also prevents recursive calls from deadlocking the client.

As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.

Summary

Nested Classes
class QueueingConsumer.Delivery Encapsulates an arbitrary message - simple "bean" holder structure. 
Public Constructors
QueueingConsumer(Channel ch)
QueueingConsumer(Channel ch, BlockingQueue<QueueingConsumer.Delivery> q)
Public Methods
void handleCancel(String consumerTag)
No-op implementation of handleCancel(String)
void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
void handleShutdownSignal(String consumerTag, ShutdownSignalException sig)
QueueingConsumer.Delivery nextDelivery()
Main application-side API: wait for the next message delivery and return it.
QueueingConsumer.Delivery nextDelivery(long timeout)
Main application-side API: wait up to the given timeout for the next message delivery and return it, or null if the wait times out.
Inherited Methods
From class com.rabbitmq.client.DefaultConsumer
From class java.lang.Object
From interface com.rabbitmq.client.Consumer

Public Constructors

public QueueingConsumer (Channel ch)

public QueueingConsumer (Channel ch, BlockingQueue<QueueingConsumer.Delivery> q)

Public Methods

public void handleCancel (String consumerTag)

No-op implementation of handleCancel(String)

Parameters
consumerTag the defined consumer tag (client- or server-generated)
Throws
IOException

public void handleDelivery (String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)

Parameters
consumerTag the consumer tag associated with the consumer
envelope packaging data for the message
properties content header data for the message
body the message body (opaque, client-specific byte array)
Throws
IOException

public void handleShutdownSignal (String consumerTag, ShutdownSignalException sig)

Parameters
consumerTag the consumer tag associated with the consumer
sig a ShutdownSignalException indicating the reason for the shut down

public QueueingConsumer.Delivery nextDelivery ()

Main application-side API: wait for the next message delivery and return it.

Returns
  • the next message
Throws
InterruptedException if an interrupt is received while waiting
ShutdownSignalException if the connection is shut down while waiting
ConsumerCancelledException if this consumer is cancelled while waiting

public QueueingConsumer.Delivery nextDelivery (long timeout)

Main application-side API: wait up to the given timeout for the next message delivery and return it, or null if the wait times out.

Parameters
timeout timeout in milliseconds
Returns
  • the next message or null if timed out
Throws
InterruptedException if an interrupt is received while waiting
ShutdownSignalException if the connection is shut down while waiting
ConsumerCancelledException if this consumer is cancelled while waiting
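
The "next message or null if timed out" contract means callers of the timed variant must check for null before using the result. As an illustration only, the sketch below reproduces that contract on a plain BlockingQueue, the same type the second constructor accepts. The class TimedWaitSketch and its next helper are hypothetical, and whether QueueingConsumer is implemented this way is an assumption. Unlike nextDelivery(long), the helper does not propagate InterruptedException; it restores the interrupt flag and returns null to keep the example free of checked exceptions.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Stand-alone model of a timed wait; it does not use the RabbitMQ client.
final class TimedWaitSketch {

    // Hypothetical helper: return the next item, or null if none arrives
    // within timeoutMillis (or if the waiting thread is interrupted).
    static String next(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();

        // Nothing has been queued yet, so the wait times out.
        String first = next(queue, 20);
        System.out.println(first == null ? "timed out" : first);

        // Once a message is available it is returned without waiting.
        queue.add("hello");
        String second = next(queue, 20);
        System.out.println(second == null ? "timed out" : second);
    }
}
```

A consuming loop built on the timed variant would typically treat a null result as "no message yet" and use that moment to re-check its exit condition before waiting again.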