Advanced Message Queuing Protocol (AMQP)
Listing 2. Server-Side Python Program
#!/usr/bin/python
import subprocess
import os
from qpid.util import connect, ssl
from qpid.connection import Connection, sslwrap
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
from qpid.spec import load
from qpid.queue import Empty
from qpid.session import SessionException
# processRequest: this is what actually does the work.
def processRequest(requestMessage):
print "Servicing Request"
proc = subprocess.Popen('rpm -qa',
shell=True,
stdout=subprocess.PIPE,
)
stdout_value = proc.communicate()[0]
myPid = os.getpid()
ret_value = "From Server PID " \
+ str(myPid) + ":\n" + stdout_value \
+ "---------------------------------\n"
return ret_value
# First, load the correct specification file.
locSpec = load('/usr/local/share/qpid/specs/amqp.0-10.xml')
# Now, connect to the server.
socket = connect("localhost", 5672)
connection = Connection (sock=socket,
spec=locSpec,
username = "guest",
password = "guest")
connection.start()
session = connection.session(str(uuid4()))
# Declare the listening server queue and connect to server queue.
# Create server queue if it does not exist.
myPid = os.getpid()
serverQueueName = "serverListenQueue" + str(myPid)
localQueueName = "serverListenLocal_" + session.name
session.queue_declare(queue=serverQueueName,
exclusive=True)
session.exchange_bind(exchange="amq.topic",
queue=serverQueueName,
binding_key="SERVER_STATUS")
session.message_subscribe(queue=serverQueueName,
destination=localQueueName)
localQueue = session.incoming(localQueueName)
localQueue.start()
# Now, start an event loop.
while True:
try:
requestObj = localQueue.get(timeout=60)
session.message_accept(RangedSet(requestObj.id))
requestStr = requestObj.body
requestProperties = requestObj.get("message_properties")
replyTo = requestProperties.reply_to
if replyTo == None:
raise Exception("This message is missing " \
+ "the 'reply_to'" property, " \
+ "which is required")
responseMessage = processRequest(requestStr)
props = session.delivery_properties(
routing_key=replyTo["routing_key"])
session.message_transfer(destination=replyTo["exchange"],
message=Message(props,
responseMessage))
except Empty:
continue
In the event loop, the server first receives a request from the local queue. If there is no request within the timeout value (60 seconds), the get() method will raise an Empty exception. Because the server needs to serve requests continually, the program catches the Empty exception and simply continues. When a message arrives, the server runs the processRequest method and constructs data with the method's return values. The reply message takes exchange and routing key information from the original message's reply-to field and then is delivered to the exchange.
With AMQP, it is possible to construct a queuing system that allows a server farm to respond to multiple different kinds of requests. This example considers weather prediction models. Here, there are different server clusters, with one cluster serving each state. In such a case, it would be extremely handy to be able to send requests to each farm from an arbitrary location.
This example requires three processes. The first process (the client) delivers requests, and it is fundamentally the same as the client in the previous example. It is different only in that it loops over a list to deliver ten weather requests for Ohio and ten requests for Virginia. On the receiving end, there are two servers: one for Ohio and one for Virginia. Each server subscribes to the amq.topic exchange with the routing key #.ohio or #.virginia. Furthermore, each server has the ability to subscribe to existing server queues or create those that do not exist.
These routing keys contain wild cards. When the routing key contains a hash mark in place of text, the exchange will match any text where the hash mark resides. In this way, the weather predicting dæmons using #.ohio also would respond to requests for topic news.ohio and sports.ohio. Likewise, if a sports reporting dæmon had invaded the cluster and was listening for sports.#, the subscriptions for both the sports dæmon and the weather reporting dæmon for Ohio would match sports.ohio.
Listing 3 contains the client, and Listing 4 contains the server for Ohio. Create the server for Virginia by duplicating the server for Ohio and replacing all occurrences of Ohio with Virginia. (When you do so, make sure all routing keys have all lowercase characters.)
Realizing the promise of Apache® Hadoop® requires the effective deployment of compute, memory, storage and networking to achieve optimal results. With its flexibility and multitude of options, it is easy to over or under provision the server infrastructure, resulting in poor performance and high TCO. Join us for an in depth, technical discussion with industry experts from leading Hadoop and server companies who will provide insights into the key considerations for designing and deploying an optimal Hadoop cluster.
Sponsored by AMD
Built-in forensics, incident response, and security with Red Hat Enterprise Linux 6
Every security policy provides guidance and requirements for ensuring adequate protection of information and data, as well as high-level technical and administrative security requirements for a system in a given environment. Traditionally, providing security for a system focuses on the confidentiality of the information on it. However, protecting the data integrity and system and data availability is just as important. For example, when processing United States intelligence information, there are three attributes that require protection: confidentiality, integrity, and availability.
Learn more about catching the bad guy in this free white paper.
Sponsored by DLT Solutions
| Dynamic DNS—an Object Lesson in Problem Solving | May 21, 2013 |
| Using Salt Stack and Vagrant for Drupal Development | May 20, 2013 |
| Making Linux and Android Get Along (It's Not as Hard as It Sounds) | May 16, 2013 |
| Drupal Is a Framework: Why Everyone Needs to Understand This | May 15, 2013 |
| Home, My Backup Data Center | May 13, 2013 |
| Non-Linux FOSS: Seashore | May 10, 2013 |
- Dynamic DNS—an Object Lesson in Problem Solving
- Making Linux and Android Get Along (It's Not as Hard as It Sounds)
- Using Salt Stack and Vagrant for Drupal Development
- New Products
- RSS Feeds
- A Topic for Discussion - Open Source Feature-Richness?
- Validate an E-Mail Address with PHP, the Right Way
- Drupal Is a Framework: Why Everyone Needs to Understand This
- Readers' Choice Awards
- The Secret Password Is...
- All the articles you talked
1 hour 33 min ago - All the articles you talked
1 hour 36 min ago - All the articles you talked
1 hour 37 min ago - myip
6 hours 2 min ago - Keeping track of IP address
7 hours 53 min ago - Roll your own dynamic dns
13 hours 6 min ago - Please correct the URL for Salt Stack's web site
16 hours 18 min ago - Android is Linux -- why no better inter-operation
18 hours 33 min ago - Connecting Android device to desktop Linux via USB
19 hours 2 min ago - Find new cell phone and tablet pc
20 hours 16 sec ago






Comments
Article Graphic
Hello Everyone,
Please check out my blog update for the correct graphic:
http://www.globalherald.net/jb01/weblog/22.html
Thanks!
-JK