使用fluentd,kafka,mongodb搭建日志处理系统

设计实现

  • 采集: 可选的有fluentd, td-agent(fluentd的稳定版本, 二者区别), Apache flume.
  • 消息持久化队列: http://kafka.apache.org/ (分布式消息队列),
    • 数据源进行topic分流,实现Category
    • 作为一层buffer来适配输入输出的消息速率,解除系统耦合度
    • 作用类似于缓存,即活跃的数据和离线处理系统之间的缓存
    • kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。几个基本概念:
      1. message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer。
      2. Kafka是显式分布式的,多个producer、consumer和broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于consumer,多个consumer可以组成一个group,这个message只能传输给某个group中的某一个consumer.
  • 存储: mongo

Fluentd(td-agent)

Fluentd内置三种日志采集, file-tail, tcp, http-url, 通过插件可以支持更多的采集方式.

Install fluentd(td-agent) by RPM

$ curl -L http://toolbelt.treasuredata.com/sh/install-redhat.sh | sh

Install plugins

参考 http://blog.csdn.net/virusfu/article/details/9023237
安装插件: $ /usr/lib64/fluent/ruby/bin/gem install fluent-plugin-kafka
查看已安装的插件: $ /usr/lib64/fluent/ruby/bin/gem list | grep fluent-plugin-

Setup

参考 http://docs.fluentd.org/articles/config-file and http://www.r66r.net/?p=504

编辑 /etc/td-agent/td-agent.conf
td-agent和fluent不同之一就是: td配置文件默认输出到Treasure Data, 需要去掉配置文件里这部分.
td配置文件里重要的标签有三种:

  • source: 输入源
  • match :输出目的地
  • include:包含其它配置文件等
    下面是一个采集source(file-tail日志)的例子:
<source>
type tail
format apache
path /etc/httpd/logs/access_log
pos_file /tmp/td-agent/access_access.pos
tag apache.access
</source>

解释:

  • type: tail文件追加, 或者http, forward等, 也可以使用插件输入: type tail_ex
  • format: 使用td-agent内置的apach日志解析规则
  • patch: 日志文件路径
  • post_file: 建议使用这个参数, post_file保存读取log的长度, 在下次宕机重启后能继续收集
  • tag: 用于match

td-agent支持三种形式的日志采集, 不同的日志source配置也不同, 上面是file-tail的方式采集日志, 如果用http方式采集日志, 参考 http://docs.fluentd.org/articles/in_http ;
source配置如下:

<source>
type http
port 8994 # td-agent采集监听的port
tag http8994.access
</source>

测试命令: curl -X POST -d 'json={"action":"login","user":2}' http://10.11.0.9:8994/forum.php

match:

<match apache.access>
type file
path /tmp/td-agent/access_access.match
</match>

解释:
path输出的文件名, 默认生成名为”$path.日期.xxxx” 的文件.

Start up

  • $ /etc/init.d/td-agent start //启动
  • $ /etc/init.d/td-agent status //状态查看
  • $ /etc/init.d/td-agent reload //不重启重新载入配置
    td-agent执行日志在: /var/log/td-agent/td-agent.log

附例:

下面是一个输出到mongo的match例子:

<match mongo.**>
# plugin type
type mongo
# mongodb db + collection
database apache
collection access
# mongodb host + port
host localhost
port 27017
# interval
flush_interval 10s
</match>

fluent-plugin-kafka

https://github.com/htgc/fluent-plugin-kafka

例: match使用grep过滤, 并用tag发送到Kafka

<source>
type tail
path /home/wyyhzc/webApp.log
pos_file /home/wyyhzc/webApp.log.pos
tag webapp
format /^(?<message>(.*))$/
</source>

过滤一次:

<match webapp.**>       // 注意"webapp.*"符合"webapp.a", 但不符合"webapp"和"webapp.a.b". "webapp.**"符合前面所有
type grep // fluent-plugin-grep插件
regexp1 code ^4\d\d$
exclude1 referer ^https?://yourdomain.com
add_tag_prefix webapp_filtered // 为这个match加tag
</match>

发送到kafka

<match webapp_filtered.**>     // 对应上面的tag
type kafka
brokers hadoopdn1:9092 // kafka-server
default_topic webapp_log // topic
output_data_type json // 可选(json|ltsv|msgpack|attr:<record name>)
output_include_tag false
output_include_time flase
</match>

Kafka

Apache Kafka是用于发布—订阅消息传递,实现了分布式提交日志,适用于离线和在线消息消费。

  • 消息的发布(publish)称作producer生产者,消息的订阅(subscribe)称作consumer消费者,中间的存储阵列称作broker。生产者将消息发布到Kafka主题,消费者订阅这些主题并消费这些消息。
  • 多个broker协同合作,producer、consumer和broker三者之间通过zookeeper来协调请求和转发
  • producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理

Install kafka (参考http://kafka.apache.org/documentation.html)

安装方式有两种, 自己通过源码编译kafka, 或者直接在apache下载编译好的二进制文件.

编译安装:

tar xzf kafka-<VERSION>.tgz
cd kafka-<VERSION>
sbt update
sbt package
sbt assembly-package-dependency
Kafka是用Scala写的,SBT是Simple Build Tool的简称,类似于Java的Maven。

下载二进制文件

wget http://apache.mirrors.hoobly.com/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz
通过java -version查看JVM是32bit or 64 bit, 如果安装了32位的HotSpot VM,需要修改/bin/kafka-run-class.sh文件 去掉KAFKA_JVM_PERFORMANCE_OPTS的”-XX:+UseCompressedOops” 参数.

启动zookeeper和kafka进程

Kafka需要zookeeper服务, 如果没有安装zookeeper, 可以启动kafka自带的单点zookeeper:

  • 启动zookeeper: $ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
  • 启动kafka: $ nohup bin/kafka-server-start.sh config/server.properties &
    停止kafka: bin/kafka-server-stop.sh
    停止zookeeper: bin/zookeeper-server-stop.sh

Zookeeper集群部署参考:

单机测试命令(可略过)

  1. 创建topic
    message以topic(主题)为单位, productor可以向某个topic发送消息, consumer可以订阅topic;
    bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test # 连接本机2181端口的zookeeper-serve
    bin/kafka-topics.sh –list –zookeeper localhost:2181 # 查看已创建的topic
    bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test # 查看topic的描述

  2. 启动msg消费者
    bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning #

  3. 启动msg生产者
    新启动Terminal输入:
    bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
    然后输入一些测试消息并回车, 可以看到消费者log对msg进行消费.
    测试完成.

kafka和zookeeper集群

参考 http://kafka.apache.org/documentation.html
修改./config/server.properties下列参数:

broker.id=0  # 区分多个Kafka
port=9092 #
log.dir=/tmp/kafka-logs #
zookeeper.connect=nutch1:2181 # zookeeper集群地址

启动kafka : bin/kafka-server-start.sh config/server.properties

当然也可以在一台机器上启动多个kafka-server, 要修改上面的port和log.dir以区分不同的kafka-server, 步骤:

cp config/server.properties config/server-1.properties
cp config/server.properties config/server-2.properties

修改上面两个properties文件, 启动:

bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &

参考: