实战大数据-01-构建日志采集和分析平台

一、构建Flume集群

需求描述:每天有海量的用户访问新闻网站,那么新闻网站需要多台Web服务器分摊用户的访问压力,而且用户访问新闻网站产生的日志数据写入web服务器落盘。为了分析新闻网站的用户行为,需要通过Flume 将用户日志数据采集到大数据平台。如果每台Flume采集服务直接将数据写入大数据平台,会造成很大的 I/O 压力,所以需要增加Flume聚合层对来自采集节点的数据进行聚合,它能减少对大数据平台的压力。Flume的采集层和聚合层共同形成了Flume集群。

本项目有 3 个实验集群节点:centos01、centos02和centos03, centos01作为 Flume采集节点,采集Web服务器日志,选择centos02和centos03作为Flume聚合节点,聚合来自 centos01 节点采集的数据,要求 centos01 采集节点的数据负载均衡到centos02 和 centos03节点。

1、配置Flume采集服务

由于web服务器有多台,这里以 centos01 作为采集节点配置即可。
在 centos01 节点上,进入 Flume 安装目录下的 conf目录,新建一个配置文件 taildir-file-selector-avro.properties ,具体配置如下:

$ cd /opt/modules/apache-flume-1.8.0-bin/conf
$ vi taildir-file-selector-avro.properties

文件内容:

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

# 定义Source、Channel、Sink的名称
agent1.sources = taildirSource
agent1.channels = fileChannel
agent1.sinkgroups = g1
agent1.sinks = k1 k2

# Define and configure an exec source
agent1.sources.taildirSource.type = TAILDIR
agent1.sources.taildirSource.positionFile = /home/hadoop/data/flume/taildir_position.json
agent1.sources.taildirSource.filegroups = f1
agent1.sources.taildirSource.filegroups.f1 = /home/hadoop/data/flume/logs/sogou.log
agent1.sources.taildirSource.channels = fileChannel

# 定义和配置一个fileChannel
agent1.channels.fileChannel.type = file
agent1.channels.fileChannel.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.fileChannel.dataDirs = /home/hadoop/data/flume/dataDirs

# 定义和配置一个sink组
agent1.sinkgroups.g1.sinks = k1 k2

# 为sink组定义一个处理器,load_balance 负载均衡  failover故障切换
agent1.sinkgroups.g1.processor.type = load_balance
agent1.sinkgroups.g1.processor.backoff = true

# round_robin 轮询  random随机
agent1.sinkgroups.g1.processor.selector = round_robin
agent1.sinkgroups.g1.processor.selector.maxTimeOut=10000

# 定义一个sink将数据发送给centos02节点
agent1.sinks.k1.type = avro
agent1.sinks.k1.channel = fileChannel
agent1.sinks.k1.batchSize = 1
agent1.sinks.k1.hostname = centos02
agent1.sinks.k1.port = 1234

# 定义一个sink将数据发送给centos03节点
agent1.sinks.k2.type = avro
agent1.sinks.k2.channel = fileChannel
agent1.sinks.k2.batchSize = 1
agent1.sinks.k2.hostname = centos03
agent1.sinks.k2.port = 1234

Flume采集配置中,Source类型选择 TAILDIR 类型实时采集文件的新增内容,Channel类型选择file类型将采集的数据持久化到本地磁盘,Sink 选择 avro 类型将数据再发送给聚合节点的Flume服务。

2、配置Flume聚合服务

Flume采集 centos01 节点的数据,发送到两个 Flume聚合节点 centos02 和 centos03,那么就需要配置聚合节点的Flume服务来接收数据,而且两个聚合节点Flume配置是一致的。

复制centos01 的Flume安装节点到 centos02、centos03

$ scp -r /opt/modules/apache-flume-1.8.0-bin/  hadoop@centos02:/opt/modules/
$ scp -r /opt/modules/apache-flume-1.8.0-bin/  hadoop@centos03:/opt/modules/

说明:hadoop用户密码为:hadoop@123

在centos02、centos03 Flume安装目录下的conf目录,新建一个配置文件 avro-file-selector-logger.properties


# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'

# 定义Source、Channel、Sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

# Define and configure an avro
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234

# Configure channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /home/hadoop/data/flume/dataDirs

# Define and configure a  sink
agent1.sinks.k1.type = logger
agent1.sinks.k1.channel = c1

3、Flume集群测试

(1)、启动Flume聚合服务
进入 /opt/modules/apache-flume-1.8.0-bin/conf 目录,在 centos02 和 centos03 节点上分别启动 Flume 进程,启动命令如下:

> cd /opt/modules/apache-flume-1.8.0-bin
> bin/flume-ng agent  -n agent1 -c conf -f  conf/avro-file-selector-logger.properties -Dflume.root.logger=INFO,console

(2)、启动Flume采集服务

> cd /opt/modules/apache-flume-1.8.0-bin
> bin/flume-ng agent  -n agent1 -c conf -f  conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console

(3)、准备测试数据

进入 centos01 节点所在的 /home/hadoop/data/flume/logs/ 目录,向监控日志 sogou.log 文件添加新数据。

> echo '00:00:00    00717725924582846   [闪字吧]   1 2 www.shanziba.com/' >> sogou.log
> echo '00:00:00    41416219018952116   [霍震霆与朱玲玲照片] 2 6 bbs.gouzai.cn/thread-698736.html' >> sogou.log
> echo '00:00:00    5228056822071097    [75810部队]   14 5    www.greatoo.com/greatoo_cn/' >> sogou.log

通过命令不断向 sogou.log 文件新增数据,如能能在centos02 和 centos03节点的Flume服务控制台,查看到均匀打印的用户日志,说明Flume集群构建成功。

