码迷,mamicode.com
首页 > 其他好文 > 详细

《hadoop 集群搭建、spark安装、Hbase安装、Hive安装、Kafka安装》

时间:2020-07-29 00:41:18      阅读:111      评论:0      收藏:0      [点我收藏+]

标签:share   原因   arc   warning   流式   输出   recover   center   output   

1     hadoop集群安装

https://blog.csdn.net/shshheyi/article/details/84893371

1.1    修改主机名

[root@localhost ~]# vim /etc/hosts  # 三台机器都需要操作

192.168.28.131 master

192.168.77.130 slave1

192.168.77.134 slave2

注:修改hosts中,是立即生效的,无需source或者

 

vim /etc/sysconfig/network

HOSTNAME=master #最后一行添加

 

 [root@localhost ~]# reboot

使用uname -a 可以查看hostname是多少,就可以知道是否修改生效了

 

1.2    配置ssh免密码登录

集群之间的机器需要相互通信,所以我们得先配置免密码登录。在三台机器上分别运行如下命令,生成密钥对

[root@master ~]# ssh-keygen -t rsa  # 三台机器都需要执行这个命令生成密钥对

Generating public/private rsa key pair.

Enter file in which to save the key (/root/.ssh/id_rsa):

Enter passphrase (empty for no passphrase):

Enter same passphrase again:

Your identification has been saved in /root/.ssh/id_rsa.

Your public key has been saved in /root/.ssh/id_rsa.pub.

The key fingerprint is:

0d:00:bd:a3:69:b7:03:d5:89:dc:a8:a2:ca:28:d6:06 root@hadoop000

The key‘s randomart image is:

+--[ RSA 2048]----+

|    .o.          |

|      ..         |

|     . *..       |

|      B +o       |

|     = .S .      |

| E. * .          |

| .oo o .         |

|=. o  o          |

|*..    .         |

+-----------------+

 

[root@master ~]# cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

[root@master ~]# ls .ssh/

authorized_keys  id_rsa  id_rsa.pub  known_hosts

以master为主,执行以下命令,分别把公钥拷贝到其他机器上

