用PHP尝试RabbitMQ(amqp扩展)实现消息的发送和接收

上篇文章我们介绍了amqp扩展在windows下的安装方法,这里我们看一下用法。

消费者:接收消息

逻辑:
创建连接-->创建channel-->创建交换机-->创建队列-->绑定交换机/队列/路由键-->接收消息

<?php
/*************************************
* PHP amqp(RabbitMQ) Demo - consumer
* Author: Linvo
* Date: 2012/7/30
*************************************/
//配置信息
$conn_args = array(
    'host' => '192.168.1.93',
    'port' => '5672',
    'login' => 'guest',
    'password' => 'guest',
    'vhost'=>'/'
);
$e_name = 'e_linvo'; //交换机名
$q_name = 'q_linvo'; //队列名
$k_route = 'key_1'; //路由key

//创建连接和channel
$conn = new AMQPConnection($conn_args);
if (!$conn->connect()) {
    die("Cannot connect to the broker!\n");
}
$channel = new AMQPChannel($conn);

//创建交换机
$ex = new AMQPExchange($channel);
$ex->setName($e_name);
$ex->setType(AMQP_EX_TYPE_DIRECT); //direct类型
$ex->setFlags(AMQP_DURABLE); //持久化
echo "Exchange Status:".$ex->declare()."\n";

//创建队列
$q = new AMQPQueue($channel);
$q->setName($q_name);
$q->setFlags(AMQP_DURABLE); //持久化
echo "Message Total:".$q->declare()."\n";

//绑定交换机与队列,并指定路由键
echo 'Queue Bind: '.$q->bind($e_name, $k_route)."\n";

//阻塞模式接收消息
echo "Message:\n";
while(True){
    $q->consume('processMessage');
    //$q->consume('processMessage', AMQP_AUTOACK); //自动ACK应答
}
$conn->disconnect();

/**
* 消费回调函数
* 处理消息
*/
function processMessage($envelope, $queue) {
    $msg = $envelope->getBody();
    echo $msg."\n"; //处理消息
    $queue->ack($envelope->getDeliveryTag()); //手动发送ACK应答
}

生产者:发送消息

逻辑:
创建连接-->创建channel-->创建交换机对象-->发送消息 Continue reading

windows下安装rabbitmq的php扩展amqp(原创)

从php官方下载相应的版本http://pecl.php.net/package/amqp,我这里使用的是1.4.0版本(http://pecl.php.net/package/amqp/1.4.0/windows
根据当前使用的php版本选择相应的扩展dll,下载后是一个压缩包,里面有两个dll扩展(php_amqp.dll和rabbitmq.1.dll)。

php_amqp

我的环境是64位的,php5.5.12.所以使用的是http://windows.php.net/downloads/pecl/releases/amqp/1.4.0/php_amqp-1.4.0-5.5-ts-vc11-x64.zip

1.将php_amqp.dll放在php的ext目录里,然后修改php.ini文件,在文件的最后面添加两行

[amqp]
extension=php_amqp.dll

2.将rabbitmq.1.dll文件放在php的根目录里(也就是ext目录的父级目录),然后修改apache的httpd.con文件,文件尾部添加一行

LoadFile  "d:/wamp/bin/php/php5.5.12/rabbitmq.1.dll"

这里的路径根据情况修改,我这里使用的wampserver软件。

3.重启apache,并查看phpinfo信息。只要看到amqp 字样即可。

amqp

下面我们看一下rabbitmq扩展amqp的用法:用PHP尝试RabbitMQ(amqp扩展)实现消息的发送和接收

RabbitMQ中的ack介绍

no_ack 的用途:确保 message 被 consumer “成功”处理了。这里“成功”的意思是,(在设置了 no_ack=false 的情况下)只要 consumer 手动应答了 Basic.Ack ,就算其“成功”处理了。

对于ack简单的说就是“消费者”先从queue里读取一条数据,然后去处理,等处理完了,再给queue一个 ack 回应,表示处理完了,这时queue就将这条数据从队列中删除。如果不回应给队列ack的话,则这条消息仍然存在在queue中(这个也属于一种应用场景)。 Continue reading

RabbitMQ管理后台操作流程

默认的管理地址为 http://localhost:15672

rabbit_steps

1.创建一个虚拟主机

这个在后台的"Admin"页面的右侧第二个菜单可以找到。为了方便管理,每一个步都使用一个前缀来命名,每一步的前缀是不一定的,这样方便管理。

命名:VHost_虚拟主机名称

2.创建一个交换机

命名:Ex_交换机名称

3.创建一个队列

命名:Queue_队列名称

4.将交换机与队列进行绑定,通过指定路由关键字实现

命名:RouteKey_路由关键字

RabbitMQ中Ack的介绍:http://blog.haohtml.com/archives/15285

RabbitMQ的结构图如下

rabbitmq-struct

几个概念说明:
Broker:简单来说就是消息队列服务器实体。
Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
Queue:消息队列载体,每个消息都会被投入到一个或多个队列。
Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来。
Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
vhost:虚拟主机,一个broker里可以开设多个vhost,用作不同用户的权限分离。
producer:消息生产者,就是投递消息的程序。
consumer:消息消费者,就是接受消息的程序。
channel:消息通道,在客户端的每个连接里,可建立多个channel,每个channel代表一个会话任务。

