博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RabbitMQ消息队列(二):”Hello, World“
阅读量:5956 次
发布时间:2019-06-19

本文共 3164 字,大约阅读时间需要 10 分钟。

本文将使用Python(pika 0.9.8)实现从Producer到Consumer传递数据”Hello, World“。

首先复习一下上篇所学:RabbitMQ实现了AMQP定义的消息队列。它实现的功能”非常简单“:从Producer接收数据然后传递到Consumer。它能保证多并发,数据安全传递,可扩展。

和任何的Hello world一样,它们都不复杂。我们将会设计两个程序,一个发送Hello world,另一个接收这个数据并且打印到屏幕。

整体的设计如下图:

1. 环境配置

RabbitMQ 实现了AMQP。因此,我们需要安装AMPQ的library。幸运的是对于多种编程语言都有实现。我们可以使用以下lib的任何一个:

在这里我们将使用pika. 可以通过 包管理工具来安装:

$ sudo pip install pika==0.9.8复制代码

这个安装依赖于pip和git-core。

  • On Ubuntu:
    $ sudo apt-get install python-pip git-core复制代码
  • On Debian:
    $ sudo apt-get install python-setuptools git-core$ sudo easy_install pip复制代码
  • On Windows:To install easy_install, run the MS Windows Installer for

    > easy_install pip> pip install pika==0.9.8复制代码

2. Sending

第一个program send.py:发送Hello world 到queue。正如我们在上篇文章提到的,你程序的第一句话就是建立连接,第二句话就是创建channel:

#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(               'localhost'))channel = connection.channel()复制代码

创建连接传入的参数就是RabbitMQ Server的ip或者name。

关于谁创建queue,上篇文章也讨论过:Producer和Consumer都应该去创建。

接下来我们创建名字为hello的queue:

channel.queue_declare(queue='hello')复制代码

创建了channel,我们可以通过相应的命令来list queue:

$ sudo rabbitmqctl list_queuesListing queues ...hello    0...done.复制代码

现在我们已经准备好了发送了。

从架构图可以看出,Producer只能发送到exchange,它是不能直接发送到queue的。现在我们使用默认的exchange(名字是空字符)。这个默认的exchange允许我们发送给指定的queue。routing_key就是指定的queue名字。

channel.basic_publish(exchange='',                      routing_key='hello',                      body='Hello World!')print " [x] Sent 'Hello World!'"复制代码

退出前别忘了关闭connection。

connection.close()复制代码

3. Receiving

第二个program receive.py 将从queue中获取Message并且打印到屏幕。

第一步还是创建connection。第二步创建channel。第三步创建queue,name = hello:

channel.queue_declare(queue='hello')复制代码

接下来要subscribe了。在这之前,需要声明一个回调函数来处理接收到的数据。

def callback(ch, method, properties, body):    print " [x] Received %r" % (body,)复制代码

subscribe:

channel.basic_consume(callback,                      queue='hello',                      no_ack=True)复制代码

最后,准备好无限循环监听吧:

print ' [*] Waiting for messages. To exit press CTRL+C'channel.start_consuming()复制代码

4. 最终版本

send.py:

#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.queue_declare(queue='hello')channel.basic_publish(exchange='',                      routing_key='hello',                      body='Hello World!')print " [x] Sent 'Hello World!'"connection.close()复制代码

receive.py:

#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='localhost'))channel = connection.channel()channel.queue_declare(queue='hello')print ' [*] Waiting for messages. To exit press CTRL+C'def callback(ch, method, properties, body):    print " [x] Received %r" % (body,)channel.basic_consume(callback,                      queue='hello',                      no_ack=True)channel.start_consuming()复制代码

5. 最终运行

先运行 send.py program:

$ python send.py [x] Sent 'Hello World!'复制代码

send.py 每次运行完都会停止。注意:现在数据已经存到queue里了。接收它:

$ python receive.py [*] Waiting for messages. To exit press CTRL+C [x] Received 'Hello World!'复制代码

接下来,就要奉上更接近实际环境的例子。取决与我的课余时间啊。。。

参考文献:

1. http://www.com/tutorials/tutorial-one-python.html

转载地址:http://oyrxx.baihongyu.com/

你可能感兴趣的文章
CollectionView水平和竖直瀑布流的实现
查看>>
前端知识复习一(css)
查看>>
spark集群启动步骤及web ui查看
查看>>
利用WCF改进文件流传输的三种方式
查看>>
Spring学习总结(2)——Spring的常用注解
查看>>
关于IT行业人员吃的都是青春饭?[透彻讲解]
查看>>
钱到用时方恨少(随记)
查看>>
mybatis主键返回的实现
查看>>
org.openqa.selenium.StaleElementReferenceException
查看>>
数论之 莫比乌斯函数
查看>>
linux下查找某个文件位置的方法
查看>>
python之MySQL学习——数据操作
查看>>
Harmonic Number (II)
查看>>
长连接、短连接、长轮询和WebSocket
查看>>
day30 模拟ssh远程执行命令
查看>>
做错的题目——给Array附加属性
查看>>
Url.Action取消字符转义
查看>>
HBase 笔记3
查看>>
Linux嵌入式GDB调试环境搭建
查看>>
java分析jvm常用指令
查看>>