Advanced Message Queuing Protocol (AMQP)
Listing 3. Multiserver Weather Client
#!/usr/bin/python
from qpid.util import connect, ssl
from qpid.connection import Connection, sslwrap
from qpid.datatypes import Message
from qpid.datatypes import RangedSet
from qpid.datatypes import uuid4
from qpid.queue import Empty
from qpid.spec import load

# First, load the correct specification file.
amqSpec = load('/usr/local/share/qpid/specs/amqp.0-10.xml')

# Now, connect to the server.
socket = connect("localhost", 5672)
connection = Connection(sock=socket,
                        spec=amqSpec,
                        username="guest",
                        password="guest")
connection.start()
session = connection.session(str(uuid4()))

# Declare the reply queue:
replyQueueName = "weatherReply_" + session.name
replyQueue = session.queue_declare(queue=replyQueueName,
                                   exclusive=True,
                                   auto_delete=True)
session.exchange_bind(exchange="amq.direct",
                      queue=replyQueueName,
                      binding_key=replyQueueName)

# Declare a local queue to which we subscribe the reply-to queue
localQueueName = "weatherLocalQueue_" + session.name
localQueue = session.incoming(localQueueName)
session.message_subscribe(queue=replyQueueName,
                          destination=localQueueName)
localQueue.start()

# Now, create messages with requests.
weatherStates = ['ohio', 'virginia']
for state in weatherStates:
    for i in range(1, 11):
        message_properties = session.message_properties()
        message_properties.reply_to = session.reply_to("amq.direct",
                                                       replyQueueName)
        routingKey = "weather." + state
        delivery_properties = session.delivery_properties(
            routing_key=routingKey)
        requestMsgText = "weather_report"
        session.message_transfer(destination="amq.topic",
                                 message=Message(message_properties,
                                                 delivery_properties,
                                                 requestMsgText))
        print "Sent message " + str(i) + " with key " + routingKey

# Read replies until none arrives for 60 seconds.
while True:
    try:
        message = localQueue.get(timeout=60)
        content = message.body
        session.message_accept(RangedSet(message.id))
        print content
    except Empty:
        print "No more messages!"
        break
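The client's final loop keeps reading replies until no message arrives within the timeout. As a minimal sketch of that pattern, the example below uses Python's standard library queue in place of the Qpid incoming queue, so it runs without a broker. The drain function and the sample reply strings are illustrative assumptions, not part of the Qpid API.

```python
try:
    import queue            # Python 3
except ImportError:
    import Queue as queue   # Python 2

def drain(q, timeout):
    """Collect items from q until none arrives within `timeout` seconds.

    Hypothetical helper: it mirrors the client's while/try/except Empty
    loop but uses a standard library queue instead of a Qpid queue.
    """
    items = []
    while True:
        try:
            items.append(q.get(timeout=timeout))
        except queue.Empty:
            # Nothing arrived in time, so treat the stream as finished.
            break
    return items

# Stand-in for replies that servers would publish to the reply queue.
replies = queue.Queue()
for pid in (101, 102):
    replies.put("From Server PID %d: Ohio is sunny and 70!" % pid)

collected = drain(replies, timeout=0.1)
for line in collected:
    print(line)
```

A timeout is used as the stop condition because the client has no other way to know how many servers will answer; a short timeout risks missing slow replies, while a long one delays the exit.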
Listing 4. Multiserver Server Side (Ohio)
#!/usr/bin/python
import subprocess
import os
import sys
from qpid.util import connect, ssl
from qpid.connection import Connection, sslwrap
from qpid.datatypes import Message, RangedSet, uuid4
from qpid.queue import Empty
from qpid.spec import load
from qpid.session import SessionException

# ProcessRequest: this is what actually does the work.
def processRequest(requestMessage):
    print "Predicting the weather for Ohio"
    myPid = os.getpid()
    ret_value = "From Server PID " \
                + str(myPid) + ": Ohio is sunny and 70!"
    return ret_value

# First, load the correct specification file.
locSpec = load('/usr/local/share/qpid/specs/amqp.0-10.xml')

# Now, connect to the server.
socket = connect("localhost", 5672)
connection = Connection(sock=socket, spec=locSpec,
                        username="guest", password="guest")
connection.start()
session = connection.session(str(uuid4()))

# Declare the listening server queue and connect to server queue.
# Create server queue if it does not exist.
myPid = os.getpid()
listenTopic = "#.ohio"
serverQueueName = "serverListenQueueOhio"
localQueueName = "localQueue_" + str(myPid)
try:
    session.message_subscribe(queue=serverQueueName,
                              destination=localQueueName)
    localQueue = session.incoming(localQueueName)
    localQueue.start()
    print "Successfully attached to existing server queue."
except SessionException, e:
    print "Could not find server queue, so I am creating it."
    # The failed subscribe invalidated the old session, so open a new one.
    session = connection.session(name=str(uuid4()), timeout=0)
    session.queue_declare(queue=serverQueueName, exclusive=False)
    session.exchange_bind(exchange="amq.topic",
                          queue=serverQueueName,
                          binding_key=listenTopic)
    session.message_subscribe(queue=serverQueueName,
                              destination=localQueueName)
    localQueue = session.incoming(localQueueName)
    localQueue.start()
except Exception, e:
    print "Something broke unexpectedly."
    # os has no exit() function; sys.exit() is the correct call.
    sys.exit(1)

# Now, start a message loop.
while True:
    try:
        requestObj = localQueue.get(timeout=60)
        session.message_accept(RangedSet(requestObj.id))
        requestStr = requestObj.body
        print "Received message."
        requestProperties = requestObj.get("message_properties")
        replyTo = requestProperties.reply_to
        if replyTo is None:
            raise Exception("This message is missing the "
                            + "'reply_to' property, "
                            + "which is required")
        responseMessage = processRequest(requestStr)
        # Send the response to the exchange and key named in reply_to.
        props = session.delivery_properties(
            routing_key=replyTo["routing_key"])
        print "Responding to request."
        session.message_transfer(destination=replyTo["exchange"],
                                 message=Message(props, responseMessage))
    except Empty:
        # No request arrived within the timeout; keep waiting.
        continue
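The server binds its queue to amq.topic with the binding key "#.ohio", and the client publishes with routing keys such as "weather.ohio". As a rough illustration of why those keys match, here is a minimal sketch of topic-style matching, assuming the usual AMQP convention that keys are dot-separated words, that "*" stands for exactly one word, and that "#" stands for zero or more words. The topic_matches function is a hypothetical helper written for this example; it is not how the broker implements matching and is not part of the Qpid API.

```python
def topic_matches(binding_key, routing_key):
    """Return True if routing_key matches binding_key under topic rules.

    Assumed rules: words are separated by '.', '*' matches exactly one
    word, and '#' matches zero or more words.
    """
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(p, w):
        # Pattern exhausted: succeed only if every word was consumed.
        if p == len(pattern):
            return w == len(words)
        if pattern[p] == "#":
            # '#' may absorb any number of the remaining words.
            return any(match(p + 1, k) for k in range(w, len(words) + 1))
        if w < len(words) and pattern[p] in ("*", words[w]):
            return match(p + 1, w + 1)
        return False

    return match(0, 0)

print(topic_matches("#.ohio", "weather.ohio"))      # True
print(topic_matches("#.ohio", "weather.virginia"))  # False
print(topic_matches("*.ohio", "us.weather.ohio"))   # False
```

Under these rules, a Virginia server would bind with a key such as "#.virginia", so each state's requests reach only that state's shared queue.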
When you run this demonstration, run several copies each of the Ohio and Virginia servers. The messages for each state will be picked up in a round-robin manner by the respective instances of the server script. In turn, the client will print a listing of the weather forecasts with the server PIDs.
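To picture the round-robin behavior, the following sketch models an idealized shared queue whose messages are handed to consumers in strict rotation. It is a simplified model under stated assumptions, not broker code: the distribute function and the PID labels are hypothetical, and a real broker's delivery order also depends on how quickly each consumer accepts messages.

```python
from itertools import cycle

def distribute(messages, consumers):
    """Assign each message to the next consumer in strict rotation."""
    assignment = dict((name, []) for name in consumers)
    turn = cycle(consumers)
    for message in messages:
        assignment[next(turn)].append(message)
    return assignment

# Ten Ohio requests shared by three hypothetical server processes.
servers = ["pid-101", "pid-102", "pid-103"]
result = distribute(range(1, 11), servers)
for name in servers:
    print(name + " handled " + str(result[name]))
# pid-101 handled [1, 4, 7, 10]
# pid-102 handled [2, 5, 8]
# pid-103 handled [3, 6, 9]
```

In the real demonstration, the same effect shows up as different server PIDs alternating in the replies the client prints.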






Comments
Article Graphic
Hello Everyone,
Please check out my blog update for the correct graphic:
http://www.globalherald.net/jb01/weblog/22.html
Thanks!
-JK