设计实现
- 采集: 可选的有fluentd, td-agent(fluentd的稳定版本, 二者区别), Apache flume.
- 消息持久化队列: http://kafka.apache.org/ (分布式消息队列),
- 数据源进行topic分流,实现Category
- 作为一层buffer来适配输入输出的消息速率,解除系统耦合度
- 作用类似于缓存,即活跃的数据和离线处理系统之间的缓存
- kafka是显式分布式架构,producer、broker(Kafka)和consumer都可以有多个。Kafka的作用类似于缓存,即活跃的数据和离线处理系统之间的缓存。几个基本概念:
- message(消息)是通信的基本单位,每个producer可以向一个topic(主题)发布一些消息。如果consumer订阅了这个主题,那么新发布的消息就会广播给这些consumer。
- Kafka是显式分布式的,多个producer、consumer和broker可以运行在一个大的集群上,作为一个逻辑整体对外提供服务。对于consumer,多个consumer可以组成一个group,这个message只能传输给某个group中的某一个consumer.
- 存储: mongo
Fluentd(td-agent)
Fluentd内置三种日志采集, file-tail, tcp, http-url, 通过插件可以支持更多的采集方式.
Install fluentd(td-agent) by RPM
$ curl -L http://toolbelt.treasuredata.com/sh/install-redhat.sh | sh
Install plugins
参考 http://blog.csdn.net/virusfu/article/details/9023237
安装插件: $ /usr/lib64/fluent/ruby/bin/gem install fluent-plugin-kafka
查看已安装的插件: $ /usr/lib64/fluent/ruby/bin/gem list | grep fluent-plugin-
Setup
参考 http://docs.fluentd.org/articles/config-file and http://www.r66r.net/?p=504
编辑 /etc/td-agent/td-agent.conf
td-agent和fluent不同之一就是: td配置文件默认输出到Treasure Data, 需要去掉配置文件里这部分.
td配置文件里重要的标签有三种:
- source: 输入源
- match :输出目的地
- include:包含其它配置文件等
下面是一个采集source(file-tail日志)的例子:
<source> |
解释:
- type: tail文件追加, 或者http, forward等, 也可以使用插件输入: type tail_ex
- format: 使用td-agent内置的apach日志解析规则
- patch: 日志文件路径
- post_file: 建议使用这个参数, post_file保存读取log的长度, 在下次宕机重启后能继续收集
- tag: 用于match
td-agent支持三种形式的日志采集, 不同的日志source配置也不同, 上面是file-tail的方式采集日志, 如果用http方式采集日志, 参考 http://docs.fluentd.org/articles/in_http ;
source配置如下:<source>
type http
port 8994 # td-agent采集监听的port
tag http8994.access
</source>
测试命令: curl -X POST -d 'json={"action":"login","user":2}' http://10.11.0.9:8994/forum.php
match:<match apache.access>
type file
path /tmp/td-agent/access_access.match
</match>
解释:
path输出的文件名, 默认生成名为”$path.日期.xxxx” 的文件.
Start up
- $ /etc/init.d/td-agent start //启动
- $ /etc/init.d/td-agent status //状态查看
- $ /etc/init.d/td-agent reload //不重启重新载入配置
td-agent执行日志在: /var/log/td-agent/td-agent.log
附例:
下面是一个输出到mongo的match例子:
<match mongo.**> |
fluent-plugin-kafka
https://github.com/htgc/fluent-plugin-kafka
例: match使用grep过滤, 并用tag发送到Kafka
<source> |
过滤一次:
<match webapp.**> // 注意"webapp.*"符合"webapp.a", 但不符合"webapp"和"webapp.a.b". "webapp.**"符合前面所有 |
发送到kafka
<match webapp_filtered.**> // 对应上面的tag |
Kafka
Apache Kafka是用于发布—订阅消息传递,实现了分布式提交日志,适用于离线和在线消息消费。
- 消息的发布(publish)称作producer生产者,消息的订阅(subscribe)称作consumer消费者,中间的存储阵列称作broker。生产者将消息发布到Kafka主题,消费者订阅这些主题并消费这些消息。
- 多个broker协同合作,producer、consumer和broker三者之间通过zookeeper来协调请求和转发
- producer产生和推送(push)数据到broker,consumer从broker拉取(pull)数据并进行处理
Install kafka (参考http://kafka.apache.org/documentation.html)
安装方式有两种, 自己通过源码编译kafka, 或者直接在apache下载编译好的二进制文件.
编译安装:
tar xzf kafka-<VERSION>.tgz |
下载二进制文件
wget http://apache.mirrors.hoobly.com/kafka/0.8.1.1/kafka_2.8.0-0.8.1.1.tgz
通过java -version
查看JVM是32bit or 64 bit, 如果安装了32位的HotSpot VM,需要修改/bin/kafka-run-class.sh文件 去掉KAFKA_JVM_PERFORMANCE_OPTS的”-XX:+UseCompressedOops” 参数.
启动zookeeper和kafka进程
Kafka需要zookeeper服务, 如果没有安装zookeeper, 可以启动kafka自带的单点zookeeper:
- 启动zookeeper:
$ nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
- 启动kafka: $
nohup bin/kafka-server-start.sh config/server.properties &
停止kafka:bin/kafka-server-stop.sh
停止zookeeper:bin/zookeeper-server-stop.sh
Zookeeper集群部署参考:
单机测试命令(可略过)
创建topic
message以topic(主题)为单位, productor可以向某个topic发送消息, consumer可以订阅topic;
bin/kafka-topics.sh –create –zookeeper localhost:2181 –replication-factor 1 –partitions 1 –topic test # 连接本机2181端口的zookeeper-serve
bin/kafka-topics.sh –list –zookeeper localhost:2181 # 查看已创建的topic
bin/kafka-topics.sh –describe –zookeeper localhost:2181 –topic test # 查看topic的描述启动msg消费者
bin/kafka-console-consumer.sh –zookeeper localhost:2181 –topic test –from-beginning #启动msg生产者
新启动Terminal输入:
bin/kafka-console-producer.sh –broker-list localhost:9092 –topic test
然后输入一些测试消息并回车, 可以看到消费者log对msg进行消费.
测试完成.
kafka和zookeeper集群
参考 http://kafka.apache.org/documentation.html
修改./config/server.properties下列参数:
broker.id=0 # 区分多个Kafka |
- kafka-server配置文件参考 http://kafka.apache.org/documentation.html // Broker Configs
- Topic配置参考 http://kafka.apache.org/documentation.html // Topic-level configuration
- Consumer配置参考: http://kafka.apache.org/documentation.html // Consumer Configs , 重要参数group.id / zookeeper.connect
- Producer配置参考: http://kafka.apache.org/documentation.html // Producer Configs,
启动kafka : bin/kafka-server-start.sh config/server.properties
当然也可以在一台机器上启动多个kafka-server, 要修改上面的port和log.dir以区分不同的kafka-server, 步骤:
cp config/server.properties config/server-1.properties |
修改上面两个properties文件, 启动:bin/kafka-server-start.sh config/server-1.properties &
bin/kafka-server-start.sh config/server-2.properties &
参考:
- fluentd+kafka+mongo: http://noops.me/?p=1325
- Fluentd + MongoDB http://blog.nosqlfan.com/html/3521.html
- kafka介绍:
- kafka部署: http://shift-alt-ctrl.iteye.com/blog/1930791
- tdagent: http://www.r66r.net/?p=504
- flume-ng+Kafka+Storm+HDFS 实时系统搭建 http://blog.csdn.net/weijonathan/article/details/18301321
- flume+kafka+storm+mysql架构设计 http://blog.csdn.net/mylittlered/article/details/20810265
- 基于Flume的美团日志收集系统(一)架构和设计 http://tech.meituan.com/mt-log-system-arch.html
- 使用Fluentd + MongoDB构建实时日志收集系统 http://blog.nosqlfan.com/html/3521.html