Continue reading

将rabbitmq 里的消息持久化

消息持久设置:

1) 将交换机置为可持久;
2) 将通道置为可持久
3) 消息发送时设置可持久。
当我们“生产”了一条可持久化的消息,尝试中断MQ服务,启动消费者获取消息,消息依然能够恢复。相反,则抛出异常。
上面三条必须全部设置,不然消息将无法持久化。

rabbitmq 添加远程访问功能

刚刚安装过的rabbitmq 消息队列,并启用了插件管理功能,3.3.1版中,处于安全的考虑,guest这个默认的用户只能通过http://localhost:15672 来登录,其他的IP无法直接使用这个账号。 这对于服务器上没有安装桌面的情况是无法管理维护的,除非通过在前面添加一层代理向外提供服务,这个又有些麻烦了,这里通过配置文件来实现这个功能。

只要编辑 /etc/rabbitmq/rabbitmq.config 文件,添加以下配置就可以了。

[
{rabbit, [{tcp_listeners, [5672]}, {loopback_users, ["asdf"]}]}
].

现在添加了一个新授权用户asdf,可以通过外网使用这个用户名和密码访问.(记得要先用命令添加这个命令才行,  #rabbitmqctl  add_user asdf pwd123456)我是通过在管理平台里直接添加的用户和密码的,我的测试环境装有桌面的。
参考文档:http://www.rabbitmq.com/access-control.html

# rabbitmqctl add_user asdf pwd123456
# rabbitmqctl list_users
Listing users ...
asdf
guest   [administrator]

Setting permissions for user "asdf" in vhost "/" ...

# rabbitmqctl set_permissions -p "/" asdf ".*" ".*" ".*"
# rabbitmqctl list_permissions -p /
Listing permissions in vhost "/" ...
asdf .* .* .*
guest .* .* .*

可以看到添加用户成功了,但不是administrator角色,这里我们也将asdf用户设置为administrator角色.

# rabbitmqctl set_user_tags asdf administrator
Setting tags for user "asdf" to [administrator] ..

# rabbitmqctl list_users
Listing users ...
asdf [administrator]
guest [administrator]

我用rpm包安装的rabbitmq,所以提供默认的配置参考文件(/usr/share/doc/rabbitmq-server-3.5.0/rabbitmq.config.example),如果你使用源码编译的话,可以找到一个默认的配置文件rabbitmq.config.example,点击这里下载此文件rabbitmq.config

$sudo rabbitmqctl set_permissions -p /vhost1 user_admin '.*' '.*' '.*'

该命令使用户user_admin具有/vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源

centos下安装RabbitMQ消息队列

这里环境为centos7 64位.
一。安装erlang

su -c 'rpm -Uvh http://download.fedoraproject.org/pub/epel/7/x86_64/e/epel-release-7-5.noarch.rpm'
sudo yum install erlang

二。安装rabbitmq

我们是用CentOS7(RHEL7也一样),可以从这里:http://fedoraproject.org/wiki/EPEL/FAQ#howtouse 找到安装有erlang的RHEL7(CentOS同)软件仓库并安装:

wget -c http://www.rabbitmq.com/releases/rabbitmq-server/v3.5.0/rabbitmq-server-3.5.0-1.noarch.rpm

sudo rpm --import http://www.rabbitmq.com/rabbitmq-signing-key-public.asc
sudo yum install rabbitmq-server-3.5.0-1.noarch.rpm

三。启用rabbitmq

sudo chkconfig rabbitmq-server on

As an administrator, start and stop the server as usual using /sbin/service rabbitmq-server stop/start/etc.

sudo /sbin/service rabbitmq-server start

注意:如果通过上面的start命令启动失败,就检查一下下面的端口是否被占用,否则服务启动不了:

  • 4369(epmd), 25672(Erlang distribution)
  • 56725671(AMQP 0-9-1 without and with TLS)
  • 15672(if management plugin is enabled)
  • 6161361614(if STOMP is enabled)
  • 18838883(if MQTT is enabled)

使用rpm安装完rabbitmq后,默认在/etc/rabbitmq/目录里是没有rabbitmq.config文件的,你可以手动创建,也可以复制一份默认的配置文件(/usr/share/doc/rabbitmq-server-3.5.0/rabbitmq.config.example )

默认只允许guest用户通过localhost本机访问,远程是无法访问的,而一般服务器不安装桌面的,所以我们需要配置允许远程访问.

四。启用管理插件,这样可以通过浏览器访问(http://www.rabbitmq.com/management.html#configuration)

rabbitmq-plugins enable rabbitmq_management

可以看到15672端口已在监听。

http://www.rabbitmq.com/access-control.html

================================
RabbitMQ

wget "http://pypi.python.org/packages/source/s/simplejson/simplejson-2.0.9.tar.gz#md5=af5e67a39ca3408563411d357e6d5e47"
tar zxvf simplejson-2.0.9.tar.gz
cd simplejson-2.0.9
python setup.py build
python setup.py install

Continue reading