java.lang.Object
  ↳ com.rabbitmq.client.DefaultConsumer
    ↳ com.rabbitmq.client.QueueingConsumer
Convenience class: an implementation of Consumer with straightforward blocking semantics.
The general pattern for using QueueingConsumer is as follows:
    // Create connection and channel.
    ConnectionFactory factory = new ConnectionFactory();
    Connection conn = factory.newConnection();
    Channel ch1 = conn.createChannel();

    // Declare a queue and bind it to an exchange.
    String queueName = ch1.queueDeclare().getQueue();
    ch1.queueBind(queueName, exchangeName, queueName);

    // Create the QueueingConsumer and have it consume from the queue.
    QueueingConsumer consumer = new QueueingConsumer(ch1);
    ch1.basicConsume(queueName, false, consumer);

    // Process deliveries.
    while (/* some condition */) {
        QueueingConsumer.Delivery delivery = consumer.nextDelivery();
        // process delivery
        ch1.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
For a more complete example, see LogTail in the test/src/com/rabbitmq/examples directory of the source distribution.
Deprecated. QueueingConsumer was introduced to allow applications to overcome a limitation in the way Connection managed threads and consumer dispatching. When QueueingConsumer was introduced, callbacks to Consumers were made on the Connection's thread. This had two main drawbacks. Firstly, the Consumer could stall the processing of all Channels on the Connection. Secondly, if a Consumer made a recursive synchronous call into its Channel, the client would deadlock.
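The second drawback can be reproduced without RabbitMQ at all. The sketch below is a JDK-only model, not the client's implementation: a single-threaded executor stands in for the Connection's thread, and the class and method names (`SingleThreadDeadlockSketch`, `callbackDeadlocks`) are hypothetical. The callback waits for a reply that can only be processed on the thread it is itself occupying; a timeout is used so that the example terminates instead of hanging.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class SingleThreadDeadlockSketch {

    /**
     * Runs a "consumer callback" on the only dispatch thread and has it wait
     * for a reply that must be processed by that same thread.
     * Returns true if the wait timed out, i.e. the call would have deadlocked.
     */
    static boolean callbackDeadlocks(long waitMillis) {
        // Stand-in for the Connection's single thread (an assumption of this model).
        ExecutorService connectionThread = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> callback = connectionThread.submit(() -> {
                // The reply to the synchronous call is queued behind this callback.
                Future<String> reply = connectionThread.submit(() -> "reply");
                try {
                    reply.get(waitMillis, TimeUnit.MILLISECONDS);
                    return false; // the reply arrived: no deadlock
                } catch (TimeoutException e) {
                    return true;  // the reply never ran: its thread is busy waiting for it
                }
            });
            return callback.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            connectionThread.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("deadlock detected: " + callbackDeadlocks(200));
    }
}
```

Without the timeout, the callback in this model would wait forever, which is the situation the paragraph above describes.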
QueueingConsumer provided client code with an easy way to obviate this problem by queueing incoming messages and processing them on a separate, application-managed thread.
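The idea behind that workaround can be sketched with the JDK alone. What follows is a simplified model under stated assumptions, not QueueingConsumer's actual source: the hypothetical `QueueingSketch` class enqueues each message body from the callback thread (standing in for `handleDelivery`) and lets an application-managed thread take messages at its own pace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class QueueingSketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Called on the library's dispatch thread: returns right after enqueueing. */
    void handleDelivery(String body) {
        queue.add(body);
    }

    /** Called on the application's thread: blocks until a message is available. */
    String nextDelivery() throws InterruptedException {
        return queue.take();
    }

    /** Feeds messages from a separate thread and drains them on the caller's thread. */
    static List<String> demo(List<String> bodies) {
        QueueingSketch consumer = new QueueingSketch();
        Thread dispatcher = new Thread(() -> bodies.forEach(consumer::handleDelivery));
        dispatcher.start();
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < bodies.size(); i++) {
                received.add(consumer.nextDelivery());
            }
            dispatcher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo(Arrays.asList("a", "b", "c")));
    }
}
```

Because the callback only enqueues, slow processing in the application thread never blocks the dispatch thread in this model.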
The threading behaviour of Connection and Channel has been changed so that each Channel uses a distinct thread for dispatching to Consumers. This prevents Consumers on one Channel holding up Consumers on another and it also prevents recursive calls from deadlocking the client.
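The effect of per-Channel dispatching can be illustrated with a small JDK-only model. It is a sketch, not the client's dispatch code: each single-threaded executor stands in for one Channel's dispatch thread, and all names (`PerChannelDispatchSketch`, `channelA`, `channelB`) are hypothetical. The consumer on channel A is dispatched first and stalls, yet the consumer on channel B still runs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PerChannelDispatchSketch {

    /** Returns the order in which the two consumers finished. */
    static List<String> run() {
        // One dispatch thread per channel (an assumption of this model).
        ExecutorService channelA = Executors.newSingleThreadExecutor();
        ExecutorService channelB = Executors.newSingleThreadExecutor();
        List<String> finished = new CopyOnWriteArrayList<>();
        CountDownLatch bDone = new CountDownLatch(1);
        CountDownLatch allDone = new CountDownLatch(2);
        try {
            // The consumer on channel A stalls until channel B's consumer has run.
            channelA.execute(() -> {
                try {
                    bDone.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                finished.add("A");
                allDone.countDown();
            });
            // The consumer on channel B has its own thread, so it is not held up.
            channelB.execute(() -> {
                finished.add("B");
                bDone.countDown();
                allDone.countDown();
            });
            allDone.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            channelA.shutdown();
            channelB.shutdown();
        }
        return finished;
    }

    public static void main(String[] args) {
        // B finishes first even though A was dispatched first.
        System.out.println(run());
    }
}
```

If both consumers shared one dispatch thread, B could not start until A returned, and A would only return once its wait timed out.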
As such, it is now safe to implement Consumer directly or to extend DefaultConsumer.
Nested Classes | |
---|---|
QueueingConsumer.Delivery | Encapsulates an arbitrary message - simple "bean" holder structure. |
Public Constructors | |
---|---|
Public Methods | |
---|---|
handleCancel(String consumerTag) | No-op implementation of handleCancel(String). |
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) | No-op implementation of handleDelivery(String, Envelope, AMQP.BasicProperties, byte[]). |
handleShutdownSignal(String consumerTag, ShutdownSignalException sig) | No-op implementation of handleShutdownSignal(String, ShutdownSignalException). |
nextDelivery() | Main application-side API: wait for the next message delivery and return it. |
nextDelivery(long timeout) | Main application-side API: wait for the next message delivery and return it. |
Inherited Methods
From class com.rabbitmq.client.DefaultConsumer
From class java.lang.Object
From interface com.rabbitmq.client.Consumer
handleCancel(String consumerTag)
No-op implementation of handleCancel(String).
Parameters
consumerTag | the defined consumer tag (client- or server-generated) |
---|---|
Throws
IOException |
---|
handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
No-op implementation of handleDelivery(String, Envelope, AMQP.BasicProperties, byte[]).
Parameters
consumerTag | the consumer tag associated with the consumer |
---|---|
envelope | packaging data for the message |
properties | content header data for the message |
body | the message body (opaque, client-specific byte array) |
Throws
IOException |
---|
handleShutdownSignal(String consumerTag, ShutdownSignalException sig)
No-op implementation of handleShutdownSignal(String, ShutdownSignalException).
Parameters
consumerTag | the consumer tag associated with the consumer |
---|---|
sig | a ShutdownSignalException indicating the reason for the shutdown |
nextDelivery()
Main application-side API: wait for the next message delivery and return it.
Throws
InterruptedException | if an interrupt is received while waiting |
---|---|
ShutdownSignalException | if the connection is shut down while waiting |
ConsumerCancelledException | if the consumer is cancelled while waiting |
nextDelivery(long timeout)
Main application-side API: wait for the next message delivery and return it.
Parameters
timeout | timeout in milliseconds |
---|---|
Throws
InterruptedException | if an interrupt is received while waiting |
---|---|
ShutdownSignalException | if the connection is shut down while waiting |
ConsumerCancelledException | if the consumer is cancelled while waiting |
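A timed wait of this kind is commonly built on `BlockingQueue.poll`. The sketch below is a JDK-only model with hypothetical names (`TimedDeliverySketch`, `drain`), not the library's code. The reference text above does not state what is returned when the timeout elapses; this model assumes `null`, which is what `BlockingQueue.poll` itself returns.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class TimedDeliverySketch {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    /** Stand-in for the delivery callback: enqueue and return. */
    void handleDelivery(String body) {
        queue.add(body);
    }

    /** Waits up to timeoutMillis for a message; returns null if none arrives (assumed behaviour). */
    String nextDelivery(long timeoutMillis) throws InterruptedException {
        return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Polls until the queue stays empty for one timeout period; returns how many messages were seen. */
    static int drain(TimedDeliverySketch consumer, long timeoutMillis) {
        int count = 0;
        try {
            while (consumer.nextDelivery(timeoutMillis) != null) {
                count++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count;
    }

    public static void main(String[] args) {
        TimedDeliverySketch consumer = new TimedDeliverySketch();
        consumer.handleDelivery("first");
        consumer.handleDelivery("second");
        System.out.println("drained " + drain(consumer, 50) + " messages");
    }
}
```

A loop like `drain` lets the application thread check a stop condition periodically instead of blocking indefinitely, which the untimed variant cannot do.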