[root@master ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub master

[root@master ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub slave1

[root@master ~]# ssh-copy-id -i ~/.ssh/id_rsa.pub slave2

1.3    JDK安装

1.4    下载jdk

http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

root用户使用wget命令将JDK下载到/usr/local/src/目录下

cd /usr/local/src/

tar -zxvf jdk-8u151-linux-x64.tar.gz

mv ./jdk1.8.0_151 /usr/local/jdk1

 

1.5    配置jdk环境变量

vim /etc/profile  # 增加如下内容

 

export JAVA_HOME=/usr/local/jdk1.8.0_181

export JRE_HOME=${JAVA_HOME}/jre

export CLASSPATH=${JAVA_HOME}/lib:${JRE_HOME}/lib:${JRE_HOME}/lib/charsets.jar

export PATH=$PATH:$JAVA_HOME/bin:/usr/local/mysql/bin/

 

source /etc/profile #使文件生效

 

1.6    hadoop安装与配置

1.7    创建文件目录

为了便于管理,给Master的hdfs的NameNode、DataNode及临时文件,在用户目录下创建目录

mkdir -p /data/hdfs/name

mkdir -p /data/hdfs/data

mkdir -p /data/hdfs/tmp

 

然后将这些目录通过scp命令拷贝到Slave1和Slave2的相同目录下。

 

1.8    下载

首先到Apache官网(http://www.apache.org/dyn/closer.cgi/hadoop/common/)下载Hadoop,从中选择推荐的下载镜像(https://hadoop.apache.org/releases.html),我选择hadoop-3.2.0的版本,并使用以下命令下载到Master机器的/usr/local/目录

 

cd /usr/local

wget https://mirrors.cnnic.cn/apache/hadoop/common/stable/hadoop-3.2.0.tar.gz
tar -zxvf hadoop-3.2.0.tar.gz

1.9    配置hadoop环境变量

vim /etc/profile

export HADOOP_HOME=/usr/local/hadoop-3.2.0

export PATH=$HADOOP_HOME/bin:$PATH

 

source /etc/profile  #使环境变量生效

 

 

hadoop   #发现可以有提示了,则表示配置生效了

 

 

1.10      Hadoop的配置

进入目录/usr/local/hadoop-3.2.0/etc/hadoop,依次修改core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml以及slaves文件

cd /usr/local/hadoop-3.2.0/etc/hadoop

ls

 

 

1.10.1         修改core-site.xml

vim core-site.xml 

<configuration>

<!-- 指定hadoop运行时产生文件的存储路径 -->

<property>

  <name>hadoop.tmp.dir</name>

  <value>file:/data/hdfs/tmp</value>

  <description>A base for other temporary directories.</description>

</property>

<property>

  <name>io.file.buffer.size</name>

  <value>131072</value>

</property>

<!-- 指定HDFS老大(namenode)的通信地址 -->

<property>

  <name>fs.default.name</name> #fs.defaultFS 集群模式

  <value>hdfs://master:9000</value> #主节点上改为hdfs://0.0.0.0:9000

</property>

<property>

<name>hadoop.proxyuser.root.hosts</name>

<value>*</value>

</property>

<property>

<name>hadoop.proxyuser.root.groups</name>

<value>*</value>

</property>

</configuration>

 

注意:hadoop.tmp.dir的value填写对应前面创建的目录

 

 

1.10.2         修改hdfs-site.xml

vim hdfs-site.xml

 

<configuration>

<!-- 设置hdfs副本数量 -->

<property>

<name>dfs.replication</name>

  <value>2</value>

</property>

<!-- 设置namenode存放的路径 -->

<property>

  <name>dfs.namenode.name.dir</name>

  <value>file:/data/hdfs/name</value>

  <final>true</final>

</property>

<property>

<!-- 设置datanode存放的路径 -->

  <name>dfs.datanode.data.dir</name>

  <value>file:/data/hdfs/data</value>

  <final>true</final>

</property>

<!-- 设置namenode的http通讯地址 -->

<property>

  <name>dfs.namenode.secondary.http-address</name>

  <value>master:9001</value>

</property>

<property>

  <name>dfs.webhdfs.enabled</name>

  <value>true</value>

</property>

<property>

  <name>dfs.permissions</name>

  <value>false</value>

</property>

<!-- 主节点地址 -->

<property>

  <name>dfs.namenode.http-address</name>

  <value>master:50070</value> #主节点上修改为0.0.0.0:50070

  <description>开启50070端口,不然web不能访问hadoop</description>

</property>

</configuration>

注意:dfs.namenode.name.dir和dfs.datanode.data.dir的value填写对应前面创建的目录

 

1.10.3         修改mapred-site.xml

复制template,生成xml,命令如下:

cp mapred-site.xml.template mapred-site.xml

vim  mapred-site.xml

<!-- 通知框架MR使用YARN -->

<configuration>

 <property>

  <name>mapreduce.framework.name</name>

  <value>yarn</value>

</property>

<property>

   <name>mapreduce.jobhistory.address</name>

   <value>master:10020</value>

</property>

<property>

   <name>mapreduce.jobhistory.webapp.address</name>

   <value>master:19888</value>

 </property>

</configuration>

 

1.10.4         修改yarn-site.xml

vim yarn-site.xml

<property>

<name>yarn.resourcemanager.address</name>

  <value>master:18040</value>

</property>

<property>

  <name>yarn.resourcemanager.scheduler.address</name>

  <value>master:18030</value>

</property>

<property>

  <name>yarn.resourcemanager.webapp.address</name>

  <value>master:18088</value>

</property>

<property>

  <name>yarn.resourcemanager.resource-tracker.address</name>

  <value>master:18025</value>

</property>

<property>

  <name>yarn.resourcemanager.admin.address</name>

  <value>master:18141</value>

</property>

<!-- reducer取数据的方式是mapreduce_shuffle -->

<property>

  <name>yarn.nodemanager.aux-services</name>

  <value>mapreduce.shuffle</value>

</property>

<property>

  <name>yarn.nodemanager.aux-services.mapreduce.shuffle.class</name>

  <value>org.apache.hadoop.mapred.ShuffleHandler</value>

</property>

 

1.10.5         修改 hadoop-env.sh

cd /usr/local/hadoop-3.2.0/etc/hadoop

vim  hadoop-env.sh

添加

export JAVA_HOME=/usr/local/jdk1.8.0_181

 

 

1.10.6         修改/usr/local/hadoop-3.2.0/etc/hadoop/slaves

将原来的localhost删除,改成如下内容

vim /usr/local/hadoop-3.2.0/etc/hadoop/workers

vim /usr/local/hadoop-3.2.0/etc/hadoop/slaves

 

最后,将整个hadoop-3.2.0文件夹及其子文件夹使用scp复制到slave1和slave2的相同目录中:

scp -r /usr/local/hadoop-3.2.0 root@slave1: /usr/local

scp -r /usr/local/hadoop-3.2.0 root@slave2: /usr/local

1.10.7          关闭防火墙和selinux

在每台机子上都执行此操作:

# systemctl stop firewalld && systemctl disable firewalld

systemctl stop firewalld.service   #停止防火墙

systemctl disable firewalld.service #禁止防火墙开机启动

firewall-cmd --state #检查防火墙状态

永久关闭selinux

vi /etc/selinux/config

 

1.10.8         阿里云配置hadoop远程连接Web页面

登录阿里云——》云服务ECS——》网络与安全(选择安全组)——》点击对应得实例

 

 

1.11      运行Hadoop

1.11.1         格式化NameNode

执行命令:

hadoop namenode -format

 

 

 

1.11.2         启动所有服务

# 开启dfs,包括namenode,datanode,secondarynamenode服务

sbin/start-dfs.sh

# 开启yarn,包括resourcemanager,nodemanager

sbin/start-yarn.sh

#查看集群情况

hadoop dfsadmin -report

/usr/local/hadoop-3.2.0/sbin/stop-all.sh #停止所有服务

/usr/local/hadoop-3.2.0/sbin/start-all.sh #启动所有服务

 

 

1.11.3         启动NameNode

执行命令如下:

/usr/local/hadoop-3.2.0/sbin/hadoop-daemon.sh start namenode

jps

 

 

1.11.4         启动DataNode

执行命令如下:

/usr/local/hadoop-3.2.0/sbin/hadoop-daemons.sh start datanode

 

解决办法:

ssh-keygen -t rsa #然后一直按回车,选择默认的操作即可

cd /root/.ssh

cp id_rsa.pub  authorized_keys

 

 

 

1.11.5         启动yarn

/usr/local/hadoop-3.2.0/sbin/start-yarn.sh

 

运行成功

 

 

运行失败

解决办法:

进入/usr/local/hadoop-3.2.0/sbin目录

注意是在文件开始空白处

在start-dfs.sh,stop-dfs.sh中:

 

HDFS_DATANODE_USER=root

HADOOP_SECURE_DN_USER=hdfs

HDFS_NAMENODE_USER=root

HDFS_SECONDARYNAMENODE_USER=root

 

在start-yarn.sh,stop-yarn.sh中

YARN_RESOURCEMANAGER_USER=root

HADOOP_SECURE_DN_USER=yarn

YARN_NODEMANAGER_USER=root

 

 

1.12      &新增节点

l  有服务器上$HADOOP_HOME/etc/hadoop下workers文件新增slave3的配置

slave1

slave2

slave3

l  启动新节点上的DataNode和NodeManager

在新节点上启动datanode

$HADOOP_HOME/bin/hdfs --daemon start datanode

#在新节点上启动nodemanager

$HADOOP_HOME/bin/yarn --daemon start nodemanager

l  查看集群状态

#查看hdfs各节点状态

$HADOOP_HOME/bin/hdfs dfsadmin -report

#查看yarn各节点状态

$HADOOP_HOME/bin/yarn node -list

1.13      测试hadoop

1.13.1         查看集群状态

执行命令:

/usr/local/hadoop-3.2.0/bin/hdfs dfsadmin -report

 

 

1.13.2         测试YARN

浏览器登录:http://192.168.28.131:18088/cluster

 

 

1.13.3         测试查看HDFS

浏览器登录:http://192.168.28.131:50070

 

 

 

 

 

2     hive安装

2.1    下载

http://hive.apache.org/downloads.html

 

 

cd /usr/local

wget https://mirrors.tuna.tsinghua.edu.cn/apache/hive/hive-2.3.5/apache-hive-2.3.5-bin.tar.gz

 

tar -zxvf apache-hive-2.3.5-bin.tar.gz

 

2.2    配置环境变量

vim /etc/profile

#在文件结尾添加内容如下

export HIVE_HOME=/usr/local/apache-hive-2.3.5-bin

export PATH=$PATH:$HIVE_HOME/bin

 

source /etc/profile   #使环境变量生效

 

2.3    Hive配置hive-site.xml

进入/usr/local/apache-hive-2.3.5-bin/conf目录

执行命令:

cp hive-default.xml.template hive-site.xml

 

<?xml version="1.0" ?>

<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>

<configuration>

    <!--使用hadoop新建hdfs目录 -->

<property>

    <name>hive.metastore.warehouse.dir</name>

    <value>/user/hive/warehouse</value>

    <description>location of default database for the warehouse</description>

</property>

<!--使用hadoop新建hdfs临时目录 -->

<property>

     <name>hive.downloaded.resources.dir</name>

     <value>/tmp/hive/${hive.session.id}_resources</value>

     <description>Temporary local directory </description>

 </property>

    <property>

        <name>hive.metastore.local</name>

        <value>true</value>

    </property>

    <property>

        <name>javax.jdo.option.ConnectionURL</name>

        <value>jdbc:mysql://master:3306/hive?createDatabaseIfNotExist=true</value>

    </property>

    <property>

        <name>javax.jdo.option.ConnectionDriverName</name>

        <value>com.mysql.jdbc.Driver</value>

    </property>

    <property>

        <name>javax.jdo.option.ConnectionUserName</name>

        <value>root</value>

    </property>

    <property>

        <name>javax.jdo.option.ConnectionPassword</name>

        <value>123456</value>

     </property>

    <property>

      <name>hive.metastore.schema.verification</name>

      <value>false</value>

    </property>

</configuration>

2.4    创建必要目录hdfs目录

[root@master conf]# hadoop fs -mkdir -p /user/hive/warehouse

[root@master conf]# hadoop fs -mkdir -p /tmp/hive

[root@master conf]# hadoop fs -chmod 777 /user/hive/warehouse

[root@master conf]# hadoop fs -chmod 777 /tmp/hive

2.5    Hive配置hive-env.sh 文件

进入/usr/local/apache-hive-2.3.5-bin/conf目录

cp hive-env.sh.template  hive-env.sh

vim hive-env.sh

export HADOOP_HOME=/usr/local/hadoop-3.2.0

export HIVE_CONF_DIR=/usr/local/apache-hive-2.3.5-bin/conf

export HIVE_AUX_JARS_PATH=/usr/local/apache-hive-2.3.5-bin/lib

 

2.6    安装mysql

#CentOS7的yum源中默认好像是没有mysql

2.6.1.1    1.下载mysql的repo源

cd /usr/local

wget http://repo.mysql.com/mysql-community-release-el7-5.noarch.rpm

 

2.6.1.2    2.安装mysql-community-release-el7-5.noarch.rpm包

rpm -ivh mysql-community-release-el7-5.noarch.rpm

 

2.6.1.3    3.安装mysql

yum install mysql-server mysql-devel -y

 

 

查看mysql状态启动及停止

service mysqld status

service mysqld start

service mysqld stop

 

 

 

 

 

 

 

2.6.1.4    4. 修改mysql的配置文件,登陆时跳过密码项

vim /etc/my.cnf

skip-grant-tables     #[mysqld] 部分,skip-grant-tables添加 保存退出

 

 

service mysqld restart

 

 

2.6.1.5    5.登录mysql

mysql -uroot -p #进入mysql中,此时不需要输入密码就可以登陆

 

mysql>show databases;

mysql>use mysql;

mysql>UPDATE user SET password=PASSWORD(‘123456‘) WHERE user=‘root‘;

mysql>FLUSH PRIVILEGES;

mysql>QUIT

 

2.6.1.6    6.修改/etc/my.conf配置文件

将之前更改的配置文件/etc/my.cnf中的 skip-grant-tables 删除

 service mysqld restart

 

 

2.6.1.7    7.配置默认编码为utf8

修改/etc/my.cnf配置文件,在[mysqld]下添加编码配置,如下所示:

 

[mysqld]

character_set_server=utf8

init_connect=‘SET NAMES utf8‘

 

2.6.1.8    8.进行授权操作

#重载授权表:

mysql>GRANT ALL PRIVILEGES ON *.* TO ‘root‘@‘%‘ IDENTIFIED BY ‘123456‘ WITH GRANT OPTION;

mysql>create database hive;

mysql>FLUSH PRIVILEGES;

 

2.6.1.9    9.创建数据库hive,用来保存hive元数据

mysql>create database hive;

mysql>create user ‘hive‘ identified by ‘hive‘;

mysql>grant all privileges on *.* to ‘hive‘ with grant option;

mysql>flush privileges;

 

2.6.1.10  10.将MySQL驱动包上载到lib目录

wget https://gitee.com/boyuecom/tool/raw/master/mysql-connector-java-5.1.6-bin.jar

cp mysql-connector-java-5.1.6-bin.jar /usr/local/apache-hive-2.3.5-bin/lib

 

2.7    数据初始化

执行命令:

schematool   -initSchema  -dbType  mysql

 

 

 

 

 

 

 

2.8    测试hive

hive

 

 

 

 

3     spark安装配置

3.1    Scala环境搭建

http://www.scala-lang.org/downloads

3.1.1 下载

cd /usr/local

wget https://downloads.lightbend.com/scala/2.13.0/scala-2.13.0.tgz

tar -zxf scala-2.13.0.tgz

3.1.2 配置环境变量

vim /etc/profile

      export SCALA_HOME=/usr/local/scala-2.13.0

export PATH=$SCALA_HOME/bin:$PATH

source /etc/profile  #使环境变量生效

3.1.3 日志配置

cp conf/log4j.properties.template conf/log4j.properties

在第一行替换:

log4j.rootCategory=INFO, console

通过:

log4j.rootCategory=WARN, console

3.1.4 验证安装是否成功

scala -version

 

3.2    spark安装

Spark是基于内存计算的大数据分布式计算框架。Spark基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量廉价硬件之上,形成集群

1.提供分布式计算功能,将分布式存储的数据读入,同时将任务分发到各个节点进行计算;

2.基于内存计算,将磁盘数据读入内存,将计算的中间结果保存在内存,这样可以很好的进行迭代运算;

3.支持高容错;

4.提供多计算范式

3.3    下载

http://spark.apache.org/downloads.html

 

 

cd /usr/local

wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz

tar -xvf spark-2.1.0-bin-hadoop2.7.tgz

 

3.4    配置环境变量

3.4.1 /etc/profile

vim /etc/profile

  export SPARK_HOME=/usr/local/spark-2.1.0-bin-hadoop2.7

export PATH=$SPARK_HOME/bin:$PATH

 

 

source /etc/profile  #使环境变量生效

 

3.4.2 spark-env.sh

cd /usr/local/spark-2.1.0-bin-hadoop2.7/conf

cp spark-env.sh.template spark-env.sh

vim spark-env.sh

export SCALA_HOME=/usr/local/scala-2.13.0

export JAVA_HOME=/usr/local/jdk1.8.0_181

export SPARK_MASTER_IP=master

export SPARK_WORKER_MEMORY=1g

export HADOOP_CONF_DIR=/usr/local/hadoop-3.2.0/etc/hadoop

3.4.3 slaves

cd /usr/local/spark-2.1.0-bin-hadoop2.7/conf

cp slaves.template slaves

vim slaves

slave1

slave2

3.5    hadoop集群测试

vim wordcount.txt

Hello hadoop
hello spark
hello bigdata
执行下列命令:
hadoop fs -mkdir -p /Hadoop/Input
hadoop fs -put wordcount.txt /Hadoop/Input
hadoop jar /usr/local/hadoop-3.2.0/share/hadoop/mapreduce/hadoop-mapreduce-examples-3.2.0.jar wordcount /Hadoop/Input /Hadoop/Output

 

等待mapreduce执行完毕后,查看结果

hadoop fs -cat /Hadoop/Output/*

hadoop集群搭建成功!

 

3.6    测试spark

spark-submit 详细参数说明

参数名

参数

--master

master 的地址,提交任务到哪里执行,例如 spark://host:port,  yarn,  local

--deploy-mode

在本地 (client) 启动 driver 或在 cluster 上启动,默认是 client

--class

应用程序的主类,仅针对 java 或 scala 应用

--name

应用程序的名称

--jars

用逗号分隔的本地 jar 包,设置后,这些 jar 将包含在 driver 和 executor 的 classpath 下

--packages

包含在driver 和executor 的 classpath 中的 jar 的 maven 坐标

--exclude-packages

为了避免冲突 而指定不包含的 package

--repositories

远程 repository

--conf PROP=VALUE

指定 spark 配置属性的值;例如 -conf spark.executor.extraJavaOptions="-XX:MaxPermSize=256m"

--properties-file

加载的配置文件,默认为 conf/spark-defaults.conf

--driver-memory

Driver内存,默认 1G

--driver-java-options

传给 driver 的额外的 Java 选项

--driver-library-path

传给 driver 的额外的库路径

--driver-class-path

传给 driver 的额外的类路径

--driver-cores

Driver 的核数,默认是1。在 yarn 或者 standalone 下使用

--executor-memory

每个 executor 的内存,默认是1G

--total-executor-cores

所有 executor 总共的核数。仅仅在 mesos 或者 standalone 下使用

--num-executors

启动的 executor 数量。默认为2。在 yarn 下使用

--executor-core

每个 executor 的核数。在yarn或者standalone下使用

 

3.6.1 Local模式

Local模式就是运行在一台计算机上的模式,通常就是用于在本机上练手和测试。它可以通过以下几种方式设置Master:

local:所有计算都运行在一个线程当中,没有任何并行计算,通常我们在本机执行一些测试代码,或者练手,就用这种模式;

local[K]:指定使用几个线程来运行计算,比如local[4]就是运行4个Worker线程。通常我们的CPU有几个Core,就指定几个线程,最大化利用CPU的计算能力;

local[*]:这种模式直接帮你按照CPU最多Cores来设置线程数了。

bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--executor-memory 1G \

--total-executor-cores 2 \

./examples/jars/spark-examples_2.11-2.1.0.jar \

100

(1)基本语法

bin/spark-submit \

--class <main-class>

--master <master-url> \

--deploy-mode <deploy-mode> \

--conf <key>=<value> \

... # other options

<application-jar> \

[application-arguments]

(2)参数说明:

--master 指定Master的地址,默认为Local

--class: 你的应用的启动类 (如 org.apache.spark.examples.SparkPi)

--deploy-mode: 是否发布你的驱动到worker节点(cluster) 或者作为一个本地客户端 (client) (default: client)*

--conf: 任意的Spark配置属性, 格式key=value. 如果值包含空格,可以加引号“key=value”

application-jar: 打包好的应用jar,包含依赖. 这个URL在集群中全局可见。 比如hdfs:// 共享存储系统, 如果是 file:// path, 那么所有的节点的path都包含同样的jar

application-arguments: 传给main()方法的参数

--executor-memory 1G 指定每个executor可用内存为1G

--total-executor-cores 2 指定每个executor使用的cup核数为2个

3)结果展示

该算法是利用蒙特·卡罗算法求PI

 

3.6.2 Yarn模式(重点)

l  概述

Spark客户端直接连接Yarn,不需要额外构建Spark集群。有yarn-client和yarn-cluster两种模式,主要区别在于:Driver程序的运行节点。

yarn-client:Driver程序运行在客户端,适用于交互、调试,希望立即看到app的输出

yarn-cluster:Driver程序运行在由RM(ResourceManager)启动的AP(APPMaster)适用于生产环境。

 

l  安装使用

1) 修改hadoop配置文件yarn-site.xml,添加如下内容:

vim /usr/local/hadoop-3.2.0/etc/hadoop/yarn-site.xml

    <!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->

        <property>

                <name>yarn.nodemanager.pmem-check-enabled</name>

                <value>false</value>

        </property>

        <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->

        <property>

                <name>yarn.nodemanager.vmem-check-enabled</name>

                <value>false</value>

        </property>

2)修改spark-env.sh,添加如下配置:

vim /usr/local/spark-2.1.0-bin-hadoop2.7/conf spark-env.sh

export YARN_CONF_DIR=/usr/local/hadoop-3.2.0/etc/hadoop

3)分发配置文件

[atguigu@hadoop102 conf]$xsync/opt/module/hadoop-2.7.2/etc/hadoop/yarn-site.xml

[atguigu@hadoop102 conf]$ xsync spark-env.sh

4)执行一个程序

[atguigu@hadoop102 spark]$ bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode client \

./examples/jars/spark-examples_2.11-2.1.0.jar \

100

注意:在提交任务之前需启动HDFS以及YARN集群。

l  日志查看

1) 修改配置文件spark-defaults.conf

cd /usr/local/spark-2.1.0-bin-hadoop2.7/conf

cp spark-defaults.conf.template spark-defaults.conf

添加如下内容:

spark.yarn.historyServer.address=master:18080

spark.history.ui.port=18080

2)重启spark历史服务

[atguigu@hadoop102 spark]$ sbin/stop-history-server.sh

[atguigu@hadoop102 spark]$ sbin/start-history-server.sh

3)提交任务到Yarn执行

[atguigu@hadoop102 spark]$ bin/spark-submit \

--class org.apache.spark.examples.SparkPi \

--master yarn \

--deploy-mode client \

./examples/jars/spark-examples_2.11-2.1.0.jar \

100

4)Web页面查看日志

 

 

 

3.6.3 启动spark-shel

[root@master spark-2.1.0-bin-hadoop2.7]#vim wordcount.txt

Hello hadoop
hello spark
hello bigdata
执行下列命令:
[root@master spark-2.1.0-bin-hadoop2.7]#hadoop fs -mkdir -p /Hadoop/Input

[root@master spark-2.1.0-bin-hadoop2.7]#hadoop fs -put wordcount.txt /Hadoop/Input

[root@master spark-2.1.0-bin-hadoop2.7]#bin/spark-shell

 

scala> sc

scala> val file=sc.textFile("hdfs://master:9000/Hadoop/Input/wordcount.txt")

scala> val rdd = file.flatMap(line => line.split(" ")).map(word => (word,1)).reduceByKey(_+_)

scala> rdd.collect()

scala> rdd.foreach(println)

 

 

 

import os

import sys

spark_name = os.environ.get(‘SPARK_HOME‘,None)

if not spark_name:

    raise ValueErrorError(‘spark环境没有配置好‘)

sys.path.insert(0,os.path.join(spark_name,‘python‘))

sys.path.insert(0,os.path.join(spark_name,‘python/lib/py4j-0.10.4-src.zip‘))

exec(open(os.path.join(spark_name,‘python/pyspark/shell.py‘)).read())

 

 

 

 

3.6.4 RDD的创建

在Spark中创建RDD的创建方式可以分为三种:从集合中创建RDD;从外部存储创建RDD;从其他RDD创建。

3.6.4.1    从集合中创建

从集合中创建RDD,Spark主要提供了两种函数:parallelize和makeRDD

1)使用parallelize()从集合创建

scala> val rdd = sc.parallelize(Array(1,2,3,4,5,6,7,8))

rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24

2)使用makeRDD()从集合创建

scala> val rdd1 = sc.makeRDD(Array(1,2,3,4,5,6,7,8))

rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at <console>:24

 

3.6.4.2    由外部存储系统的数据集创建

包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等,我们会在第4章详细介绍。

scala> val rdd2= sc.textFile("hdfs://hadoop102:9000/RELEASE")

rdd2: org.apache.spark.rdd.RDD[String] = hdfs:// hadoop102:9000/RELEASE MapPartitionsRDD[4] at textFile at <console>:24

3.6.4.3    从其他RDD创建

3.6.5 RDD的计算方式(俩类算子):

 

    1. 变换(Transformations):

        特点: 懒执行,变换只是一些指令集并不会去马上执行,需要等到有Actions操作的时候才会真正的据算结果

        比如: map()    flatMap()    groupByKey    reduceByKey

    2. 操作(Actions):

        特点: 立即执行

        比如: count()    take()    collect()   top()    first()

RDD的持久化存储(cache和persist)

默认情况下使用Action在RDD上时Spark 会重新计算刷新RDD.但是这俩种持久化方法可以将RDD放在内存当中,这样第二次使用的时候action在RDD上时候Spark 不会重新计算刷新RDD

rows = sc.textFile(‘/user/hadoop/hello.txt‘)

rows.persist()   # 或者  rows.cache()

rows.count()    # 第一次执行,会将RDD放在内存上

rows.count()    # 第二次执行不会重新从文件读取RDD

3.6.6 map()与flatMap()

 

3.6.7 filter()

过滤,将符合条件的数据留下来

 

3.6.8 reduce()与reduceByKey()

l  reduce将RDD中元素前两个传给输入函数,产生一个新的return值,新产生的return值与RDD中下一个元素(第三个元素)组成两个元素,再被传给输入函数,直到最后只有一个值为止

 

l  reduceByKey就是对元素为键值对的RDD中Key相同的元素的Value进行reduce操作,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的键值对。(去键重)

 

3.6.9 count()与countByValue()

 

3.6.10         RDD基本转换操作

 

3.6.11         关于键值对类型的转换操作

pathA = [(‘a‘,1),(‘b‘,1),(‘c‘,2),(‘d‘,3)]

pathB = [(‘c‘,1),(‘d‘,3),(‘e‘,3),(‘f‘,4),]

a = sc.parallelize(pathA)

b = sc.parallelize(pathB)

print(a.join(b).collect())              # 交集

print(a.rightOuterJoin(b).collect())     # 右连接

print(a.leftOuterJoin(b).collect())       # 左连接

print(a.cogroup(b).collect())        # 全连接   

print(a.subtractByKey(b).collect())     # 减连接

3.6.12         RDD元素取值操作:

take(n)        返回前n个元素

top(n)         返回最大的n个元素

first()          返回第一个元素

collect()      返回所有元素,一般元素少的话才会使用

lookup(key) 返回某键值下的所有值

collectAsMap()返回的是一MAP形式的串行化结果

countByKey() 返回的是每一键组内的记录数

3.7    pyspark

【Example】

#!/usr/bin/env

#-*- coding:utf-8 -*-

import os

from pyspark.sql import SparkSession

 

spark=SparkSession.builder.appName("boye").getOrCreate()

#运行在本地(local),2个线程

# spark = SparkSession.builder.appName("test").master("local[2]").getOrCreate()

sc = spark.sparkContext

textFile = sc.textFile("file:///usr/local/test/urls")

#获取域名

#rdd = textFile.filter(lambda x:x.__contains__("http")).map( lambda x:(x.split("\t")[1].split("/")[2],1))

#获取url

rdd = textFile.filter(lambda x:x.__contains__("http")).map( lambda x:(x.split("\t")[1],1))

rdd = rdd.reduceByKey(lambda a,b:a+b)

#sortBy 升序排序

rdd = rdd.sortBy(lambda x:x[0]).map(lambda x:"\t".join([str(i) for i in x]))

os.popen(‘rm -rf /usr/local/test/spark_urls‘)

rdd.saveAsTextFile("file:///usr/local/test/spark_urls")

4     hbase下载安装

4.1    下载解压

cd /usr/local/

wget http://archive.apache.org/dist/hbase/1.3.1/hbase-1.3.1-bin.tar.gz

tar -zxvf hbase-1.3.1-bin.tar.gz

4.2    进入目录,配置文件

cd /usr/local/hbase-1.3.1/conf

vim hbase-site.xml

 

4.2.1 单机模式

<!-- 指定HRegion服务器的位置,即数据存放位置  -->

<property>

    <name>hbase.rootdir</name>

    <value>file:///tmp/hbase</value>

</property>

 

4.2.2 伪分布式模式

<!-- 指定HRegion服务器的位置,即数据存放位置  -->

<property>

    <name>hbase.rootdir</name>

    <value>hdfs://localhost:9000/hbase</value>

</property>

<!-- 指定HLog和Hfile的副本个数  -->

<property>

    <name>dfs.replication</name>

    <value>1</value>

</property>

 

4.2.3 完全分布式模式

<!--  指定HRegion服务器的位置,即数据存放位置 -->

<property>

    <name>hbase.rootdir</name>

    <value>hdfs://master:9000/hbase</value>

</property>

<!-- 指定HBase运行模式,false表示单机模式或伪分布式,true表示完全分布式模式  -->

<property>

    <name>hbase.clister.distributed</name>

    <value>true</value>

</property>

<!-- 指定master位置  -->

<property>

    <name>hbase.master</name>

    <value>hdfs://master:60000</value>

</property>

<!--  指定zookeeper集群 -->

<property>

    <name>hbase.zookeeper.quorum</name>

    <value>master,slave1,slave2</value>

</property>

 

4.3    配置环境变量

vim /etc/profile

最后一行添加

export HBASE_HOME=/usr/local/hbase-1.3.1

export PATH=$HBASE_HOME/bin:$PATH

 

source /etc/profile #使环境变量生效

4.4    运行与停止

进入bin目录

cd $HBASE_HOME/bin

4.4.1 单机模式

sh start-hbase.sh

4.4.2 查看

jps

 

4.4.3 伪分布式模式

sh start-dfs.sh

sh start-hbase.sh

4.4.4 完全分布式模式

sh start-dfs.sh

sh zookeeper.sh start

sh start-hbase.sh

4.4.5 关闭hbase

sh stop-hbase.sh

4.4.6 报错

Could not start ZK at requested port of 2181. ZK was started at port: 2182. Aborting a

HMaster和HRegionServer是Hbase的两个子进程,但是使用jps发现没有启动起来,所以去我们配置的logs查看错误信息。提示:

Could not start ZK at requested port of 2181.  ZK was started at port: 2182.  Aborting as clients (e.g. shell) will not be able to find this ZK quorum.

但是在hbase-env.sh文件中设置了export HBASE_MANAGES_ZK=false

设置不使用自带zookeeper,这一步设置完按理说就可以使用独立的zookeeper程序了,但是还是报错。很明显,这是启动自带zookeeper与独立zookeeper冲突了。因为把hbase.cluster.distributed设置为false,也就是让hbase以standalone模式运行时,依然会去启动自带的zookeeper。

 

所以要做如下设置,值为true:

vim conf/hbase-site.xml

<property>

        <name>hbase.cluster.distributed</name>

        <value>true</value>

</property>

4.5    Hbase- Shell命令

HBase是一个分布式的、面向列的开源数据库,源于google的一篇论文《bigtable:一个结构化数据的分布式存储系统》。HBase是Google Bigtable的开源实现,它利用Hadoop HDFS作为其文件存储系统,利用Hadoop MapReduce来处理HBase中的海量数据,利用Zookeeper作为协同服务。

HBase以表的形式存储数据。表有行和列组成。列划分为若干个列族/列簇(column family)

HBase数据模型

 

 

  1. HBase的存储机制

HBase是一个面向列的数据库,在表中它由行排序。表模式定义只能列族,也就是键值对。一个表有多个列族以及每一个列族可以有任意数量的列。后续列的值连续存储在磁盘上。表中的每个单元格值都具有时间戳。总之,在一个HBase:

²  表是行的集合。

²  行是列族的集合。

²  列族是列的集合。

²  列是键值对的集合。

4.5.1 基本命令

hbase shell #进入Hbase数据库

help [‘command‘] 查看帮助命令

status     #查询服务器状态

whoami  #查询当前用户

version   #当前hbase使用的版本号

操作

命令表达式

创建表  

create ‘table_name, ‘family1‘,‘family2‘,‘familyN‘

添加记录

put ‘table_name‘, ‘rowkey‘, ‘family:column‘, ‘value‘

查看记录

get ‘table_name‘, ‘rowkey‘     --查询单条记录

查看表中的记录总数

count  ‘table_name‘          --这个命令并不快

删除记录

第一种方式删除一条记录单列的数据,第二种方式删除整条记录

delete ‘table_name‘ ,‘rowkey‘,‘family_name:column‘

deleteall ‘table_name‘,‘rowkey‘

删除一张表

1、disable ‘table_name‘     

2、drop ‘table_name‘

查看所有记录

scan "table_name" ,{LIMIT=>10}     --LIMIT=>10 只返回10条记录

4.5.2 权限管理

4.5.2.1    分配权限

#语法 : grant <user> <permissions> <table> <column family> <column qualifier> 参数后面用逗号分隔

# 权限用五个字母表示: "RWXCA".

# READ(‘R‘), WRITE(‘W‘), EXEC(‘X‘), CREATE(‘C‘), ADMIN(‘A‘)

# 例如,给用户‘test‘分配对表t1有读写的权限,

hbase(main)> grant ‘test‘,‘RW‘,‘t1‘

4.5.2.2    查看权限

# 语法:user_permission <table>

# 例如,查看表t1的权限列表

hbase(main)> user_permission ‘t1‘

4.5.2.3    收回权限

# 与分配权限类似,语法:revoke <user> <table> <column family> <column qualifier>

# 例如,收回test用户在表t1上的权限

hbase(main)> revoke ‘test‘,‘t1‘

4.5.3 DDL操作

4.5.3.1    create-创建一个表

语法:create <table>, {NAME => <family>, VERSIONS => <VERSIONS>}

create ‘表名称‘, ‘列名称1‘,‘列名称2‘,‘列名称N‘

create ‘table1‘, ‘tab1_id‘, ‘tab1_add‘, ‘tab1_info‘

4.5.3.2    list-列出所有的表

list

 

4.5.3.3    describe-获得表的描述

describe "table1"

 

4.5.3.4    exists-查看表是否存在

exists ‘table2‘

4.5.3.5    disable、dorp -删除一个表

disable ‘table1‘

drop ‘table1‘

注:先要屏蔽该表,才能对该表进行删除

4.5.3.6    is_enabled-判断表是否为‘enable’

is_enabled ‘table1‘

4.5.3.7    is_disabled-判断表是否为‘disable’

is_disabled ‘table1‘

4.5.3.8    alter-修改表

²  修改emp表的personal data列族的VERSIONS值为5

alter ‘emp’,NAME=>’personal data’,VERSIONS=>5

²  可以将表设置为只读模式,命令如下:

alter ‘tablename’,READONLY

²  删除表范围运算符,需首先将表disable:

alter ‘tablename’,METHOD=>’table_att_unset’,NAME=>’MAX_FILESIZE’

删除列族,需首先将表disable:

alter ‘tablename’,’delete’=>’column family’

删除一个列族之后,这个列族的数据也会全部被删除

4.5.4 DML操作

4.5.4.1    put-插入几条记录

语法:put <table>,<rowkey>,<family:column>,<value>,<timestamp>

create ‘member‘,‘member_id‘,‘address‘,‘info‘

put ‘member‘, ‘scutshuxue1‘, ‘info:age‘, ‘24‘

put ‘member‘, ‘scutshuxue2‘, ‘info:birthday‘, ‘1987-06-17‘

put ‘member‘, ‘scutshuxue3‘, ‘info:company‘, ‘alibaba‘

put ‘member‘, ‘scutshuxue‘, ‘address:contry‘, ‘china‘

put ‘member‘, ‘scutshuxue‘, ‘address:province‘, ‘zhejiang‘

put ‘member‘, ‘scutshuxue‘, ‘address:city‘, ‘hangzhou‘

4.5.4.2    scan-查看所有记录

scan "表名称" , [‘列名称:‘]

# 语法:scan <table>, {COLUMNS => [ <family:column>,.... ], LIMIT => num}

scan ‘User‘, {LIMIT => 2}

scan “table1”

 

4.5.4.3    count-查询表中有多少行

# 语法:count <table>, {INTERVAL => intervalNum, CACHE => cacheNum}

count ‘member‘

4.5.4.4    get-获得数据

get ‘表名称‘, ‘行名称‘

# 语法:get <table>,<rowkey>,[<family:column>,....]

获得一行的所有数据

get ‘member‘,‘scutshuxue‘

获得某行,某列族的所有数据

get ‘member‘,‘scutshuxue‘,‘info‘

获得某行,某列族,某列的所有数据

get ‘member‘,‘scutshuxue‘,‘info:company‘

4.5.4.5    给‘id’这个行健增加‘column_famaly1:addr‘字段,并使用counter实现递增

hbase(main):002:0> incr ‘table‘,‘id‘,‘column_famaly1:addr‘

COUNTER VALUE = 1

0 row(s) in 0.0340 seconds

 

4.5.4.6    put-更新一条记录

put ‘表名称‘, ‘行名称‘, ‘列名称:‘, ‘值‘

put ‘member‘, ‘scutshuxue‘, ‘info:age‘, 99  --把scutshuxue年龄改为99

 

4.5.4.7    delete/deleteall-删除

delete  ‘表名‘ ,‘行名称‘ , ‘列名称‘

# 语法:delete <table>, <rowkey>,  <family:column> , <timestamp>,必须指定列名

delete ‘member‘, ‘scutshuxue‘, ‘info:age‘ --删除行‘scutshuxue‘, 列族为‘info‘ 中age的值

 

# 语法:deleteall <table>, <rowkey>,  <family:column> , <timestamp>,可以不指定列名,删除整行数据

deleteall ‘member‘, ‘scutshuxue‘ --删除整行

4.5.4.8    将整个表清空

truncate ‘member‘

5     Zookeeper安装

官网:http://archive.apache.org/dist/zookeeper/

l  下载

wget http://archive.apache.org/dist/zookeeper/zookeeper-3.4.5/zookeeper-3.4.5.tar.gz

l  解压

tar -zxvf zookeeper-3.4.5.tar.gz

l  进入解压目录,创建data和logs目录

cd  /usr/local/zookeeper-3.4.5

mkdir data

mkdir logs

l  在conf目录下新建zoo.cfg文件,写入以下内容保存

vim /usr/local/zookeeper-3.4.5/conf/zoo.cfg

       tickTime=2000

       dataDir=/usr/local/zookeeper-3.4.5/data

       dataLogDir=/usr/local/zookeeper-3.4.5/logs

       clientPort=2181

5.1    启动和停止

进入bin目录,启动、停止、重启分和查看当前节点状态(包括集群中是何角色)别执行:

cd /usr/local/zookeeper-3.4.5/bin

./zkServer.sh start

./zkServer.sh stop

./zkServer.sh restart

./zkServer.sh status

5.2    查看进程

ps -aux | grep ‘zookeeper‘

 

5.3    伪集群模式

伪集群模式就是在同一主机启动多个zookeeper并组成集群,

下边以在192.168.28.131主机上创3个zookeeper组集群为例。

将通过第一大点安装的zookeeper,复制成zookeeper1/zookeeper2/zookeeper3三份

zookeeper1配置

zookeeper1配置文件conf/zoo.cfg修改如下:

       tickTime=2000

       dataDir=/usr/local/zookeeper1/data

       dataLogDir=/usr/local/zookeeper1/logs

       clientPort=2181

       initLimit=5

       syncLimit=2

       server.1=192.168.28.131:2888:3888

       server.2=192.168.28.131:4888:5888

       server.3=192.168.28.131:6888:7888

zookeeper1的data/myid配置如下

       echo ‘1‘ > data/myid

 

l  zookeeper2配置

 zookeeper2配置文件conf/zoo.cfg修改如下:

       tickTime=2000

       dataDir=/usr/local/zookeeper2/data

       dataLogDir=/usr/local/zookeeper2/logs

       clientPort=3181

       initLimit=5

       syncLimit=2

       server.1=192.168.28.131:2888:3888

       server.2=192.168.28.131:4888:5888

       server.3=192.168.28.131:6888:7888

l  zookeeper2的data/myid配置如下:

       echo ‘2‘ > data/myid

 

zookeeper3配置

 zookeeper3配置文件conf/zoo.cfg修改如下:

       tickTime=2000

       dataDir=/usr/local/zookeeper3/data

       dataLogDir=/usr/local/zookeeper3/logs

       clientPort=4181

       initLimit=5

       syncLimit=2

       server.1=192.168.28.131:2888:3888

       server.2=192.168.28.131:4888:5888

       server.3=192.168.28.131:6888:7888

      

l  zookeeper3的data/myid配置如下:

       echo ‘3‘ > data/myid

 

l  最后使用命令把三个zookeeper都启动即可,启动顺序随意没要求

sh /usr/local/zookeeper1/bin/zkServer.sh  start

sh /usr/local/zookeeper2/bin/zkServer.sh  start

sh /usr/local/zookeeper3/bin/zkServer.sh  start

 

5.4    集群模式

集群模式就是在不同主机上安装zookeeper然后组成集群的模式;下边以在192.168.28.131/132/133三台主机为例。

将第1.1到1.3步中安装好的zookeeper打包复制到132和133上,并都解压到同样的目录下。

5.4.1 conf/zoo.cfg文件修改

三个zookeeper的conf/zoo.cfg修改如下:

tickTime=2000

dataDir=/usr/local/zookeeper-3.4.5/data

dataLogDir=/usr/local/zookeeper-3.4.5/logs

clientPort=2181

initLimit=5

syncLimit=2

server.1=192.168.220.131:2888:3888

server.2=192.168.220.132:2888:3888

server.3=192.168.220.133:2888:3888

对于132和133,由于安装目录都是zookeeper-3.4.5所以dataDir和dataLogDir不需要改变,又由于在不同机器上所以clientPort也不需要改变

所以此时132和133的conf/zoo.cfg的内容与131一样即可

5.4.2 data/myid文件修改

l  131 data/myid修改如下

echo ‘1‘ > data/myid

l  132 data/myid修改如下

echo ‘2‘ > data/myid

l  133 data/myid修改如下

echo ‘3‘ > data/myid

5.5    基本命令使用

以下命令不管是单机、伪集群、集群模式都适用;伪集群和集群模式随便连接其中一个zookeeper即可。

进入zookeeper的bin目录,使用zkCli连接zookeeper

./bin/zkServer.sh status #查看集群状态

./zkCli.sh    # 默认连接localhost:2181

./zkCli.sh -server 192.168.220.128:2181  #指定ip和端口

命令

描述

help

查看所有支持的命令

ls /

查看目录下有哪些节点。以根目录为例

create

/example_path

"example_data"

创建一个节点;加-s表示创建顺序节点,即会自动在给定的路径后面再加上一个数字串,保证路径不重复;默认是持久节点,加-e是临时节点

get /example_path

查看节点内容;返回第一行即是节点的内容,如果第一行空白或null那就说明该节点创建时就没有值;后续的cZxid到numChildren都是该节点的一些属性信息;其中numChildren标识该节点下有多少个子节点

delete /example_path

删除一个没有子节点的节点

rmr /example_path

递规删除节点及其所有子节点

quit

退出zkCli

 

6     kafka安装部署

参考:https://blog.csdn.net/luanpeng825485697/article/details/81036028

6.1    背景

6.1.1 Apache Kafka 概述

在大数据中,使用了大量的数据。 关于数据,我们有两个主要挑战。第一个挑战是如何收集大量的数据,第二个挑战是分析收集的数据。为了克服这些挑战,您必须需要一个消息系统。

Kafka专为分布式高吞吐量系统而设计。 Kafka往往工作得很好,作为一个更传统的消息代理的替代品。 与其他消息传递系统相比,Kafka具有更好的吞吐量,内置分区,复制和固有的容错能力,这使得它非常适合大规模消息处理应用程序。

6.1.2 什么是消息系统?

消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,但不担心如何共享它。 分布式消息传递基于可靠消息队列的概念。 消息在客户端应用程序和消息传递系统之间异步排队。 有两种类型的消息模式可用 - 一种是点对点,另一种是发布 - 订阅(pub-sub)消息系统。 大多数消息模式遵循 pub-sub 。

6.1.3 点对点消息系统

在点对点系统中,消息被保留在队列中。 一个或多个消费者可以消耗队列中的消息,但是特定消息只能由最多一个消费者消费。 一旦消费者读取队列中的消息,它就从该队列中消失。 该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。 下图描述了结构。

 

6.1.4 发布 - 订阅消息系统

在发布 - 订阅系统中,消息被保留在主题中。 与点对点系统不同,消费者可以订阅一个或多个主题并使用该主题中的所有消息。 在发布 - 订阅系统中,消息生产者称为发布者,消息使用者称为订阅者。 一个现实生活的例子是Dish电视,它发布不同的渠道,如运动,电影,音乐等,任何人都可以订阅自己的频道集,并获得他们订阅的频道时可用。

 

6.1.5 什么是Kafka?

Apache Kafka是一个分布式发布 - 订阅消息系统和一个强大的队列,可以处理大量的数据,并使您能够将消息从一个端点传递到另一个端点。 Kafka适合离线和在线消息消费。 Kafka消息保留在磁盘上,并在群集内复制以防止数据丢失。 Kafka构建在ZooKeeper同步服务之上。 它与Apache Storm和Spark非常好地集成,用于实时流式数据分析。

 

6.1.5.1    好处

以下是Kafka的几个好处:

  1. 可靠性 - Kafka是分布式,分区,复制和容错的。
  2. 可扩展性 - Kafka消息传递系统轻松缩放,无需停机。
  3. 耐用性 - Kafka使用分布式提交日志,这意味着消息会尽可能快地保留在磁盘上,因此它是持久的。
  4. 性能 - Kafka对于发布和订阅消息都具有高吞吐量。 即使存储了许多TB的消息,它也保持稳定的性能。

Kafka非常快,并保证零停机和零数据丢失。

6.1.5.2    用例

Kafka可以在许多用例中使用。 其中一些列出如下:

l  指标 - Kafka通常用于操作监控数据。 这涉及聚合来自分布式应用程序的统计信息,以产生操作数据的集中馈送。

l  日志聚合解决方案 - Kafka可用于跨组织从多个服务收集日志,并使它们以标准格式提供给多个服务器。

l  流处理 - 流行的框架(如Storm和Spark Streaming)从主题中读取数据,对其进行处理,并将处理后的数据写入新主题,供用户和应用程序使用。 Kafka的强耐久性在流处理的上下文中也非常有用。

需要Kafka

Kafka是一个统一的平台,用于处理所有实时数据Feed。 Kafka支持低延迟消息传递,并在出现机器故障时提供对容错的保证。 它具有处理大量不同消费者的能力。 Kafka非常快,执行2百万写/秒。 Kafka将所有数据保存到磁盘,这实质上意味着所有写入都会进入操作系统(RAM)的页面缓存。 这使得将数据从页面缓存传输到网络套接字非常有效。

6.2    Kafka消费组织原理

 

 

 

6.3    下载安装

http://kafka.apache.org/downloads

 

 

l  下载

wget https://mirrors.cnnic.cn/apache/kafka/2.3.0/kafka_2.11-2.3.0.tgz

l  解压

tar -zxvf kafka_2.11-2.3.0.tgz

l  进入目录

cd /usr/local/kafka_2.11-2.3.0

l  修改配置文件

vi conf/server.properties

#broker的全局唯一编号,不能重复

broker.id=0

#删除topic功能使能

delete.topic.enable=true

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘IO的现成数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka运行日志存放的路径   

log.dirs=/usr/local/kafka_2.11-2.3.0/logs

#topic在当前broker上的分区个数

num.partitions=1

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

#segment文件保留的最长时间,超时将被删除

log.retention.hours=168

#配置连接Zookeeper集群地址

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181

l  配置环境变量

sudo vi /etc/profile

 

#KAFKA_HOME

export KAFKA_HOME=/usr/local/kafka_2.11-2.3.0

export PATH=$PATH:$KAFKA_HOME/bin

 

[atguigu@hadoop102 module]$ source /etc/profile

 

l  启动服务器

注:需要先启动 zookeeper

bin/kafka-server-start.sh config/server.properties

 

l  停止服务器

bin/kafka-server-stop.sh config/server.properties

6.4    单节点 - 单代理配置

https://www.cnblogs.com/rilley/p/5391268.html

#每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumers

broker.id=0

#kafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2

log.dirs=/usr/local/kafka/kafka-logs

#消息体的最大大小,单位是字节

message.max.bytes = 1000000

zookeeper.connect=master:2181

listeners=PLAINTEXT://:9092

num.network.threads=3

num.io.threads=8

socket.send.buffer.bytes=102400

socket.receive.buffer.bytes=102400

socket.request.max.bytes=104857600

num.partitions=3 #分区

num.recovery.threads.per.data.dir=1

offsets.topic.replication.factor=1

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

#日志清理策略 选择有:delete和compact 主要针对过期数据的处理,或是日志文件达到限制的额度,会被 topic创建时的指定参数覆盖

log.cleanup.policy = delete

#数据存储的最大时间 超过这个时间 会根据log.cleanup.policy设置的策略处理数据

log.retention.hours=168

#控制toppic分区中每个segment(00000000000000006223.log)的大小

log.segment.bytes=1024 * 1024 * 1024

#这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖

log.roll.hours = 24*7

#topic每个分区的最大文件大小,一个topic的大小限制 = 分区数*log.retention.bytes =-1 没有大小限制log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除,会被topic创建时的指定参数覆盖

log.retention.bytes=-1

#文件大小检查的周期时间,是否处罚 log.cleanup.policy中设置的策略

log.retention.check.interval.ms=300000

log.cleaner.enable=false     #是否开启日志压缩

log.cleaner.threads =1       #日志压缩运行的线程数

#日志压缩去重时候的缓存空间 ,在空间允许的情况下,越大越好

log.cleaner.dedupe.buffer.size=500*1024*1024

zookeeper.connection.timeout.ms=60000

#对于压缩的日志保留的最长时间,也是客户端消费消息的最长时间,同log.retention.minutes的区别在于一个控制未压缩数据,一个控制压缩后的数据。会被topic创建时的指定参数覆盖

log.cleaner.delete.retention.ms = 1 day

#对于segment日志的索引文件大小限制,会被topic创建时的指定参数覆盖

log.index.size.max.bytes = 10 * 1024 * 1024

#当执行一个fetch操作后,需要一定的空间来扫描最近的offset大小,设置越大,代表扫描速度越快,但是也更好内存,一般情况下不需要搭理这个参数

log.index.interval.bytes = 4096

group.initial.rebalance.delay.ms=0

6.4.1 创建topic

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

参数说明:

–zookeeper:指定kafka连接zk的连接url,该值和server.properties文件中的配置项{zookeeper.connect}一样

–replication-factor:指定副本数量

–partitions:指定分区数量

–topic:主题名称

6.4.2 查看所有的topic信息

bin/kafka-topics.sh --list --zookeeper localhost:2181

6.4.3 启动生产者发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

6.4.4 启动消费者以接收消息

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

--from-beginning:会把主题中以往所有的数据都读取出来

6.5     单节点多代理配置

拷贝server.properties三份

cd /usr/local/kafka_2.11-2.3.0

cp server.properties server-1.properties

cp server.properties server-2.properties

cp server.properties server-3.properties

l  修改server-1.properties文件

# broker的全局唯一编号,不能重复

broker.id=1

# 监听

listeners=PLAINTEXT://:9093

# 日志目录

log.dirs=/home/hadoop/kafka-logs-1

 

l  修改server-2.properties文件

# broker的全局唯一编号,不能重复

broker.id=2

# 监听

listeners=PLAINTEXT://:9094

# 日志目录

log.dirs=/home/hadoop/kafka-logs-2

 

l  修改server-3.properties文件

# broker的全局唯一编号,不能重复

broker.id=3

# 监听

listeners=PLAINTEXT://:9095

# 日志目录

log.dirs=/home/hadoop/kafka-logs-3

 

l  启动Zookeeper

cd /usr/local/zookeeper-3.4.5/bin

sh zkServer.sh start

ZooKeeper JMX enabled by default

Using config: /usr/local/zookeeper-3.4.5/bin/../conf/zoo.cfg

Starting zookeeper ... STARTED

启动Kafka(分别启动server1、2、3)

cd /usr/local/kafka_2.11-2.3.0

bin/kafka-server-start.sh -daemon config/server-1.properties

bin/kafka-server-start.sh -daemon config/server-2.properties

bin/kafka-server-start.sh -daemon config/server-3.properties

 

 

6.5.1 创建topic(指定副本数量为3)

bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3

-partitions 1 --topic Multibrokerapplication

6.5.2 检查哪个代理正在侦听当前创建的主题

bin/kafka-topics.sh --describe --zookeeper localhost:2181

--topic Multibrokerappli-cation

 

6.5.3 启动生产者以发送消息

bin/kafka-console-producer.sh --broker-list localhost:9092

--topic Multibrokerapplication

6.5.4 启动消费者以接收消息

bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092  --topic myjob --from-beginning

bin/kafka-console-consumer.sh  --bootstrap-server localhost:9092  --topic myjob --group myjob-group

6.6    基本操作

l  查看指定topic信息

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic test

 

l  查看积压

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group test-group

LogEndOffset:下一条将要被加入到日志的消息的位移

CurrentOffset:当前消费的位移

LAG :消息堆积量

 

l  修改主题

bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name

--parti-tions count

l  删除topic

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic first

注:需要server.properties中设置delete.topic.enable=true否则只是标记删除。

l  查看某个Topic的详情

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic first

l  修改分区数

bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic first --partitions 6

 

删除主题

语法:

bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name

 

 

6.7    kafka启动报错处理

zookeeper 集群启动没有问题,集群状态也正常,但是启动kafka 确报了这个错误:

Timed out waiting for connection while in state: CONNECTING (连接超时)

 

原因:

a)      zookeeper 访问不了。 查看下zookeeper (网络是否通)是否正常启动。

b)      kafka 的zookeeper (server.properties里面)访问地址不正确,检查一下。

c)      kafka 的 broker.id (server.properties里面)没有注释掉,这里集群最好注释掉,不要手动指定。

d)      修改 kafka 配置 连接超时间,这里是以毫秒为单位。

zookeeper.connection.timeout.ms=60000

 

启动Kafka报Java HotSpot(TM) 64-Bit Server VM warning: INFO: os::commit_memory(0x00000000c0000000, 107

原因:启动kafka报JAVA HotSpot 内存不足

直接在bin下面

vim kafka-server-start.sh

 

《hadoop 集群搭建、spark安装、Hbase安装、Hive安装、Kafka安装》

标签:share   原因   arc   warning   流式   输出   recover   center   output   

原文地址:https://www.cnblogs.com/boye169/p/13394594.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!