标签:负载平衡 缺省 共享库 option 流操作 命令执行 分析 执行引擎 table
一旦数据仓库开始使用,就需要不断从源系统给数据仓库提供新数据。为了确保数据流的稳定,需要使用所在平台上可用的任务调度器来调度ETL定期执行。调度模块是ETL系统必不可少的组成部分,它不但是数据仓库的基本需求,也对项目的成功起着举足轻重的作用。本篇说明如何使用HDP中的Oozie和Falcon服务实现ETL执行自动化。last_value=`sqoop job --show myjob_incremental_import | grep incremental.last.value | awk ‘{print $3}‘` 该值在后面重建作业时会用到。create database sqoop; create user ‘sqoop‘@‘hdp2‘ identified by ‘sqoop‘; grant all privileges on sqoop.* to ‘sqoop‘@‘hdp2‘; flush privileges;
在Ambari的Sqoop -> Configs -> Custom sqoop-site中添加如图2所示的参数
mysql> show tables; +-----------------+ | Tables_in_sqoop | +-----------------+ | SQOOP_ROOT | +-----------------+ 1 row in set (0.00 sec)
use sqoop; insert into SQOOP_ROOT values (NULL, ‘sqoop.hsqldb.job.storage.version‘, ‘0‘);
sqoop job --list此时并不会返回先前已经创建的myjob_incremental_import作业,因为此时MySQL中没有元数据信息。该命令执行完成后,MySQL的sqoop库中有了一个名为SQOOP_SESSIONS的空表,该表存储sqoop job相关信息。
mysql> show tables; +-----------------+ | Tables_in_sqoop | +-----------------+ | SQOOP_ROOT | | SQOOP_SESSIONS | +-----------------+ 2 rows in set (0.00 sec)
alter table SQOOP_ROOT engine=myisam; alter table SQOOP_SESSIONS engine=myisam;因为每次执行增量抽取后都会更新last_value值,如果使用Innodb可能引起事务锁超时错误。
sqoop job --create myjob_incremental_import -- import --connect "jdbc:mysql://172.16.1.127:3306/source?usessl=false&user=dwtest&password=123456" --table sales_order --target-dir /data/ext/sales_order --compress --where "entry_date < current_date()" --incremental append --check-column order_number --last-value $last_value上面的命令执行后,SQOOP_SESSIONS表中存储了Sqoop job的信息。
select * from SQOOP_SESSIONS\G ... *************************** 53. row *************************** job_name: myjob_incremental_import propname: sqoop.property.set.id propval: 0 propclass: schema *************************** 54. row *************************** job_name: myjob_incremental_import propname: sqoop.tool propval: import propclass: schema *************************** 55. row *************************** job_name: myjob_incremental_import propname: temporary.dirRoot propval: _sqoop propclass: SqoopOptions *************************** 56. row *************************** job_name: myjob_incremental_import propname: verbose propval: false propclass: SqoopOptions 56 rows in set (0.00 sec)此时执行sqoop job --list可以看到刚创建的job。
sqoop job --list ... Available jobs: myjob_incremental_import关于使用MySQL作为Sqoop元数据存储的配置,可以参考“Using SQOOP with MySQL as metastore”。
Failing Oozie Launcher, Main class [org.apache.oozie.action.hadoop.SqoopMain], main() threw exception, org/json/JSONObject在我的HDP2.5.0安装中没有该文件,需要自行下载,然后拷贝到相关目录。
cp java-json.jar /usr/hdp/current/sqoop-client/lib/ su - hdfs -c ‘hdfs dfs -put /usr/hdp/current/sqoop-client/lib/java-json.jar /user/oozie/share/lib/lib_20170208131207/sqoop/‘
oozie:x:506:504:Oozie user:/home/oozie:/bin/bash
su - oozie ssh-keygen ... 一路回车生成密钥文件 ... su - # 将oozie的公钥复制到root的authorized_keys文件中 cat /home/oozie/.ssh/id_rsa.pub >> authorized_keys完成以上配置后,在oozie用户下可以免密码ssh root@hdp2。关于oozie调用本地shell脚本可以参考“OOZIE调用shell脚本做mr计算挂死问题分析和解决”。
<?xml version="1.0" encoding="UTF-8"?>
<workflow-app xmlns="uri:oozie:workflow:0.4" name="RegularETL">
<start to="hdfsCommands"/>
<action name="hdfsCommands">
<fs>
<delete path=‘${nameNode}/data/ext/sales_order/*‘/>
</fs>
<ok to="fork-node"/>
<error to="fail"/>
</action>
<fork name="fork-node">
<path start="sqoop-customer" />
<path start="sqoop-product" />
<path start="sqoop-sales_order" />
</fork>
<action name="sqoop-customer">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:mysql://172.16.1.127:3306/source?useSSL=false</arg>
<arg>--username</arg>
<arg>dwtest</arg>
<arg>--password</arg>
<arg>123456</arg>
<arg>--table</arg>
<arg>customer</arg>
<arg>--target-dir</arg>
<arg>/data/ext/customer</arg>
<arg>--delete-target-dir</arg>
<arg>--compress</arg>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<action name="sqoop-product">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<arg>import</arg>
<arg>--connect</arg>
<arg>jdbc:mysql://172.16.1.127:3306/source?useSSL=false</arg>
<arg>--username</arg>
<arg>dwtest</arg>
<arg>--password</arg>
<arg>123456</arg>
<arg>--table</arg>
<arg>product</arg>
<arg>--target-dir</arg>
<arg>/data/ext/product</arg>
<arg>--delete-target-dir</arg>
<arg>--compress</arg>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<action name="sqoop-sales_order">
<sqoop xmlns="uri:oozie:sqoop-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<command>job --meta-connect jdbc:mysql://hdp2/sqoop?user=sqoop&password=sqoop --exec myjob_incremental_im
port</command>
<archive>/user/oozie/share/lib/lib_20170208131207/sqoop/java-json.jar#java-json.jar</archive>
</sqoop>
<ok to="joining"/>
<error to="fail"/>
</action>
<join name="joining" to="psql-node"/>
<action name="psql-node">
<ssh xmlns="uri:oozie:ssh-action:0.1">
<host>${focusNodeLogin}</host>
<command>${myScript}</command>
<capture-output/>
</ssh>
<ok to="end"/>
<error to="fail"/>
</action>
<kill name="fail">
<message>Sqoop failed, error message[${wf:errorMessage(wf:lastErrorNode())}]</message>
</kill>
<end name="end"/>
</workflow-app> 这个工作流的DAG如图3所示。# 上传工作流文件 hdfs dfs -put -f workflow.xml /user/oozie/ # 上传MySQL JDBC驱动文件到Oozie的共享库目录中 hdfs dfs -put /var/lib/ambari-agent/tmp/mysql-connector-java-5.1.38-bin.jar /user/oozie/share/lib/lib_20170208131207/sqoop/
#!/bin/bash # 使用gpadmin用户执行定期装载函数 su - gpadmin -c ‘export PGPASSWORD=123456;psql -U dwtest -d dw -h hdp3 -c "set search_path=tds;select fn_regular_load ();"‘该shell文件内容很简单,可执行的就一行,调用psql执行HAWQ定期数据装载函数。
hdfs dfs -mkdir /apps/falcon/primaryCluster hdfs dfs -mkdir /apps/falcon/primaryCluster/staging hdfs dfs -mkdir /apps/falcon/primaryCluster/working
hdfs dfs -chown -R falcon:users /apps/falcon/*
hdfs dfs -chmod -R 777 /apps/falcon/primaryCluster/staging hdfs dfs -chmod -R 755 /apps/falcon/primaryCluster/working
su - hdfs -c ‘hdfs dfs -chmod -R 777 /data/ext‘等到下午一点开始第一次执行RegularETL Process,之后每半小时执行一次。Falcon的执行结果如图7所示。
HAWQ实践(五)——自动调度工作流(Oozie、Falcon)
标签:负载平衡 缺省 共享库 option 流操作 命令执行 分析 执行引擎 table
原文地址:http://blog.csdn.net/wzy0623/article/details/72476848