标签:des style blog http java color
软件版本:
windows7: Tomcat7、JDK7、Spring4.0.2、Struts2.3、Hibernate4.3、myeclipse10.0、easyui;Linux(centos6.5):Hadoop2.4、Mahout1.0、JDK7;
使用Web工程调用Mahout的相关算法,提供监控,查看任务的执行状态。
自建Web项目,项目首页如下:
[root@node33 data]# jps
6033 NodeManager
5543 NameNode
5629 DataNode
5942 ResourceManager
41611 Jps
5800 SecondaryNameNode
6412 JobHistoryServer
<property>
  <name>mapreduce.jobhistory.address</name>
  <value>node33:10020</value>
  <description>MapReduce JobHistory Server IPC host:port</description>
</property>
yarn-default.xml:
<name>yarn.application.classpath</name>
<value>
$HADOOP_CONF_DIR,
$HADOOP_COMMON_HOME/share/hadoop/common/*,
$HADOOP_COMMON_HOME/share/hadoop/common/lib/*,
$HADOOP_HDFS_HOME/share/hadoop/hdfs/*,
$HADOOP_HDFS_HOME/share/hadoop/hdfs/lib/*,
$HADOOP_YARN_HOME/share/hadoop/yarn/*,
$HADOOP_YARN_HOME/share/hadoop/yarn/lib/*
</value>
<name>yarn.resourcemanager.hostname</name>
<value>node33</value>
注意classpath的路径是集群的相应路径;
(mvn -Dhadoop2.version=2.4.1 -DskipTests clean install)导入的包有:
<Context path ="/mh" docBase ="D:\workspase\hadoop_hbase\MahoutAlgorithmPlatform2.1\WebRoot" privileged ="true" reloadable ="false" > </Context>
/**
 * Checks the given HDFS and YARN endpoints and, when both checks pass, stores the
 * resulting configuration and file system in {@code HadoopUtils}.
 *
 * @param fsStr value assigned to {@code fs.defaultFS}
 * @param rm value assigned to {@code yarn.resourcemanager.address}
 * @return {@code 1} if the HDFS root path does not exist, {@code 0} if the reported
 *         job tracker status is not {@code RUNNING}, {@code 3} if both checks pass
 * @throws IOException propagated from the underlying Hadoop calls
 */
public int checkConnection(String fsStr, String rm) throws IOException {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", fsStr);
    conf.set("yarn.resourcemanager.address", rm);
    conf.set("mapreduce.framework.name", "yarn");

    // Check 1: the HDFS root must be visible through the configured file system.
    FileSystem fs = FileSystem.get(conf);
    if (!fs.exists(new Path("/"))) {
        return 1;
    }

    // Check 2: the cluster must report itself as RUNNING.
    // The client is only needed for this probe, so release it when done.
    JobClient jc = new JobClient(conf);
    try {
        ClusterStatus cs = jc.getClusterStatus();
        if (!"RUNNING".equals(cs.getJobTrackerStatus().toString())) {
            return 0;
        }
    } finally {
        jc.close();
    }

    // Cluster verified successfully. Other code decides whether a cluster has
    // already been configured by checking whether the stored conf is null.
    HadoopUtils.setConf(conf);
    HadoopUtils.setFs(fs);
    return 3;
}
主要通过两个方面:1、检查HDFS文件;2、检查集群状态是否是running;
/**
 * Rebuilds the shared job list with placeholders for the next {@code nextJobNum}
 * jobs, whose ids are derived from the most recent job id known to the cluster
 * (same identifier, numeric id incremented by 1..nextJobNum).
 *
 * <p>If no previous job can be found (for example the cluster has not run any job
 * yet), the list is left empty and the method returns without throwing.
 *
 * @param nextJobNum number of upcoming jobs to pre-register
 * @throws IOException propagated from the job client
 */
public static void initialCurrentJobs(int nextJobNum) throws IOException {
    list.clear(); // drop entries left over from the previous run

    JobStatus[] jbs = getJc().getAllJobs();
    JobStatus lastJob = (jbs == null || jbs.length == 0) ? null : findLastJob(jbs);
    JobID jID = (lastJob == null) ? null : lastJob.getJobID();
    if (jID == null) {
        // No job history to derive the next ids from; nothing can be pre-registered.
        log.info("The cluster is started before and not running any job !!!");
        return;
    }

    log.info("The last job id is :{}", jID.toString());
    for (int i = 1; i <= nextJobNum; i++) {
        CurrentJobInfo cj = new CurrentJobInfo();
        cj.setJobId(new JobID(jID.getJtIdentifier(), jID.getId() + i));
        list.add(cj);
    }
}
这里需要注意的是,如果集群是第一次启动,且没有运行MR任务的话,那么获取的任务ID为空,无法初始化(这个在下个版本修复);
/**
 * Refreshes every entry of the shared job list with the current name, id string,
 * map/reduce progress and run state of the corresponding job.
 *
 * <p>Processing stops at the first entry whose job cannot be resolved by
 * {@code findGivenJob}; later entries are left untouched. When the last entry is
 * reached, {@code finished} is updated from that job's completion flag.
 *
 * @return the shared job list (the same instance that is updated in place)
 * @throws IOException propagated from the job lookups
 */
public static List<CurrentJobInfo> getCurrentJobs() throws IOException {
    final int lastIndex = list.size() - 1;
    for (int idx = 0; idx <= lastIndex; idx++) {
        CurrentJobInfo info = list.get(idx);
        RunningJob job = findGivenJob(info.getJobId().toString());
        if (job == null) {
            break;
        }
        if (idx == lastIndex) {
            // Must be evaluated before the setters below.
            finished = job.isComplete();
        }
        info.setJobName(job.getJobName());
        info.setJobIdStr(job.getJobStatus().getJobID().toString());
        info.setMapProgress(Utils.toPercent(job.mapProgress(), 2));
        info.setRedProgress(Utils.toPercent(job.reduceProgress(), 2));
        // Known issue: the state can still read as running even after both map and
        // reduce progress have reached 1; this still needs handling.
        info.setState(JobStatus.getJobRunState(job.getJobState()));
    }
    return list;
}
获取到任务信息后,在任务监控界面就可以监控到任务的运行状态。
/**
 * Reads the cluster center vectors stored under a directory.
 *
 * <p>All files matching {@code part-*} in {@code centerPathDir} are read and each
 * cluster is appended to the result as one formatted line.
 *
 * @param conf configuration used to open the sequence files
 * @param centerPathDir directory that holds the {@code part-*} files
 * @return the formatted clusters, one per line, or a "not exist" message when no
 *         {@code part-*} file matches
 * @throws IOException propagated from the file system access
 */
public static String readCenter(Configuration conf, String centerPathDir) throws IOException {
    Path input = new Path(centerPathDir, "part-*");

    // The path is a glob pattern, so it has to be expanded with globStatus();
    // exists() would look for a file literally named "part-*".
    org.apache.hadoop.fs.FileStatus[] matches = HadoopUtils.getFs().globStatus(input);
    if (matches == null || matches.length == 0) {
        return input + " not exist ,please check the input";
    }

    StringBuilder buff = new StringBuilder();
    for (ClusterWritable cl
            : new SequenceFileDirValueIterable<ClusterWritable>(input, PathType.GLOB, conf)) {
        buff.append(cl.getValue().asFormatString(null)).append("\n");
    }
    return buff.toString();
}
Mahout算法调用展示平台2.1,布布扣,bubuko.com
标签:des style blog http java color
原文地址:http://blog.csdn.net/fansy1990/article/details/37339079