file

二、 采集用户行为数据

利用Flume 构建的日志采集集群将用户行为数据分别采集到Kafka集群和HBase集群,为后续用户行为分析做准备。

2.1 Flume与Kafka集成

通过Flume集群将用户行为数据采集到Kafka集群,这里只需要修改 Flume 聚合节点的配置,因为前面已经构建了Flume集群。

配置Flume聚合服务

分别在 centos02 和 centos03 节点,进入 Flume安装目录下的conf目录,新建一个 avro-file-selector-kafka.properties

$ cd /opt/modules/apache-flume-1.8.0-bin/conf
$ vi avro-file-selector-kafka.properties
# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'agent'
# 定义 Source、Channel、Sink的名称
agent1.sources = r1
agent1.channels = c1
agent1.sinks = k1

# 定义和配置一个 avro Source
agent1.sources.r1.type = avro
agent1.sources.r1.channels = c1
agent1.sources.r1.bind = 0.0.0.0
agent1.sources.r1.port = 1234

# 定义和配置一个file Channel
agent1.channels.c1.type = file
agent1.channels.c1.checkpointDir = /home/hadoop/data/flume/checkpointDir
agent1.channels.c1.dataDirs = /home/hadoop/data/flume/dataDirs

# 定义和配置一个Kafka Sink
agent1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.k1.topic = sogoulogs
agent1.sinks.k1.brokerList = centos01:9092,centos02:9092,centos03:9092
agent1.sinks.k1.producer.acks = 1
agent1.sinks.k1.channel = c1

Flume聚合配置中,Source选择 avro 类型接收发送过来的数据,Channel选择file类型将接收的数据持久化到本地磁盘,Sink选择KafkaSink 类型将数据写入Kafka集群。

Flume与Kafka集成测试

(0)、准备工作:
启动Zookeeper集群#
分别在三个节点执行以下命令,启动Zookeeper集群(需要进入Zookeeper安装目录)

cd /opt/modules/zookeeper-3.5.9/bin
[root@centos01 bin]# ./zkServer.sh start

启动Kafka集群#
分别在三个节点上执行以下命令,启动Kafka集群(需要进入Kafka安装目录)

cd /opt/modules/kafka_2.12-2.5.0
bin/kafka-server-start.sh -daemon config/server.properties

(1)、启动Flume聚合服务
分别在 centos02 和 centos03 节点启动Flume服务

> cd /opt/modules/apache-flume-1.8.0-bin
> bin/flume-ng agent  -n agent1 -c conf -f  conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console

centos02 和 centos03 节点启动:

[root@centos02 apache-flume-1.8.0-bin]# bin/flume-ng agent  -n agent1 -c conf -f  conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console

[root@centos03 apache-flume-1.8.0-bin]# bin/flume-ng agent  -n agent1 -c conf -f  conf/avro-file-selector-kafka.properties -Dflume.root.logger=INFO,console

(2)、启动Flume采集服务
在 centos01 节点上启动Flume服务,启动命令如下:

 > cd /opt/modules/apache-flume-1.8.0-bin
> bin/flume-ng agent  -n agent1 -c conf -f  conf/taildir-file-selector-avro.properties -Dflume.root.logger=INFO,console

(3)、启动Kafka消费者服务
创建topic,通过kafka脚本提前创建 sogoulogs 主题:

> cd /opt/modules/kafka_2.12-2.5.0
> bin/kafka-topics.sh \
--create \
--zookeeper centos01:2181, centos02:2181, centos03:2181 \
--replication-factor 2 \
--partitions 2 \
--topic sogoulogs

在 centos01 节点上启动消费者:

> bin/kafka-console-consumer.sh \
--bootstrap-server centos01:9092,centos02:9092,centos03:9092 \
--topic sogoulogs

(4)、准备测试数据
进入 centos01 节点所在的 /home/hadoop/data/flume/logs/ 目录,向监控日志 sogou.log 文件添加新数据。

> echo '00:00:00    00717725924582846   [wenying]   1 2 www.shanziba.com/' >> sogou.log
> echo '00:00:00    41416219018952116   [霍震霆与朱玲玲照片] 2 6 bbs.gouzai.cn/thread-698736.html' >> sogou.log
> echo '00:00:00    5228056822071097    [8341部队]    14 5    www.greatoo.com/greatoo_cn/' >> sogou.log

file

file

可以看到在centos01节点上启动的 Kafka消费者能打印出采集的数据,说明 Flume 成功采集用户行为数据并写入了 Kafka 集群。

三、创建HBase数据库表

启动HBase集群之前,需要先启动Hadoop集群。由于HBase不依赖Hadoop YARN,因此只启动Hadoop HDFS 即可。
在 centos01节点执行以下命令,启动HDFS:

[root@centos01 ~]# su hadoop
[hadoop@centos01 root]$ cd /opt/modules/hadoop-3.1.3
[hadoop@centos01 hadoop-3.1.3]$ sbin/start-dfs.sh

在centos01节点执行以下命令,启动HBase集群。启动HBase集群的同时,会将ZooKeeper集群也同时启动。

$ cd /opt/modules/hbase-2.2.7
$ su root # 需要切换到root 账号:123456
$ bin/start-hbase.sh

在启动HBase 之后,我们可以通过执行以下命令启动HBase Shell:

$ [root@centos01 hbase-2.2.7]# bin/hbase shell
hbase(main):001:0> 

创建HBase业务表:

hbase(main):001:0> create 'sogoulogs','info'
Created table sogoulogs
Took 5.1336 seconds                                                                                            
=> Hbase::Table - sogoulogs

sougoulogs 为表的名称,info为列簇名称。

为者常成,行者常至