Advanced Message Queuing Protocol (AMQP)
Listing 3. Multiserver Weather Client
#!/usr/bin/python
from qpid.util import connect, ssl
from qpid.connection import Connection, sslwrap
from qpid.datatypes import Message
from qpid.datatypes import RangedSet
from qpid.datatypes import uuid4
from qpid.queue import Empty
from qpid.spec import load

# First, load the correct specification file.
amqSpec = load('/usr/local/share/qpid/specs/amqp.0-10.xml')

# Now, connect to the server.
socket = connect("localhost", 5672)
connection = Connection(sock=socket,
                        spec=amqSpec,
                        username="guest",
                        password="guest")
connection.start()
session = connection.session(str(uuid4()))

# Declare the reply queue:
replyQueueName = "weatherReply_" + session.name
replyQueue = session.queue_declare(queue=replyQueueName,
                                   exclusive=True,
                                   auto_delete=True)
session.exchange_bind(exchange="amq.direct",
                      queue=replyQueueName,
                      binding_key=replyQueueName)

# Declare a local queue to which we subscribe the reply-to queue
localQueueName = "weatherLocalQueue_" + session.name
localQueue = session.incoming(localQueueName)
session.message_subscribe(queue=replyQueueName,
                          destination=localQueueName)
localQueue.start()

# Now, create messages with requests.
weatherStates = ['ohio', 'virginia']
for state in weatherStates:
    for i in range(1, 11):
        message_properties = session.message_properties()
        message_properties.reply_to = session.reply_to("amq.direct",
                                                       replyQueueName)
        routingKey = "weather." + state
        delivery_properties = session.delivery_properties(
            routing_key=routingKey)
        requestMsgText = "weather_report"
        session.message_transfer(destination="amq.topic",
                                 message=Message(message_properties,
                                                 delivery_properties,
                                                 requestMsgText))
        print "Sent message " + str(i) + " with key " + routingKey

# Print replies until none arrive for 60 seconds.
while True:
    try:
        message = localQueue.get(timeout=60)
        content = message.body
        session.message_accept(RangedSet(message.id))
        print content
    except Empty:
        print "No more messages!"
        break
Listing 4. Multiserver Server Side (Ohio)
#!/usr/bin/python
import os
import subprocess
import sys
from qpid.util import connect, ssl
from qpid.connection import Connection, sslwrap
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
from qpid.spec import load
from qpid.session import SessionException

# ProcessRequest: this is what actually does the work.
def processRequest(requestMessage):
    print "Predicting the weather for Ohio"
    myPid = os.getpid()
    ret_value = "From Server PID " \
        + str(myPid) + ": Ohio is sunny and 70!"
    return ret_value

# First, load the correct specification file.
locSpec = load('/usr/local/share/qpid/specs/amqp.0-10.xml')

# Now, connect to the server.
socket = connect("localhost", 5672)
connection = Connection(sock=socket, spec=locSpec,
                        username="guest", password="guest")
connection.start()
session = connection.session(str(uuid4()))

# Declare the listening server queue and connect to server queue.
# Create server queue if it does not exist.
myPid = os.getpid()
listenTopic = "#.ohio"
serverQueueName = "serverListenQueueOhio"
localQueueName = "localQueue_" + str(myPid)
try:
    session.message_subscribe(queue=serverQueueName,
                              destination=localQueueName)
    localQueue = session.incoming(localQueueName)
    localQueue.start()
    print "Successfully attached to existing server queue."
except SessionException, e:
    print "Could not find server queue, so I am creating it."
    session = connection.session(name=str(uuid4()), timeout=0)
    session.queue_declare(queue=serverQueueName, exclusive=False)
    session.exchange_bind(exchange="amq.topic",
                          queue=serverQueueName,
                          binding_key=listenTopic)
    session.message_subscribe(queue=serverQueueName,
                              destination=localQueueName)
    localQueue = session.incoming(localQueueName)
    localQueue.start()
except Exception, e:
    print "Something broke unexpectedly."
    # Exit with a nonzero status to signal the failure.
    sys.exit(1)

# Now, start a message loop.
while True:
    try:
        requestObj = localQueue.get(timeout=60)
        session.message_accept(RangedSet(requestObj.id))
        requestStr = requestObj.body
        print "Received message."
        requestProperties = requestObj.get("message_properties")
        replyTo = requestProperties.reply_to
        if replyTo is None:
            raise Exception("This message is missing the "
                            + "'reply_to' property, "
                            + "which is required")
        responseMessage = processRequest(requestStr)
        # Send the reply to the exchange and key named in reply_to.
        props = session.delivery_properties(
            routing_key=replyTo["routing_key"])
        print "Responding to request."
        session.message_transfer(destination=replyTo["exchange"],
                                 message=Message(props, responseMessage))
    except Empty:
        # No request arrived within the timeout; keep listening.
        continue
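The server binds its queue to amq.topic with the key "#.ohio", while the client publishes with the key "weather.ohio". The following is a minimal sketch of how a topic exchange decides whether a routing key matches a binding key, assuming the usual AMQP topic rules: words are separated by dots, "*" stands for exactly one word, and "#" stands for zero or more words. It is written in plain Python 3 with no broker involved, and the function name topic_matches is a hypothetical helper, not part of the qpid library.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches the topic-style binding_key.

    Hypothetical helper for illustration; a real broker does this internally.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        # Pattern exhausted: succeed only if every word was consumed too.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb zero or more words; try every possible split.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w == len(words):
            return False
        # '*' matches any single word; otherwise the words must be equal.
        if pattern[p] == "*" or pattern[p] == words[w]:
            return match(p + 1, w + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    for key in ("weather.ohio", "weather.virginia"):
        print(key, topic_matches("#.ohio", key))
```

Under these rules, the Ohio server's queue receives requests sent with "weather.ohio" but not those sent with "weather.virginia".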
When you run this demonstration, run several copies each of the Ohio and Virginia servers. The messages for each state will be picked up in a round-robin manner by the respective instances of the server script. In turn, the client will print a listing of the weather forecasts with the server PIDs.
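To make the round-robin behavior concrete, here is a small simulation in plain Python 3 that does not talk to a broker. It assumes an idealized queue that hands each message to the next subscribed consumer in turn; a real broker's distribution can also depend on flow control and on how quickly each consumer accepts messages. The server labels and the function name distribute_round_robin are hypothetical.

```python
from itertools import cycle


def distribute_round_robin(messages, consumers):
    """Hand each message to the next consumer in turn (idealized model)."""
    delivered = {consumer: [] for consumer in consumers}
    turn = cycle(consumers)
    for message in messages:
        delivered[next(turn)].append(message)
    return delivered


if __name__ == "__main__":
    # Ten requests for one state, as sent by the client in Listing 3.
    requests = ["weather.ohio request %d" % i for i in range(1, 11)]
    # Hypothetical labels standing in for three running Ohio server PIDs.
    servers = ["ohio-server-A", "ohio-server-B", "ohio-server-C"]
    for server, handled in distribute_round_robin(requests, servers).items():
        print(server, "handled", len(handled), "requests")
```

With ten requests and three servers in this model, one server handles four requests and the other two handle three each, which is why the client's output shows a mix of server PIDs.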
Comments
Article Graphic
Hello Everyone,
Please check out my blog update for the correct graphic:
http://www.globalherald.net/jb01/weblog/22.html
Thanks!
-JK