Advanced Message Queuing Protocol (AMQP)
First, a word about prerequisites. These examples were created on CentOS 5.2 with the standard development packages as well as Ruby. Also, note that certain versions of PyXML present conflicts that will break the tests run after installation.
To install the server, simply download the full M4 release from the URL noted in the Resources section of this article to your preferred development directory and un-archive the package. Once you have a directory structure, go to the server's directory by typing:
Initially, there is no configure script; create it by running the bootstrap command. Once bootstrap completes, do the standard configure, make and make install.
One step the installation process does not perform is installing the AMQP specification files. These specification files are contained in the specs directory under qpid-M4. Copy the files found there to /usr/local/share/qpid/specs.
After installation, it's a good idea to run tests to ensure that all prerequisites have been satisfied. Start a new shell, change directories to /usr/local and su to root. Then, run the Qpid dæmon with the command:
sbin/qpidd -t --auth no
Once the broker is running, return to the original shell. Move from the cpp directory to the python directory contained within qpid-M4. Run the Python tests using:
run-tests -s 0-10-errata -I cpp_failing_0-10.txt
If the tests run and return no errors, proceed to install the Python modules by running this command as root:
python setup.py install
This example demonstrates a simple application used to query server status. The server script runs rpm to query the packages stored on the system and returns the list, with its PID, to the client. The program generating the requests is the client, and the server is a dæmon running on a “remote server”. It has an event loop that waits for requests.
In this example, the scripts use a combination of two message-routing methods: publish-subscribe (pubsub) to deliver the requests to all listening servers and direct to route the replies directly to the calling client.
Listing 1 describes the client, which is fairly straightforward. First, the client reads the spec file and then creates the Qpid connection. The connection is made by creating a standard Python socket object and passing that object to the connection's constructor. The connection, in turn, provides a session object when the session() method is called.
Listing 1. Client-Side Python Program
#!/usr/bin/python from qpid.util import connect, ssl from qpid.connection import Connection, sslwrap from qpid.datatypes import Message, RangedSet, uuid4 from qpid.queue import Empty from qpid.spec import load # First, load the correct specification file. amqSpec = load('/usr/local/share/qpid/specs/amqp.0-10.xml') # Now, connect to the server. socket = connect("localhost", 5672) connection = Connection (sock=socket, spec=amqSpec, username = "guest", password = "guest") connection.start() session = connection.session(str(uuid4())) # Declare the reply queue: replyQueueName = "producerReply_" + session.name replyQueue = session.queue_declare(queue=replyQueueName, exclusive=True, auto_delete=True) session.exchange_bind(exchange="amq.direct", queue=replyQueueName, binding_key=replyQueueName) # Declare a local queue to which we subscribe the reply-to queue localQueueName = "producerLocalQueue_" + session.name localQueue = session.incoming(localQueueName) session.message_subscribe(queue=replyQueueName, destination=localQueueName) localQueue.start() # Now, create a message with a request. message_properties = session.message_properties() message_properties.reply_to = session.reply_to("amq.direct", replyQueueName) delivery_properties = session.delivery_properties(routing_key="SERVER_STATUS") requestMsgText = "RPM_STATUS" # Send the message and wait for a response. session.message_transfer(destination="amq.topic", message=Message(message_properties, delivery_properties, requestMsgText)) while True: try: message = localQueue.get(timeout=60) content = message.body session.message_accept(RangedSet(message.id)) print content except Empty: print "No more messages!" break
Next, the client creates the reply-to server queue. Note that the reply-to server queue name contains the session ID. This gives each client a unique server queue. The queue then is bound to the amq.direct exchange, which uses queue names as its routing keys. Using the queue name for the server queue and delivering replies to the amq-direct exchange ensures that multiple copies of the server receive only their own replies.
After the server queue is declared, the program creates a local queue and subscribes it to the server queue. Once the local queue is subscribed, the program is ready to transmit a message.
The client then creates the request message. Because the program is using publish-subscribe, the routing key is set to the topic. In this case, the topic is SERVER_STATUS. Any server that is subscribed to the topic SERVER_STATUS will receive this particular message. The client also supplies the exchange type and the routing key for the reply-to fields. For this message, it is the amq-direct exchange and the name of the server queue that was created previously.
Finally, the client creates the message itself (the text “RPM_STATUS”) and delivers it to the exchange. After the message is delivered, the client waits for a reply and prints the contents of the reply to the screen.
Listing 2 defines the server. This application will listen for messages with the topic SERVER_STATUS, run rpm to query the package contents of the system and send a reply. The first steps are similar to Listing 1 in that the server starts a connection and uses the connection to get a session and create a server queue. The server then subscribes the local queue, starts the queue, and the program is ready to respond to requests.