Offline Data Warehouse Project

1. Data Warehouse Concepts

2. Project Requirements and Architecture Design

3. Data Generation Module

3.1 Server and JDK Preparation

3.1.1 Configure IP Addresses and Hostnames
[jajiahao@hadoop102]$ sudo vim /etc/sysconfig/network-scripts/ifcfg-ens33
[jajiahao@hadoop102]$ sudo hostnamectl --static set-hostname hadoop102
[jajiahao@hadoop102]$ sudo vim /etc/hosts
[jajiahao@hadoop102]$ sudo vim /etc/sudoers    # add: jajiahao ALL=(ALL) NOPASSWD:ALL
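
For reference, a minimal sketch of the /etc/hosts entries on every node (the 192.168.10.x addresses are placeholders; use whatever subnet your machines actually sit on):

192.168.10.102 hadoop102
192.168.10.103 hadoop103
192.168.10.104 hadoop104
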
3.1.2 Write the cluster distribution script xsync and place it in /usr/local/bin
#!/bin/bash
#1 Get the number of arguments; exit immediately if there are none
pcount=$#
if((pcount==0)); then
echo no args;
exit;
fi

#2 Get the file name
p1=$1
fname=`basename $p1`
echo fname=$fname

#3 Get the absolute path of the parent directory
pdir=`cd -P $(dirname $p1); pwd`
echo pdir=$pdir

#4 Get the current user name
user=`whoami`

#5 Loop over the target hosts
for((host=103; host<105; host++)); do
echo ------------------- hadoop$host --------------
rsync -rvl $pdir/$fname $user@hadoop$host:$pdir
done

Make the script executable

[jajiahao@hadoop102]$ chmod 777 xsync
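
For example, to push a file (test.txt is just a placeholder name here) from hadoop102 to hadoop103 and hadoop104:

[jajiahao@hadoop102]$ xsync test.txt
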
3.1.3 Configure Passwordless SSH Login

In the home directory, go into the .ssh directory (create it if it does not exist) and generate a public/private key pair:

[jajiahao@hadoop102]$ ssh-keygen -t rsa

Copy the public key to each machine you want to reach without a password:

[jajiahao@hadoop102]$ ssh-copy-id hadoop102
[jajiahao@hadoop102]$ ssh-copy-id hadoop103
[jajiahao@hadoop102]$ ssh-copy-id hadoop104

On hadoop102, as user jajiahao, configure passwordless login to hadoop102, hadoop103, and hadoop104.

On hadoop102, as user root, configure passwordless login to hadoop102, hadoop103, and hadoop104.

On hadoop103, as user jajiahao, configure passwordless login to hadoop102, hadoop103, and hadoop104.

3.1.4 Install the JDK

1) Uninstall any existing JDK

[jajiahao@hadoop102 opt]$ sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps

2) Put the JDK tarball in /opt/software and extract it to /opt/module

[jajiahao@hadoop102]$ tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/

3) Configure the JDK environment variables: create the file /etc/profile.d/my_env.sh

[jajiahao@hadoop102]$ sudo vim /etc/profile.d/my_env.sh

4) Add the following, then save and quit (:wq)

#JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin

5) Apply the environment variables

[jajiahao@hadoop102]$ source /etc/profile.d/my_env.sh

6) Verify that the JDK was installed successfully

[jajiahao@hadoop102]$ java -version

7) Distribute the JDK

[jajiahao@hadoop102]$ xsync /opt/module/jdk1.8.0_212

8) Distribute the environment variable file

[jajiahao@hadoop102]$ sudo xsync /etc/profile.d/my_env.sh

9) Run source /etc/profile.d/my_env.sh on hadoop103 and hadoop104

3.2 Mock Data

3.2.1 Upload the local application.properties, gmall2020-mock-log-2020-04-01.jar, and path2.json to /opt/module/applog on hadoop102

3.2.2 Edit application.properties and change the business date
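
A minimal sketch of the relevant line (the property name and date are assumptions about this particular mock jar; check the file that ships with it):

# business date used for the generated logs (hypothetical property name)
mock.date=2020-06-14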

3.2.3 Run the log-generation command under /opt/module/applog

[jajiahao@hadoop102 applog]$ java -jar gmall2020-mock-log-2020-04-01.jar

3.2.4 Write the cluster log-generation script lg.sh in /home/jajiahao/bin

#!/bin/bash

# Generate mock logs in the background on each log-producing node
for i in hadoop102 hadoop103
do
echo "========== $i =========="
ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2020-04-01.jar >/dev/null 2>&1 &"
done

3.2.5 Make the script executable: chmod 777 lg.sh

3.2.6 Run lg.sh, then check the generated data under /opt/module/applog/log on hadoop102 and hadoop103

4. Data Collection Module

4.1 Script to Check All Cluster Processes

1) Create the script xcall.sh

[jajiahao@hadoop102 bin]$ vim xcall.sh

2) Script contents:

#!/bin/bash

# Run the given command on every node and show its output
for i in hadoop102 hadoop103 hadoop104
do
echo --------- $i ----------
ssh $i "$*"
done

3) Make the script executable

[jajiahao@hadoop102 bin]$ chmod 777 xcall.sh

4) Run the script

[jajiahao@hadoop102 bin]$ xcall.sh jps

4.2 Hadoop Installation and Deployment

4.2.1 Install Hadoop

1) Install Hadoop 3.1.3: extract the tarball to /opt/module, then configure the environment variables by editing my_env.sh

sudo vim /etc/profile.d/my_env.sh

2) Add the following

##HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin

3) Apply the configuration

source /etc/profile.d/my_env.sh
4.2.2 Configure the Cluster

1) Core configuration file: core-site.xml

cd $HADOOP_HOME/etc/hadoop
[jajiahao@hadoop102 hadoop]$ vim core-site.xml

File contents:

<configuration>
    <!-- NameNode address -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:9820</value>
    </property>
    <!-- Hadoop data storage directory -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-3.1.3/data</value>
    </property>

    <!-- Static user for the HDFS web UI: jajiahao -->
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>jajiahao</value>
    </property>

    <!-- Hosts from which jajiahao (superUser) may act as a proxy -->
    <property>
        <name>hadoop.proxyuser.jajiahao.hosts</name>
        <value>*</value>
    </property>
    <!-- Groups jajiahao (superUser) may impersonate -->
    <property>
        <name>hadoop.proxyuser.jajiahao.groups</name>
        <value>*</value>
    </property>
    <!-- Users jajiahao (superUser) may impersonate -->
    <property>
        <name>hadoop.proxyuser.jajiahao.users</name>
        <value>*</value>
    </property>
</configuration>

2) HDFS configuration file: hdfs-site.xml

[jajiahao@hadoop102 hadoop]$ vim hdfs-site.xml

File contents:

<configuration>
    <!-- NameNode web UI address -->
    <property>
        <name>dfs.namenode.http-address</name>
        <value>hadoop102:9870</value>
    </property>

    <!-- SecondaryNameNode web UI address -->
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:9868</value>
    </property>

    <!-- Set the HDFS replication factor to 1 for the test environment -->
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>

3) YARN configuration file: yarn-site.xml

[jajiahao@hadoop102 hadoop]$ vim yarn-site.xml

File contents:

<configuration>
    <!-- Use shuffle as the MR auxiliary service -->
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>

    <!-- ResourceManager address -->
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
    </property>

    <!-- Environment variable inheritance -->
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>

    <!-- Minimum and maximum memory a YARN container may be allocated -->
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>512</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
    </property>

    <!-- Physical memory a NodeManager may manage -->
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>4096</value>
    </property>

    <!-- Disable YARN's physical and virtual memory limit checks -->
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>

4) MapReduce configuration file: mapred-site.xml

[jajiahao@hadoop102 hadoop]$ vim mapred-site.xml

File contents:

<configuration>
    <!-- Run MapReduce jobs on YARN -->
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>

5) Configure workers

[jajiahao@hadoop102 hadoop]$ vim workers

Contents:

hadoop102
hadoop103
hadoop104
4.2.3 Configure the History Server

1) Add the following to mapred-site.xml:

<!-- History server address -->
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop102:10020</value>
</property>

<!-- History server web UI address -->
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop102:19888</value>
</property>
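
After distributing this change, the history server on hadoop102 can be started with the same command the cluster script in 4.2.7 uses:

[jajiahao@hadoop102 hadoop-3.1.3]$ bin/mapred --daemon start historyserver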

4.2.4 Configure Log Aggregation: add the following to yarn-site.xml:
<!-- Enable log aggregation -->
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>

<!-- Log server URL -->
<property>
    <name>yarn.log.server.url</name>
    <value>http://hadoop102:19888/jobhistory/logs</value>
</property>

<!-- Keep aggregated logs for 7 days -->
<property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
</property>
4.2.5 Distribute hadoop-3.1.3 and my_env.sh, then run source on each node
[jajiahao@hadoop102 module]$ xsync hadoop-3.1.3/
[jajiahao@hadoop102 module]$ sudo xsync /etc/profile.d/my_env.sh
4.2.6 Start the Hadoop Cluster

1) If the cluster is being started for the first time, format the NameNode on hadoop102 (before formatting, be sure to stop all previously started namenode and datanode processes, then delete the data and logs directories)

[jajiahao@hadoop102 hadoop-3.1.3]$ rm -rf data/ logs/
[jajiahao@hadoop102 hadoop-3.1.3]$ bin/hdfs namenode -format

2) Start HDFS

[jajiahao@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh

3) Start YARN on the node where the ResourceManager is configured (hadoop103)

[jajiahao@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh

4) Check that everything started

[jajiahao@hadoop102 hadoop-3.1.3]$ xcall.sh jps
4.2.7 Hadoop Cluster Start/Stop Script
[jajiahao@hadoop102 bin]$ pwd
/home/jajiahao/bin
[jajiahao@hadoop102 bin]$ vim hdp.sh

Script contents:

#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit ;
fi
case $1 in
"start")
    echo " =================== starting the hadoop cluster ==================="

    echo " --------------- starting hdfs ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
    echo " --------------- starting yarn ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
    echo " --------------- starting historyserver ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
    echo " =================== stopping the hadoop cluster ==================="

    echo " --------------- stopping historyserver ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
    echo " --------------- stopping yarn ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
    echo " --------------- stopping hdfs ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac

Make the script executable

[jajiahao@hadoop102 bin]$ chmod 777 hdp.sh

Test hdp.sh

[jajiahao@hadoop102 bin]$ hdp.sh stop
=================== stopping the hadoop cluster ===================
--------------- stopping historyserver ---------------
--------------- stopping yarn ---------------
Stopping nodemanagers
Stopping resourcemanager
--------------- stopping hdfs ---------------
Stopping namenodes on [hadoop102]
Stopping datanodes
Stopping secondary namenodes [hadoop104]

[jajiahao@hadoop102 bin]$ hdp.sh start
=================== starting the hadoop cluster ===================
--------------- starting hdfs ---------------
Starting namenodes on [hadoop102]
Starting datanodes
Starting secondary namenodes [hadoop104]
--------------- starting yarn ---------------
Starting resourcemanager
Starting nodemanagers
--------------- starting historyserver ---------------
4.2.8 Cluster Time Synchronization
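
The original notes leave this section empty. As a minimal sketch (assuming CentOS 7 with the ntp package installed, hadoop102 acting as the cluster's time source, and the placeholder subnet from 3.1.1):

# On hadoop102: allow the LAN to sync from this node, then start ntpd
[jajiahao@hadoop102 ~]$ sudo vim /etc/ntp.conf    # add: restrict 192.168.10.0 mask 255.255.255.0 nomodify notrap
[jajiahao@hadoop102 ~]$ sudo systemctl start ntpd
[jajiahao@hadoop102 ~]$ sudo systemctl enable ntpd
# On hadoop103/hadoop104: pull time from hadoop102 once a minute via cron
[jajiahao@hadoop103 ~]$ sudo crontab -e    # add: */1 * * * * /usr/sbin/ntpdate hadoop102
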
4.2.9 LZO Compression Configuration
# Move the jar into Hadoop's common directory
[jajiahao@hadoop102 software]$ mv hadoop-lzo-0.4.20.jar /opt/module/hadoop-3.1.3/share/hadoop/common/
# Distribute the jar
[jajiahao@hadoop102 common]$ xsync hadoop-lzo-0.4.20.jar

1) Add LZO compression support to core-site.xml

<configuration>
    <property>
        <name>io.compression.codecs</name>
        <value>
            org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.DefaultCodec,
            org.apache.hadoop.io.compress.BZip2Codec,
            org.apache.hadoop.io.compress.SnappyCodec,
            com.hadoop.compression.lzo.LzoCodec,
            com.hadoop.compression.lzo.LzopCodec
        </value>
    </property>

    <property>
        <name>io.compression.codec.lzo.class</name>
        <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>
</configuration>
[jajiahao@hadoop102 hadoop]$ vim core-site.xml 
[jajiahao@hadoop102 hadoop]$ xsync core-site.xml

2) Test LZO compression

# Prepare test data
[jajiahao@hadoop102 ~]$ hadoop fs -mkdir /input
[jajiahao@hadoop102 hadoop-3.1.3]$ hadoop fs -put README.txt /input
# Run the test
[jajiahao@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec /input /output
4.2.10 Create LZO Indexes

1) Create an index for the LZO file. The splittability of LZO-compressed files depends on their index, so one must be built manually; without an index, an LZO file produces only a single split. The indexer is invoked as follows (the same command appears in step 3 of the test below):

[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /path/to/file.lzo

2) Test

# 1. Upload bigtable.lzo (200 MB) to the cluster's /input directory
[jajiahao@hadoop102 module]$ hadoop fs -put bigtable.lzo /input
# 2. Run the wordcount job (without an index, there is only one split)
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /output
# 3. Build an index for the uploaded LZO file
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
# 4. Run wordcount again (the file now splits)
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /output2
4.2.11 Benchmarking

1) Test HDFS write performance

Test: write 7 (CPU cores minus 1) files of 128 MB each to the HDFS cluster

[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 7 -fileSize 128MB
# Results
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Date & time: Wed Nov 04 13:18:25 CST 2020
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Number of files: 7
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Total MBytes processed: 896
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Throughput mb/sec: 44.51
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Average IO rate mb/sec: 92.26
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: IO rate std deviation: 85.19
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Test exec time sec: 22.63

Notes:

Number of files: the number of map tasks spawned, usually (cluster CPU cores - 1); on a test VM, just use its actual core count minus 1.

Total MBytes processed: the total size of the files processed (here 7 × 128 MB = 896 MB).

Throughput mb/sec: per-map-task throughput. Computed as: total file size / the sum of every map task's write time.

Cluster-wide throughput = number of map tasks × per-map-task throughput.

Average IO rate mb/sec: the average per-map-task IO rate. Computed as: the sum over map tasks of (file size processed / write time), divided by the number of map tasks.

IO rate std deviation: standard deviation, reflecting the spread across map tasks; smaller means more balanced.
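
As a worked example using the write results above: cluster-wide write throughput ≈ 7 map tasks × 44.51 MB/s ≈ 311.6 MB/s.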

2) Test HDFS read performance

Test: read the 7 files of 128 MB back from the HDFS cluster

[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 7 -fileSize 128MB
# Results
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Date & time: Wed Nov 04 13:27:31 CST 2020
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Number of files: 7
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Total MBytes processed: 896
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Throughput mb/sec: 289.31
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Average IO rate mb/sec: 415.26
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: IO rate std deviation: 289.65
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Test exec time sec: 17.13

3) Delete the data generated by the tests

[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean

4.3 ZooKeeper Installation

4.3.1 Cluster Plan

Deploy ZooKeeper on the three nodes hadoop102, hadoop103, and hadoop104

4.3.2 Extract and Install

1) Extract the ZooKeeper tarball to /opt/module/

[jajiahao@hadoop102 software]$ tar -zxvf apache-zookeeper-3.5.7-bin.tar.gz -C /opt/module/

2) Rename /opt/module/apache-zookeeper-3.5.7-bin to zookeeper-3.5.7

[jajiahao@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7

3) Distribute /opt/module/zookeeper-3.5.7 to hadoop103 and hadoop104

[jajiahao@hadoop102 module]$ xsync zookeeper-3.5.7/
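
The notes jump straight to the start/stop script, so here is a brief sketch of the cluster configuration that script assumes (the server ids and zkData path follow the usual ZooKeeper convention; adjust as needed):

# On each node: create a data directory and write a unique server id into myid
[jajiahao@hadoop102 zookeeper-3.5.7]$ mkdir zkData
[jajiahao@hadoop102 zookeeper-3.5.7]$ echo 2 > zkData/myid    # use 3 on hadoop103, 4 on hadoop104
# Create conf/zoo.cfg from the sample and point it at the data directory and the ensemble:
# dataDir=/opt/module/zookeeper-3.5.7/zkData
# server.2=hadoop102:2888:3888
# server.3=hadoop103:2888:3888
# server.4=hadoop104:2888:3888
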
4.3.3 ZooKeeper Cluster Start/Stop Script

1) Create the script in /home/jajiahao/bin on hadoop102

[jajiahao@hadoop102 bin]$ vim zk.sh

Script contents:

#!/bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i start ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i stop ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i status ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done
};;
esac

2) Make the script executable

[jajiahao@hadoop102 bin]$ chmod 777 zk.sh 

3) Test the script:

# Check status
[jajiahao@hadoop102 bin]$ zk.sh status
---------- zookeeper hadoop102 status ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower
---------- zookeeper hadoop103 status ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: leader
---------- zookeeper hadoop104 status ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost.
Mode: follower

# Stop the cluster
[jajiahao@hadoop102 bin]$ zk.sh stop
---------- zookeeper hadoop102 stop ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
---------- zookeeper hadoop103 stop ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
---------- zookeeper hadoop104 stop ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED

# Start the cluster
[jajiahao@hadoop102 bin]$ zk.sh start
---------- zookeeper hadoop102 start ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper hadoop103 start ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper hadoop104 start ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

4.4 Kafka Installation and Deployment

1) Extract the tarball

[jajiahao@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/

2) Rename the extracted directory

[jajiahao@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka

3) Create a logs directory under /opt/module/kafka

[jajiahao@hadoop102 module]$ cd kafka/
[jajiahao@hadoop102 kafka]$ mkdir logs

4) Edit the configuration file

[jajiahao@hadoop102 kafka]$ cd config/
[jajiahao@hadoop102 config]$ vim server.properties

Modify or add the following:

# Globally unique broker id; must not be reused
broker.id=0
# Enable topic deletion
delete.topic.enable=true
# Directory where Kafka stores its log segments (i.e., the data)
log.dirs=/opt/module/kafka/data
# ZooKeeper connection string
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

5) Configure environment variables

[jajiahao@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh

Add the following:

#KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin

Apply the environment variables

[jajiahao@hadoop102 module]$ source /etc/profile.d/my_env.sh

6) Distribute the installation (note: remember to configure the environment variables on the other machines afterward)

[jajiahao@hadoop102 module]$ xsync kafka/

7) On hadoop103 and hadoop104, edit /opt/module/kafka/config/server.properties and set broker.id=1 and broker.id=2 respectively

8) Start the cluster: start Kafka on hadoop102, hadoop103, and hadoop104 in turn

[jajiahao@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[jajiahao@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[jajiahao@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties

9) Stop the cluster

[jajiahao@hadoop102 kafka]$ bin/kafka-server-stop.sh
[jajiahao@hadoop103 kafka]$ bin/kafka-server-stop.sh
[jajiahao@hadoop104 kafka]$ bin/kafka-server-stop.sh

10) Create the cluster start/stop script kf.sh in /home/jajiahao/bin

[jajiahao@hadoop102 bin]$ vim kf.sh

Script contents:

#!/bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " -------- starting Kafka on $i -------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " -------- stopping Kafka on $i -------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
    done
};;
esac

11) Make the script executable

[jajiahao@hadoop102 bin]$ chmod 777 kf.sh

12) Test the script

# Start the Kafka cluster via the script
[jajiahao@hadoop102 module]$ kf.sh start
# Stop the Kafka cluster via the script
[jajiahao@hadoop102 module]$ kf.sh stop
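
The stress tests below write to and read from a topic named test; if it does not exist yet, a minimal sketch for creating it (the partition and replication counts here are arbitrary):

[jajiahao@hadoop102 kafka]$ bin/kafka-topics.sh --bootstrap-server hadoop102:9092 --create --topic test --partitions 1 --replication-factor 1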

13) Kafka producer stress test

[jajiahao@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh  --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# Results
100000 records sent, 40933.278756 records/sec (3.90 MB/sec), 593.53 ms avg latency, 820.00 ms max latency, 633 ms 50th, 807 ms 95th, 817 ms 99th, 819 ms 99.9th.

14) Kafka consumer stress test

[jajiahao@hadoop102 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
# Results
2020-11-09 14:32:23:001, 2020-11-09 14:32:35:193, 9.5367, 0.7822, 100000, 8202.0997, 1604903543892, -1604903531700, -0.0000, -0.0001

Interpreting the results:

start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
Test start time; test end time; 9.5367 MB consumed in total; throughput 0.7822 MB/s; 100000 messages consumed; 8202.0997 messages/s on average;
rebalance time (ms); average fetch time (ms); average fetch rate (MB/s); average fetch rate (messages/s)

4.5 Flume Installation and Configuration

4.5.1 Install Flume

1) Upload apache-flume-1.9.0-bin.tar.gz to /opt/software on the Linux host

2) Extract apache-flume-1.9.0-bin.tar.gz to /opt/module/

[jajiahao@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/

3) Rename apache-flume-1.9.0-bin to flume

[jajiahao@hadoop102 module]$ mv apache-flume-1.9.0-bin/ flume

4) Delete (or rename away) guava-11.0.2.jar in the lib directory for compatibility with Hadoop 3.1.3

[jajiahao@hadoop102 lib]$ rm guava-11.0.2.jar    # or: mv guava-11.0.2.jar guava-11.0.2.jar.bak

5) Rename flume-env.sh.template under flume/conf to flume-env.sh, then configure it

[jajiahao@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[jajiahao@hadoop102 conf]$ vim flume-env.sh

Add the following:

export JAVA_HOME=/opt/module/jdk1.8.0_212

6) Distribute Flume

[jajiahao@hadoop102 module]$ xsync flume/
4.5.2 Configure Flume

1) Create the file file-flume-kafka.conf under /opt/module/flume/conf

[jajiahao@hadoop102 conf]$ vim file-flume-kafka.conf

Configure it as follows:

# Name the agent's components
a1.sources = r1
a1.channels = c1

# Describe the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jajiahao.flume.interceptor.ETLInterceptor$Builder

# Describe the channel
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# Bind the source to the channel (a Kafka channel is used directly, so no sink is defined)
a1.sources.r1.channels = c1

Note: com.jajiahao.flume.interceptor.ETLInterceptor is the fully qualified class name of the custom interceptor; change it to match your own interceptor.
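
To try the agent in the foreground before wiring up the start/stop script (the same flags the script in 4.5.4 uses, but logging to the console instead):

[jajiahao@hadoop102 flume]$ bin/flume-ng agent --conf-file conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,console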

4.5.3 Flume Interceptor

1) Create a Maven project flume-interceptor

2) Create the package com.jajiahao.flume.interceptor

3) Add the following to pom.xml

<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

4) Create the JSONUtils class in the com.jajiahao.flume.interceptor package

package com.jajiahao.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {
    // Returns true if the given log line parses as valid JSON
    public static boolean isValidate(String log) {
        try {
            JSON.parse(log);
            return true;
        } catch (JSONException e) {
            return false;
        }
    }
}

5) Create the ETLInterceptor class in the com.jajiahao.flume.interceptor package

package com.jajiahao.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {

    }

    @Override
    public Event intercept(Event event) {
        // 1. Get the event body as a string
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        // 2. Keep the event only if it is valid JSON
        if (JSONUtils.isValidate(log)) {
            return event;
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // Drop every event that fails validation
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {

    }

    public static class Builder implements Interceptor.Builder {

        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {

        }
    }
}

6) Package the project to produce flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

7) Put the jar into /opt/module/flume/lib on hadoop102

[jajiahao@hadoop102 lib]$ ls | grep inter
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar

8) Distribute the jar to hadoop103 and hadoop104

[jajiahao@hadoop102 lib]$ xsync flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
4.5.4 Flume Start/Stop Script

1) Create the script f1.sh in /home/jajiahao/bin

[jajiahao@hadoop102 bin]$ vim f1.sh

Script contents:

#!/bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103
    do
        echo " -------- starting collection flume on $i -------"
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
    done
};;
"stop"){
    for i in hadoop102 hadoop103
    do
        echo " -------- stopping collection flume on $i -------"
        ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
    done
};;
esac

2) Make the script executable

[jajiahao@hadoop102 bin]$ chmod 777 f1.sh 

3) Start the collection agents with f1.sh

[jajiahao@hadoop102 module]$ f1.sh start
-------- starting collection flume on hadoop102 -------
-------- starting collection flume on hadoop103 -------
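
To sanity-check the pipeline end to end (assuming Kafka is up and lg.sh is generating logs), you can watch the topic_log topic:

[jajiahao@hadoop102 kafka]$ bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic topic_log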

4) Stop the collection agents with f1.sh

[jajiahao@hadoop102 module]$ f1.sh stop