PHP使用kafka增加及简单操作

原创
小哥 3年前 (2022-11-05) 阅读数 353 #AI人工智能

在安装rdkafka前需要先安装librdkafka

MacOS安装
brew install librdkafka
CentOS
yum install librdkafka
Debian
apt install librdkafka
FreeBSD
pkg install librdkafka

使用PECL工具安装命令

pecl install rdkafka

源码安装

下载PHP版本对应的扩展版本
下载地址: http://pecl.php.net/package/rdkafka
PECL官网如果无法访问: https://github.com/wang-xuemin/pecl
下面以最新的PHP8.0.1为例,安装对应的rdkafka扩展。下载rdkafka-5.0.0.tgz

1、解压源码,进入目录
cd rdkafka-5.0.0
2、扩展php扩展模块,来生成编译检测脚本
phpize
3、执行configure,编译配置检测
./configure
4、执行make安装,编译并安装
make && make install

php.ini添加extension="rdkafka.so"

extension="rdkafka.so"

重启PHP,查看phpinfo()


php rdkafka安装的方法:首先下载安装librdkafka;然后安装php-rdkafka扩展;最后在php.ini中写入“extension=rdkafka.so”即可。

本文操作环境:windows7系统、php7.0版,DELL G3电脑

php-rdkafka 扩展安装

php有两种方式调用kafka

  • php-rdkafka

文档地址:https://arnaud-lb.github.io/php-rdkafka/phpdoc/book.rdkafka.html
rdkafka安装需要依赖librdkafka所以我们需要先安装librdkafka
下载地址http://pecl.php.net/package/rdkafka

git clone https://github.com/edenhill/librdkafka.git cd librdkafka ./configure make && make install

安装php-rdkafka扩展

git clone https://github.com/arnaud-lb/php-rdkafka.git cd php-rdkafka phpize ./configure --with-php-config=/usr/local/php7.0/bin/php-config make && make install

然后在php.ini写入

extension = rdkafka.so

  • kafka-php 扩展包

文档地址:https://github.com/weiboad/kafka-php

  • 简单示例

生成者

<?php

$rk = new RdKafka\Producer(); $rk->setLogLevel(LOG_DEBUG); $rk->addBrokers("192.168.2.152");

$topic = $rk->newTopic("shop");

for ($i = 0; $i < 10; $i++) { $topic->produce(RD_KAFKA_PARTITION_UA, 0, "发送信息: $i"); $rk->poll(0); }

while ($rk->getOutQLen() > 0) { $rk->poll(50); }

?>

消费者

<?php

$conf = new RdKafka\Conf();

$conf->set(group.id, myConsumerGroup);

$rk = new RdKafka\Consumer($conf); $rk->addBrokers("192.168.2.150:9092");

$topicConf = new RdKafka\TopicConf(); $topicConf->set(auto.commit.interval.ms, 100); $topicConf->set(offset.store.method, file); $topicConf->set(offset.store.path, sys_get_temp_dir()); $topicConf->set(auto.offset.reset, smallest);

$topic = $rk->newTopic("shop", $topicConf);

// Start consuming partition 0 $topic->consumeStart(0, RD_KAFKA_OFFSET_STORED);

while (true) { $message = $topic->consume(0, 120*10000); switch ($message->err) { case RD_KAFKA_RESP_ERR_NO_ERROR: //没有错误打印信息 var_dump($message); break; case RD_KAFKA_RESP_ERR__PARTITION_EOF: echo "等待接收信息\n"; break; case RD_KAFKA_RESP_ERR__TIMED_OUT: echo "超时\n"; break; default: throw new \Exception($message->errstr(), $message->err); break; } }

?>

版权声明

所有资源都来源于爬虫采集,如有侵权请联系我们,我们将立即删除