In my September 2009 newsletter article I showed a simple example of sending a message using the C++ API to Apache Qpid. Since I intend to write a number of how-to Qpid articles, most of them will appear here in my blog. (You can still subscribe to my newsletter… there are other interesting things there 🙂) If you’re not too familiar with AMQP terminology, it may help to review the newsletter article because it defines the basic architectural pieces in AMQP.
This article shows a simple, single-threaded way to receive a message from a queue. The previous example showed how to send a message to the amq.direct exchange with routing key “my_key”. Thus, if we have a client that creates a queue and binds it to the amq.direct exchange using the routing key “my_key”, the message will arrive on the queue.
To start, we need the header files:
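    #include <qpid/client/Connection.h>
    #include <qpid/client/Session.h>
    #include <qpid/client/Message.h>
    #include <qpid/client/SubscriptionManager.h>
    #include <qpid/client/LocalQueue.h>
    #include <iostream>
    #include <string>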
Just as with the message-sending example, we need a connection to the broker and a session to use. We’ll assume there’s a broker on the local system and that it’s listening on TCP port 5672 (the default for Qpid). The new things in this example are:
- Declare a queue (my_queue) that will receive the messages this program is interested in.
- Bind the new queue to the amq.direct exchange using the routing key that the message sender used (my_key).
- Use a SubscriptionManager and LocalQueue to manage receiving messages from the new queue. There are a number of ways to receive messages, but this one is simple and needs only the single, main thread.
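    int main() {
        // Brings arg::, SubscriptionManager, LocalQueue and Message into scope.
        using namespace qpid::client;
        qpid::client::Connection connection;
        try {
            connection.open("localhost", 5672);
            qpid::client::Session session = connection.newSession();
            // Create the queue and bind it to amq.direct with the sender's routing key.
            session.queueDeclare(arg::queue="my_queue");
            session.exchangeBind(arg::exchange="amq.direct",
                                 arg::queue="my_queue",
                                 arg::bindingKey="my_key");
            // Route deliveries from my_queue into a LocalQueue we can read from.
            SubscriptionManager subscriptions(session);
            LocalQueue local_queue;
            subscriptions.subscribe(local_queue, std::string("my_queue"));
            Message message;
            // get() returns false if no message arrives before the timeout expires.
            if (local_queue.get(message, 10000))
                std::cout << message.getData() << std::endl;
            else
                std::cout << "No message received" << std::endl;
            connection.close();
            return 0;
        } catch(const std::exception& error) {
            std::cout << error.what() << std::endl;
            return 1;
        }
    }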
It’s pretty easy. Again, this is a simple case and there are many options and alternatives. One particular thing to note in the above example is that the Message::getData() method returns a reference to a std::string. It is easy to assume that this means the message is text. This would be a bad assumption in general. A std::string can contain any set of bytes, text or not. What those bytes are is up to the application designers.