日志写入量较高的情况下,需要考虑一个缓冲器来延迟写入日志,这里给的是Kafka的方法。

我的环境

  • Laravel5.5
  • Kafka

需要安装的扩展包

扩展包解释
nmred/kafka-phpKafka的PHP扩展包

开始

首先,我们需要自定义一个 LogHandler 来替代 Laravel 内置的日志记录服务,于是,我在 app/Hubs 创建了 KafkaLogHandler.php

<?php

namespace App\Hubs;

use Monolog\Handler\AbstractProcessingHandler;

class KafkaLogHandler extends AbstractProcessingHandler
{

        public function __construct()
        {
                $this->bubble = false;
        }

    protected function write(array $record)
    {

        $config = \Kafka\ProducerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('127.0.0.1:32774');
        $config->setBrokerVersion('1.0.0');
        $config->setRequiredAck(1);
        $config->setIsAsyn(false);
        $config->setProduceInterval(500);
        $producer = new \Kafka\Producer();
        $producer->send([
            [
                'topic' => 'logtest',
                'value' => json_encode($record),
                'key' => '',
            ],
        ]);
    }

}

这里为了简便,Kafka的配置就没有做单独处理了,包括这里的 Topic,IP和端口都可以写入额外的配置文件中。注意,你需要在Kafka中创建好Topic,我这里的是logtest

接下来,我们需要将它注册到 Laravel 中,在 Laravel5.5版本中,下面的这段代码应该写在 bootstrap/app.php 文件中:

$app->configureMonologUsing(function ($monolog) {
    $monolog->pushHandler(new \App\Hubs\KafkaLogHandler);
});

这样,我们自定义的 Kafka 处理器就替代了 Laravel 的日志记录,因为这里禁止了冒泡处理,到这里就停止继续执行下去了。

写入到 kafka 后,我们还需要一个消费器去将数据同步到 MySQL 中,于是,这里可以创建一个命令:

php artisan make:command SyncLogDataCommand

内容如下:

public function handle()
    {
        $config = \Kafka\ConsumerConfig::getInstance();
        $config->setMetadataRefreshIntervalMs(10000);
        $config->setMetadataBrokerList('127.0.0.1:32774');
        $config->setGroupId('test');
        $config->setBrokerVersion('1.0.0');
        $config->setTopics(['logtest']);
        $consumer = new \Kafka\Consumer();
        $consumer->start(function($topic, $part, $message) {
            var_dump($message);
        });
    }

这里我并没有将数据插入到Mysql中,为了方便就直接输出了,接下来我们看下效果,我们定义个路由:

Route::get('/', function () {
    \Illuminate\Support\Facades\Log::info(123);
    dd(123);
});

然后在运行刚才自定义的命令:

php artisan log:kafka

访问 127.0.0.1:8000 ,可以看到输出结果:

array(3) {
  ["offset"]=>
  int(302)
  ["size"]=>
  int(257)
  ["message"]=>
  array(6) {
    ["crc"]=>
    int(1313338034)
    ["magic"]=>
    int(1)
    ["attr"]=>
    int(0)
    ["timestamp"]=>
    int(-1)
    ["key"]=>
    string(0) ""
    ["value"]=>
    string(235) "{"message":"123","context":[],"level":200,"level_name":"INFO","channel":"local","datetime":{"date":"2018-11-23 11:17:37.252694","timezone_type":3,"timezone":"PRC"},"extra":[],"formatted":"[2018-11-23 11:17:37] local.INFO: 123 [] []\n"}"
  }
}

OK,到这里我们就成功的使用 kafka 来记录 Laravel 的日志了。

这里可以使用 supervisor 监听该条命令,同时可以开启多个线程,kafka的同组consumer只会有一个consumer获取到消息,所以也避免了多线程的并发写入重复问题。