Offline Data Warehouse Project

I. Data Warehouse Concepts

II. Project Requirements and Architecture Design

III. Data Generation Module

3.1 Server and JDK Preparation

3.1.1 Configure the IP address, hostname, and sudo

```shell
[jajiahao@hadoop102]$ vim /etc/sysconfig/network-scripts/ifcfg-ens33
[jajiahao@hadoop102]$ hostnamectl --static set-hostname hadoop102
[jajiahao@hadoop102]$ vim /etc/hosts
# in /etc/sudoers add: jajiahao ALL=(ALL) NOPASSWD:ALL
[jajiahao@hadoop102]$ vim /etc/sudoers
```
3.1.2 Write the cluster distribution script xsync and place it in /usr/local/bin

```shell
#!/bin/bash
# 1. Get the argument count; exit immediately if there are no arguments
pcount=$#
if ((pcount==0)); then
  echo no args
  exit
fi

# 2. Get the file name
p1=$1
fname=$(basename "$p1")
echo fname=$fname

# 3. Resolve the parent directory to an absolute path
pdir=$(cd -P "$(dirname "$p1")"; pwd)
echo pdir=$pdir

# 4. Get the current user name
user=$(whoami)

# 5. Loop over the other hosts
for ((host=103; host<105; host++)); do
  echo ------------------- hadoop$host --------------
  rsync -rvl "$pdir/$fname" "$user@hadoop$host:$pdir"
done
```
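The path handling in steps 2–3 can be checked locally without a cluster. The sketch below (using a throwaway directory under /tmp, a made-up path for illustration) mirrors how xsync splits its argument into a file name plus a canonical absolute parent directory:

```shell
#!/bin/bash
# mirror xsync's path resolution on a deliberately non-canonical path
mkdir -p /tmp/xsync_demo/dir
touch /tmp/xsync_demo/dir/file.txt
p1=/tmp/xsync_demo/dir/../dir/file.txt

fname=$(basename "$p1")                    # just the file name
pdir=$(cd -P "$(dirname "$p1")"; pwd)      # -P resolves ".." and symlinks to a physical path
echo "fname=$fname"
echo "pdir=$pdir"
```

This is why xsync can be called with a relative path from anywhere: the rsync target on the remote host always ends up being the same absolute directory.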
Make it executable:

```shell
[jajiahao@hadoop102]$ chmod 777 xsync
```
3.1.3 Configure passwordless SSH. Create the .ssh directory in the home directory, enter it, and generate a key pair:

```shell
[jajiahao@hadoop102]$ ssh-keygen -t rsa
```
Copy the public key to each machine that should accept passwordless logins:

```shell
[jajiahao@hadoop102]$ ssh-copy-id hadoop102
[jajiahao@hadoop102]$ ssh-copy-id hadoop103
[jajiahao@hadoop102]$ ssh-copy-id hadoop104
```
On hadoop102, configure passwordless login as user jajiahao to hadoop102, hadoop103, and hadoop104.

On hadoop102, configure passwordless login as user root to hadoop102, hadoop103, and hadoop104.

On hadoop103, configure passwordless login as user jajiahao to hadoop102, hadoop103, and hadoop104.
3.1.4 Install the JDK

1) Remove any existing JDK:

```shell
[jajiahao@hadoop102 opt]$ sudo rpm -qa | grep -i java | xargs -n1 sudo rpm -e --nodeps
```
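The pipeline above feeds each matched package name to `rpm -e` one at a time via `xargs -n1`. That per-item behavior can be sanity-checked with a harmless `echo` stand-in (the package names here are hypothetical):

```shell
#!/bin/bash
# xargs -n1 invokes the command once per input item, mirroring the rpm -e loop
printf 'java-1.7.0-openjdk\njava-1.8.0-openjdk\n' | xargs -n1 echo would-remove
```

Each input line becomes one separate invocation, so a failure removing one package does not abort the others.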
2) Put the JDK archive in /opt/software and extract it to /opt/module:

```shell
[jajiahao@hadoop102]$ tar -zxvf jdk-8u212-linux-x64.tar.gz -C /opt/module/
```

3) Configure the JDK environment variables by creating /etc/profile.d/my_env.sh:

```shell
[jajiahao@hadoop102]$ sudo vim /etc/profile.d/my_env.sh
```

4) Add the following, then save and quit (:wq):

```shell
# JAVA_HOME
export JAVA_HOME=/opt/module/jdk1.8.0_212
export PATH=$PATH:$JAVA_HOME/bin
```

5) Apply the environment variables:

```shell
[jajiahao@hadoop102]$ source /etc/profile.d/my_env.sh
```

6) Verify the JDK installation:

```shell
[jajiahao@hadoop102]$ java -version
```

7) Distribute the JDK:

```shell
[jajiahao@hadoop102]$ xsync /opt/module/jdk1.8.0_212
```

8) Distribute the environment variable file:

```shell
[jajiahao@hadoop102]$ sudo xsync /etc/profile.d/my_env.sh
```

9) Run `source /etc/profile.d/my_env.sh` on hadoop103 and hadoop104 as well.
3.2 Simulating Data

3.2.1 Upload the local application.properties, gmall2020-mock-log-2020-04-01.jar, and path2.json to /opt/module/applog on hadoop102.

3.2.2 Edit application.properties and set the business date.

3.2.3 Generate logs from /opt/module/applog:

```shell
[jajiahao@hadoop102 applog]$ java -jar gmall2020-mock-log-2020-04-01.jar
```

3.2.4 Write the cluster log-generation script lg.sh in /home/jajiahao/bin:
```shell
#!/bin/bash
for i in hadoop102 hadoop103
do
  echo "========== $i =========="
  ssh $i "cd /opt/module/applog/; java -jar gmall2020-mock-log-2020-04-01.jar >/dev/null 2>&1 &"
done
```
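The `>/dev/null 2>&1 &` tail is what lets each ssh session return immediately: stdout is discarded, stderr is redirected onto stdout (and therefore also discarded), and the jar is put in the background. A minimal local sketch of that redirection pattern:

```shell
#!/bin/bash
# ">/dev/null 2>&1 &": silence stdout and stderr, detach into the background
( echo "stdout noise"; echo "stderr noise" 1>&2 ) >/dev/null 2>&1 &
wait    # only for this demo; lg.sh deliberately does not wait
echo "launcher returned, exit=$?"
```

Neither "noise" line is printed, and the launcher regains control with a clean exit status.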
3.2.5 Make the script executable: `chmod 777 lg.sh`

3.2.6 Run lg.sh, which generates logs on hadoop102 and hadoop103; inspect the output under /opt/module/applog/log.
IV. Data Collection Module

4.1 Script to Inspect All Cluster Processes

1) Create the script xcall.sh:

```shell
[jajiahao@hadoop102 bin]$ vim xcall.sh
```

2) Script contents:

```shell
#!/bin/bash
for i in hadoop102 hadoop103 hadoop104
do
  echo --------- $i ----------
  ssh $i "$*"
done
```
3) Make it executable:

```shell
[jajiahao@hadoop102 bin]$ chmod 777 xcall.sh
```

4) Run it:

```shell
[jajiahao@hadoop102 bin]$ xcall.sh jps
```
4.2 Hadoop Installation and Deployment

4.2.1 Install Hadoop

1) Install Hadoop 3.1.3: extract the archive to /opt/module and configure the environment variables in my_env.sh:

```shell
sudo vim /etc/profile.d/my_env.sh
```

2) Add the following:

```shell
# HADOOP_HOME
export HADOOP_HOME=/opt/module/hadoop-3.1.3
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
```

3) Apply the changes:

```shell
source /etc/profile.d/my_env.sh
```
4.2.2 Configure the Cluster

1) Core configuration file: core-site.xml

```shell
cd $HADOOP_HOME/etc/hadoop
[jajiahao@hadoop102 hadoop]$ vim core-site.xml
```

File contents:
```xml
<configuration>
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:9820</value>
    </property>
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-3.1.3/data</value>
    </property>
    <property>
        <name>hadoop.http.staticuser.user</name>
        <value>jajiahao</value>
    </property>
    <property>
        <name>hadoop.proxyuser.jajiahao.hosts</name>
        <value>*</value>
    </property>
    <property>
        <name>hadoop.proxyuser.jajiahao.groups</name>
        <value>*</value>
    </property>
</configuration>
```
2) HDFS configuration file: hdfs-site.xml

```shell
[jajiahao@hadoop102 hadoop]$ vim hdfs-site.xml
```

File contents:
```xml
<configuration>
    <property>
        <name>dfs.namenode.http-address</name>
        <value>hadoop102:9870</value>
    </property>
    <property>
        <name>dfs.namenode.secondary.http-address</name>
        <value>hadoop104:9868</value>
    </property>
    <property>
        <name>dfs.replication</name>
        <value>1</value>
    </property>
</configuration>
```
3) YARN configuration file: yarn-site.xml

```shell
[jajiahao@hadoop102 hadoop]$ vim yarn-site.xml
```

File contents:
```xml
<configuration>
    <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
    </property>
    <property>
        <name>yarn.resourcemanager.hostname</name>
        <value>hadoop103</value>
    </property>
    <property>
        <name>yarn.nodemanager.env-whitelist</name>
        <value>JAVA_HOME,HADOOP_COMMON_HOME,HADOOP_HDFS_HOME,HADOOP_CONF_DIR,CLASSPATH_PREPEND_DISTCACHE,HADOOP_YARN_HOME,HADOOP_MAPRED_HOME</value>
    </property>
    <property>
        <name>yarn.scheduler.minimum-allocation-mb</name>
        <value>512</value>
    </property>
    <property>
        <name>yarn.scheduler.maximum-allocation-mb</name>
        <value>4096</value>
    </property>
    <property>
        <name>yarn.nodemanager.resource.memory-mb</name>
        <value>4096</value>
    </property>
    <property>
        <name>yarn.nodemanager.pmem-check-enabled</name>
        <value>false</value>
    </property>
    <property>
        <name>yarn.nodemanager.vmem-check-enabled</name>
        <value>false</value>
    </property>
</configuration>
```
4) MapReduce configuration file: mapred-site.xml

```shell
[jajiahao@hadoop102 hadoop]$ vim mapred-site.xml
```

File contents:
```xml
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
```
5) Configure workers:

```shell
[jajiahao@hadoop102 hadoop]$ vim workers
```

Contents:
```
hadoop102
hadoop103
hadoop104
```
4.2.3 Configure the History Server

1) Add the following to mapred-site.xml (the history server runs on hadoop102, matching the log server URL below):

```xml
<property>
    <name>mapreduce.jobhistory.address</name>
    <value>hadoop102:10020</value>
</property>
<property>
    <name>mapreduce.jobhistory.webapp.address</name>
    <value>hadoop102:19888</value>
</property>
```
4.2.4 Configure log aggregation by adding the following to yarn-site.xml:

```xml
<property>
    <name>yarn.log-aggregation-enable</name>
    <value>true</value>
</property>
<property>
    <name>yarn.log.server.url</name>
    <value>http://hadoop102:19888/jobhistory/logs</value>
</property>
<property>
    <name>yarn.log-aggregation.retain-seconds</name>
    <value>604800</value>
</property>
```
4.2.5 Distribute hadoop-3.1.3 and my_env.sh, then run `source` on each node:

```shell
[jajiahao@hadoop102 module]$ xsync hadoop-3.1.3/
```
4.2.6 Start the Hadoop Cluster

1) If the cluster is being started for the first time, format the NameNode on hadoop102 (before formatting, be sure to stop any namenode and datanode processes left over from a previous start, and delete the old data and logs directories first):

```shell
[jajiahao@hadoop102 hadoop-3.1.3]$ rm -rf data/ logs/
[jajiahao@hadoop102 hadoop-3.1.3]$ bin/hdfs namenode -format
```
2) Start HDFS:

```shell
[jajiahao@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
```

3) Start YARN on the node configured as ResourceManager (hadoop103):

```shell
[jajiahao@hadoop103 hadoop-3.1.3]$ sbin/start-yarn.sh
```

4) Check that everything is up:

```shell
[jajiahao@hadoop102 hadoop-3.1.3]$ xcall.sh jps
```
4.2.7 Hadoop Start/Stop Script

```shell
[jajiahao@hadoop102 bin]$ pwd
/home/jajiahao/bin
[jajiahao@hadoop102 bin]$ vim hdp.sh
```

Enter the following:

```shell
#!/bin/bash
if [ $# -lt 1 ]
then
    echo "No Args Input..."
    exit
fi
case $1 in
"start")
    echo " =================== Starting Hadoop cluster ==================="
    echo " --------------- starting hdfs ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/start-dfs.sh"
    echo " --------------- starting yarn ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/start-yarn.sh"
    echo " --------------- starting historyserver ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon start historyserver"
;;
"stop")
    echo " =================== Stopping Hadoop cluster ==================="
    echo " --------------- stopping historyserver ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/bin/mapred --daemon stop historyserver"
    echo " --------------- stopping yarn ---------------"
    ssh hadoop103 "/opt/module/hadoop-3.1.3/sbin/stop-yarn.sh"
    echo " --------------- stopping hdfs ---------------"
    ssh hadoop102 "/opt/module/hadoop-3.1.3/sbin/stop-dfs.sh"
;;
*)
    echo "Input Args Error..."
;;
esac
```
Make the script executable:

```shell
[jajiahao@hadoop102 bin]$ chmod 777 hdp.sh
```

Test hdp.sh:

```shell
[jajiahao@hadoop102 bin]$ hdp.sh stop
 =================== Stopping Hadoop cluster ===================
 --------------- stopping historyserver ---------------
 --------------- stopping yarn ---------------
Stopping nodemanagers
Stopping resourcemanager
 --------------- stopping hdfs ---------------
Stopping namenodes on [hadoop102]
Stopping datanodes
Stopping secondary namenodes [hadoop104]
[jajiahao@hadoop102 bin]$ hdp.sh start
 =================== Starting Hadoop cluster ===================
 --------------- starting hdfs ---------------
Starting namenodes on [hadoop102]
Starting datanodes
Starting secondary namenodes [hadoop104]
 --------------- starting yarn ---------------
Starting resourcemanager
Starting nodemanagers
 --------------- starting historyserver ---------------
```
4.2.8 Cluster Time Synchronization

4.2.9 LZO Compression Setup

```shell
# move the jar into the common directory
[jajiahao@hadoop102 software]$ mv hadoop-lzo-0.4.20.jar /opt/module/hadoop-3.1.3/share/hadoop/common/
# distribute the jar
[jajiahao@hadoop102 common]$ xsync hadoop-lzo-0.4.20.jar
```

1) Add LZO support to core-site.xml:
```xml
<configuration>
    <property>
        <name>io.compression.codecs</name>
        <value>
            org.apache.hadoop.io.compress.GzipCodec,
            org.apache.hadoop.io.compress.DefaultCodec,
            org.apache.hadoop.io.compress.BZip2Codec,
            org.apache.hadoop.io.compress.SnappyCodec,
            com.hadoop.compression.lzo.LzoCodec,
            com.hadoop.compression.lzo.LzopCodec
        </value>
    </property>
    <property>
        <name>io.compression.codec.lzo.class</name>
        <value>com.hadoop.compression.lzo.LzoCodec</value>
    </property>
</configuration>
```
```shell
[jajiahao@hadoop102 hadoop]$ vim core-site.xml
[jajiahao@hadoop102 hadoop]$ xsync core-site.xml
```
2) Test LZO compression:

```shell
# prepare test data
[jajiahao@hadoop102 ~]$ hadoop fs -mkdir /input
[jajiahao@hadoop102 hadoop-3.1.3]$ hadoop fs -put README.txt /input
# run the test
[jajiahao@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.output.fileoutputformat.compress=true -Dmapreduce.output.fileoutputformat.compress.codec=com.hadoop.compression.lzo.LzopCodec /input /output
```
4.2.10 Build LZO Indexes

1) The splittability of an LZO-compressed file depends on its index, so indexes must be created manually. Without an index, an LZO file produces only a single split.

2) Test:

```shell
# 1. Upload bigtable.lzo (200 MB) to /input on the cluster
[jajiahao@hadoop102 module]$ hadoop fs -put bigtable.lzo /input
# 2. Run the wordcount program
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /output
# 3. Build an index for the uploaded LZO file
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/common/hadoop-lzo-0.4.20.jar com.hadoop.compression.lzo.DistributedLzoIndexer /input/bigtable.lzo
# 4. Run wordcount again (the file is now splittable)
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.1.3.jar wordcount -Dmapreduce.job.inputformat.class=com.hadoop.mapreduce.LzoTextInputFormat /input /output2
```
4.2.11 Benchmarks

1) HDFS write performance

Test: write 7 (CPU core count - 1) files of 128 MB each to the HDFS cluster.

```shell
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 7 -fileSize 128MB
# results
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Date & time: Wed Nov 04 13:18:25 CST 2020
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Number of files: 7
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Total MBytes processed: 896
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Throughput mb/sec: 44.51
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Average IO rate mb/sec: 92.26
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: IO rate std deviation: 85.19
2020-11-04 13:18:25,002 INFO fs.TestDFSIO: Test exec time sec: 22.63
```
Notes:

- Number of files: the number of map tasks launched, usually the cluster's CPU core count minus 1; on a test VM, allocate according to the actual core count minus 1
- Total MBytes processed: the total size of the files processed across all map tasks
- Throughput mb/sec: per-map-task throughput, computed as total file size / the summed write time of every map task
  Cluster aggregate throughput = number of map tasks × per-map throughput
- Average IO rate mb/sec: the average of the per-map rates, computed as the sum over map tasks of (file size processed / write time), divided by the number of map tasks
- IO rate std deviation: the spread across map tasks; smaller means the load is more balanced
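Plugging the sample run above into the aggregate-throughput formula (7 map tasks, 44.51 mb/sec each) gives a quick estimate of what the cluster sustained overall:

```shell
#!/bin/bash
# cluster write throughput ~ number of map tasks * per-map throughput
nrFiles=7          # map tasks launched by TestDFSIO -nrFiles
throughput=44.51   # per-map "Throughput mb/sec" from the sample run above
awk -v n="$nrFiles" -v t="$throughput" \
    'BEGIN { printf "aggregate write throughput ~ %.2f MB/s\n", n * t }'
# prints: aggregate write throughput ~ 311.57 MB/s
```

Note this is the simple product from the definition above, not a number TestDFSIO reports directly.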
2) HDFS read performance

Test: read 7 files of 128 MB each from the HDFS cluster.

```shell
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 7 -fileSize 128MB
# results
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Date & time: Wed Nov 04 13:27:31 CST 2020
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Number of files: 7
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Total MBytes processed: 896
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Throughput mb/sec: 289.31
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Average IO rate mb/sec: 415.26
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: IO rate std deviation: 289.65
2020-11-04 13:27:31,475 INFO fs.TestDFSIO: Test exec time sec: 17.13
```
3) Delete the data generated by the tests:

```shell
[jajiahao@hadoop102 module]$ hadoop jar /opt/module/hadoop-3.1.3/share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean
```
4.3 Zookeeper Installation

4.3.1 Cluster plan: deploy Zookeeper on the three nodes hadoop102, hadoop103, and hadoop104.

4.3.2 Extract and install

1) Extract the Zookeeper archive to /opt/module:

```shell
[jajiahao@hadoop102 software]$ tar -zxvf zookeeper-3.5.7.tar.gz -C /opt/module/
```

2) Rename /opt/module/apache-zookeeper-3.5.7-bin to zookeeper-3.5.7:

```shell
[jajiahao@hadoop102 module]$ mv apache-zookeeper-3.5.7-bin/ zookeeper-3.5.7
```

3) Distribute /opt/module/zookeeper-3.5.7 to hadoop103 and hadoop104:

```shell
[jajiahao@hadoop102 module]$ xsync zookeeper-3.5.7/
```
4.3.3 ZK Cluster Start/Stop Script

1) Create the script in /home/jajiahao/bin on hadoop102:

```shell
[jajiahao@hadoop102 bin]$ vim zk.sh
```

Script contents:

```shell
#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i start ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh start"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i stop ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh stop"
    done
};;
"status"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo ---------- zookeeper $i status ------------
        ssh $i "/opt/module/zookeeper-3.5.7/bin/zkServer.sh status"
    done
};;
esac
```
2) Make the script executable:

```shell
[jajiahao@hadoop102 bin]$ chmod 777 zk.sh
```

3) Test the script:

```shell
# check status
[jajiahao@hadoop102 bin]$ zk.sh status
---------- zookeeper hadoop102 status ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Mode: follower
---------- zookeeper hadoop103 status ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Mode: leader
---------- zookeeper hadoop104 status ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Mode: follower
# stop the cluster
[jajiahao@hadoop102 bin]$ zk.sh stop
---------- zookeeper hadoop102 stop ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
---------- zookeeper hadoop103 stop ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
---------- zookeeper hadoop104 stop ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Stopping zookeeper ... STOPPED
# start the cluster
[jajiahao@hadoop102 bin]$ zk.sh start
---------- zookeeper hadoop102 start ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper hadoop103 start ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper hadoop104 start ------------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
```
4.4 Kafka Installation and Deployment

1) Extract the archive:

```shell
[jajiahao@hadoop102 software]$ tar -zxvf kafka_2.11-2.4.1.tgz -C /opt/module/
```

2) Rename the extracted directory:

```shell
[jajiahao@hadoop102 module]$ mv kafka_2.11-2.4.1/ kafka
```

3) Create a logs directory under /opt/module/kafka:

```shell
[jajiahao@hadoop102 module]$ cd kafka/
[jajiahao@hadoop102 kafka]$ mkdir logs
```

4) Edit the configuration file:

```shell
[jajiahao@hadoop102 kafka]$ cd config/
[jajiahao@hadoop102 config]$ vim server.properties
```

Modify or add the following:

```properties
# globally unique broker id; must not be duplicated
broker.id=0
# enable topic deletion
delete.topic.enable=true
# directory where kafka stores its message data
log.dirs=/opt/module/kafka/data
# Zookeeper cluster connection string
zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka
```
5) Configure environment variables:

```shell
[jajiahao@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh
```

Add the following:

```shell
# KAFKA_HOME
export KAFKA_HOME=/opt/module/kafka
export PATH=$PATH:$KAFKA_HOME/bin
```

Apply the changes:

```shell
[jajiahao@hadoop102 module]$ source /etc/profile.d/my_env.sh
```

6) Distribute the installation (note: remember to configure the environment variables on the other machines afterwards):

```shell
[jajiahao@hadoop102 module]$ xsync kafka/
```

7) On hadoop103 and hadoop104, edit /opt/module/kafka/config/server.properties and set broker.id=1 and broker.id=2 respectively.
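The per-host broker.id edit can also be scripted instead of done by hand. A hedged sketch, assuming the numbering hadoop102→0, hadoop103→1, hadoop104→2; it runs against a local stand-in file so the sed expression can be verified before pointing it at the real server.properties over ssh:

```shell
#!/bin/bash
# derive each broker.id from the host number: hadoop103 -> 1, hadoop104 -> 2
props=$(mktemp)
echo "broker.id=0" > "$props"    # local stand-in for server.properties
for host in hadoop103 hadoop104; do
  id=$(( ${host#hadoop} - 102 ))                     # strip "hadoop", offset by 102
  sed "s/^broker.id=.*/broker.id=$id/" "$props"      # prints the rewritten line
  # on the real cluster this would be (hypothetical, not run here):
  # ssh $host "sed -i 's/^broker.id=.*/broker.id=$id/' /opt/module/kafka/config/server.properties"
done
rm -f "$props"
```

Deriving the id from the hostname keeps the ids unique by construction, which is exactly the invariant the manual step protects.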
8) Start the cluster: start Kafka on hadoop102, hadoop103, and hadoop104 in turn:

```shell
[jajiahao@hadoop102 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[jajiahao@hadoop103 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
[jajiahao@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties
```

9) Stop the cluster:

```shell
[jajiahao@hadoop102 kafka]$ bin/kafka-server-stop.sh
[jajiahao@hadoop103 kafka]$ bin/kafka-server-stop.sh
[jajiahao@hadoop104 kafka]$ bin/kafka-server-stop.sh
```
10) Create a cluster start/stop script kf.sh in /home/jajiahao/bin:

```shell
[jajiahao@hadoop102 bin]$ vim kf.sh
```

Script contents:

```shell
#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " -------- starting Kafka on $i -------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " -------- stopping Kafka on $i -------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh"
    done
};;
esac
```
11) Make the script executable:

```shell
[jajiahao@hadoop102 bin]$ chmod 777 kf.sh
```

12) Test the script:

```shell
# start the kafka cluster
[jajiahao@hadoop102 module]$ kf.sh start
# stop the kafka cluster
[jajiahao@hadoop102 module]$ kf.sh stop
```
13) Kafka producer stress test:

```shell
[jajiahao@hadoop102 kafka]$ bin/kafka-producer-perf-test.sh --topic test --record-size 100 --num-records 100000 --throughput -1 --producer-props bootstrap.servers=hadoop102:9092,hadoop103:9092,hadoop104:9092
# results
100000 records sent, 40933.278756 records/sec (3.90 MB/sec), 593.53 ms avg latency, 820.00 ms max latency, 633 ms 50th, 807 ms 95th, 817 ms 99th, 819 ms 99.9th.
```
14) Kafka consumer stress test:

```shell
[jajiahao@hadoop102 kafka]$ bin/kafka-consumer-perf-test.sh --broker-list hadoop102:9092,hadoop103:9092,hadoop104:9092 --topic test --fetch-size 10000 --messages 10000000 --threads 1
# results
2020-11-09 14:32:23:001, 2020-11-09 14:32:35:193, 9.5367, 0.7822, 100000, 8202.0997, 1604903543892, -1604903531700, -0.0000, -0.0001
```
Interpretation of the output columns:

```
start.time, end.time, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
```

Start time; end time; 9.5367 MB consumed in total; throughput 0.7822 MB/s; 100000 messages consumed; 8202.0997 messages/s on average; rebalance time (ms); average fetch time (ms); average fetch rate (MB/s); average messages fetched per second.
4.5 Flume Installation and Configuration

4.5.1 Install Flume

1) Upload apache-flume-1.9.0-bin.tar.gz to /opt/software on the Linux host.

2) Extract apache-flume-1.9.0-bin.tar.gz to /opt/module:

```shell
[jajiahao@hadoop102 software]$ tar -zxvf apache-flume-1.9.0-bin.tar.gz -C /opt/module/
```

3) Rename apache-flume-1.9.0-bin to flume:

```shell
[jajiahao@hadoop102 module]$ mv apache-flume-1.9.0-bin/ flume
```

4) Delete (or rename) guava-11.0.2.jar in the lib directory for compatibility with Hadoop 3.1.3:

```shell
[jajiahao@hadoop102 lib]$ mv guava-11.0.2.jar guava-11.0.2.jar.bak
[jajiahao@hadoop102 lib]$ rm -rf guava-11.0.2.jar.bak
```

5) Rename flume/conf/flume-env.sh.template to flume-env.sh and configure it:

```shell
[jajiahao@hadoop102 conf]$ mv flume-env.sh.template flume-env.sh
[jajiahao@hadoop102 conf]$ vim flume-env.sh
```

Add the following:

```shell
export JAVA_HOME=/opt/module/jdk1.8.0_212
```
6) Distribute flume:

```shell
[jajiahao@hadoop102 module]$ xsync flume/
```
4.5.2 Configure Flume

1) Create file-flume-kafka.conf under /opt/module/flume/conf:

```shell
[jajiahao@hadoop102 conf]$ vim file-flume-kafka.conf
```

Configure it as follows:

```properties
# name the components
a1.sources = r1
a1.channels = c1

# describe the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /opt/module/applog/log/app.*
a1.sources.r1.positionFile = /opt/module/flume/taildir_position.json
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = com.jajiahao.flume.interceptor.ETLInterceptor$Builder

# describe the channel (a KafkaChannel writes straight to Kafka, so no sink is needed)
a1.channels.c1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c1.kafka.bootstrap.servers = hadoop102:9092,hadoop103:9092
a1.channels.c1.kafka.topic = topic_log
a1.channels.c1.parseAsFlumeEvent = false

# bind the source to the channel
a1.sources.r1.channels = c1
```

Note: com.jajiahao.flume.interceptor.ETLInterceptor is the fully qualified class name of the custom interceptor; adjust it to match your own interceptor's package.
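The filegroup value `app.*` is interpreted as a regular expression against file names, so it picks up every rolled log file the mock generator produces. A quick shell approximation (hypothetical file names; `^app\.` stands in for the "app" + dot prefix the pattern is meant to catch) of which files end up tailed:

```shell
#!/bin/bash
# approximate which file names the f1 filegroup pattern would tail
for f in app.2020-06-14.log app.log catalina.out; do
  if echo "$f" | grep -q '^app\.'; then
    echo "tailed: $f"
  fi
done
```

Only the app.* files match; unrelated logs in the same directory are ignored. Keep in mind that in the real config `.` is a regex any-character, so an overly loose pattern can match more than intended.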
4.5.3 Flume Interceptor

1) Create a Maven project named flume-interceptor.

2) Create the package com.jajiahao.flume.interceptor.

3) Add the following to pom.xml:

```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flume</groupId>
        <artifactId>flume-ng-core</artifactId>
        <version>1.9.0</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.62</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <plugin>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>2.3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <artifactId>maven-assembly-plugin</artifactId>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
```
4) Create a JSONUtils class in the com.jajiahao.flume.interceptor package:

```java
package com.jajiahao.flume.interceptor;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONException;

public class JSONUtils {
    // returns true if the log line parses as valid JSON
    public static boolean isValidate(String log) {
        try {
            JSON.parse(log);
            return true;
        } catch (JSONException e) {
            return false;
        }
    }
}
```
5) Create an ETLInterceptor class in the com.jajiahao.flume.interceptor package:

```java
package com.jajiahao.flume.interceptor;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.List;

public class ETLInterceptor implements Interceptor {

    @Override
    public void initialize() {
    }

    @Override
    public Event intercept(Event event) {
        // keep the event only if its body is valid JSON
        byte[] body = event.getBody();
        String log = new String(body, StandardCharsets.UTF_8);
        if (JSONUtils.isValidate(log)) {
            return event;
        }
        return null;
    }

    @Override
    public List<Event> intercept(List<Event> list) {
        // drop every event rejected by the single-event intercept
        Iterator<Event> iterator = list.iterator();
        while (iterator.hasNext()) {
            Event next = iterator.next();
            if (intercept(next) == null) {
                iterator.remove();
            }
        }
        return list;
    }

    @Override
    public void close() {
    }

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new ETLInterceptor();
        }

        @Override
        public void configure(Context context) {
        }
    }
}
```
6) Package the project to produce flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar.

7) Put the packaged jar into /opt/module/flume/lib on hadoop102:

```shell
[jajiahao@hadoop102 lib]$ ls | grep inter
flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
```

8) Distribute the jar to hadoop103 and hadoop104:

```shell
[jajiahao@hadoop102 lib]$ xsync flume-interceptor-1.0-SNAPSHOT-jar-with-dependencies.jar
```
4.5.4 Flume Start/Stop Script

1) Create the script f1.sh in /home/jajiahao/bin:

```shell
[jajiahao@hadoop102 bin]$ vim f1.sh
```

Script contents:

```shell
#!/bin/bash
case $1 in
"start"){
    for i in hadoop102 hadoop103
    do
        echo " -------- starting collection flume on $i -------"
        ssh $i "nohup /opt/module/flume/bin/flume-ng agent --conf-file /opt/module/flume/conf/file-flume-kafka.conf --name a1 -Dflume.root.logger=INFO,LOGFILE >/opt/module/flume/log1.txt 2>&1 &"
    done
};;
"stop"){
    for i in hadoop102 hadoop103
    do
        echo " -------- stopping collection flume on $i -------"
        ssh $i "ps -ef | grep file-flume-kafka | grep -v grep | awk '{print \$2}' | xargs -n1 kill -9"
    done
};;
esac
```
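The stop branch finds the agent by grepping `ps -ef` output for the config file name, filtering out the grep process itself, and taking field 2 (the PID). A local sketch on canned ps lines (the PIDs are hypothetical):

```shell
#!/bin/bash
# how the stop branch isolates the PID: field 2 of the matching ps -ef line
ps_output='jajiahao 12345     1  5 10:00 ?  00:00:30 java ... file-flume-kafka.conf
jajiahao 12400 12001  0 10:05 ?  00:00:00 grep file-flume-kafka'
echo "$ps_output" | grep file-flume-kafka | grep -v grep | awk '{print $2}'
# prints: 12345
```

The `grep -v grep` is essential: without it, the grep command searching for the pattern matches its own ps entry and the script would try to kill it too.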
2) Make the script executable:

```shell
[jajiahao@hadoop102 bin]$ chmod 777 f1.sh
```

3) Start the collection layer:

```shell
[jajiahao@hadoop102 module]$ f1.sh start
 -------- starting collection flume on hadoop102 -------
 -------- starting collection flume on hadoop103 -------
```

4) Stop the collection layer:

```shell
[jajiahao@hadoop102 module]$ f1.sh stop
```