标签:mapreduce cdh5 jdk1.8 string bin ati oat etc combine
Hadoop2.6.0(IDEA中源码编译使用CDH5.7.3,对应Hadoop2.6.0),集群使用原生Hadoop2.6.4,JDK1.8,Intellij IDEA 14 。源码可以在https://github.com/fansy1990/linear_regression 下载。
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // randN <= 0 disables re-shuffling: pass the record straight through under the current key.
    if (randN <= 0) {
        context.write(randFloatKey, value);
        return;
    }
    // Refresh the random float key once every randN records
    // (randN == 1 means every record gets a freshly drawn key), then reset the counter.
    if (++countI >= randN) {
        randFloatKey.set(random.nextFloat());
        countI = 0;
    }
    context.write(randFloatKey, value);
}theta0 = theta0 - alpha*(h(x)-y) ; theta1 = theta1 - alpha*(h(x)-y)*x 其中,h(x) = theta0 + theta1 * x;注意截距项 theta0 的梯度不含 x 因子;同时,需要注意这里的更新是同步更新,其核心代码如下所示:
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    // Parse one "x<splitter>y" training sample.
    float[] xy = Utils.str2float(value.toString().split(splitter));
    float x = xy[0];
    float y = xy[1];
    // Simultaneous (synchronous) update: theta1 must use the OLD theta0,
    // so stash it before overwriting.
    lastTheta0 = theta0;
    // FIX: the gradient of the intercept theta0 is (h(x) - y) with NO factor of x;
    // the original code multiplied by x, which is only correct for the slope theta1.
    theta0 -= alpha * (theta0 + theta1 * x - y);
    theta1 -= alpha * (lastTheta0 + theta1 * x - y) * x;
}protected void cleanup(Context context) throws IOException, InterruptedException {
// Emit the final (theta0, theta1) pair learned by this mapper, joined by the splitter.
theta0_1.set(theta0 + splitter + theta1);
context.write(theta0_1,NullWritable.get());
}conf.setLong("mapreduce.input.fileinputformat.split.maxsize",700L);// cap the split size (700 bytes) so the input is divided across multiple mappers
// Map-only job: accumulate, for every candidate theta pair, the squared
// prediction error and the number of samples seen by this mapper.
job.setNumReduceTasks(0);protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
float[] xy = Utils.str2float(value.toString().split(splitter));
for(int i =0;i<thetas.size() ;i++){
// error = (theta0 + theta1 * x - y) ^2
thetaErrors[i] += (thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) *
(thetas.get(i)[0]+ thetas.get(i)[1] * xy[0] -xy[1]) ;
thetaNumbers[i]+= 1;
}
}protected void cleanup(Context context) throws IOException, InterruptedException {
// Emit each candidate theta pair with its accumulated squared error and sample count.
for(int i =0;i<thetas.size() ;i++){
theta.set(thetas.get(i));
floatAndLong.set(thetaErrors[i],thetaNumbers[i]);
context.write(theta,floatAndLong);
}
}protected void reduce(FloatAndFloat key, Iterable<FloatAndLong> values, Context context) throws IOException, InterruptedException {
// Sum the squared errors and sample counts contributed by all mappers for this
// theta pair, then store (theta0, theta1, RMSE) where RMSE = sqrt(sumF / sumL).
float sumF = 0.0f;
long sumL = 0L ;
for(FloatAndLong value:values){
sumF +=value.getSumFloat();
sumL += value.getSumLong();
}
theta_error.add(new float[]{key.getTheta0(),key.getTheta1(), (float)Math.sqrt((double)sumF / sumL)});
logger.info("theta:{}, error:{}", new Object[]{key.toString(),Math.sqrt(sumF/sumL)});
}protected void cleanup(Context context) throws IOException, InterruptedException {
// How should the candidate thetas be combined?
// Option 1 ("weight"): the smaller a candidate's error, the larger its weight;
// Option 2 ("average"): plain arithmetic mean of all candidates.
float [] theta_all = new float[2];
if("average".equals(method)){
// theta_all = theta_error.get(0);
// Arithmetic mean of theta0 and theta1 over all candidates.
for(int i=0;i< theta_error.size();i++){
theta_all[0] += theta_error.get(i)[0];
theta_all[1] += theta_error.get(i)[1];
}
theta_all[0] /= theta_error.size();
theta_all[1] /= theta_error.size();
} else {
// Inverse-error weighting: weight_i = (1/error_i) / sum_j(1/error_j),
// so candidates with smaller RMSE (d[2]) contribute more.
float sumErrors = 0.0f;
for(float[] d:theta_error){
sumErrors += 1/d[2];
}
for(float[] d: theta_error){
theta_all[0] += d[0] * 1/d[2] /sumErrors;
theta_all[1] += d[1] * 1/d[2] /sumErrors;
}
}
context.write(new FloatAndFloat(theta_all),NullWritable.get());
}public static void main(String[] args) throws Exception {
// Hard-coded args: <input> <output> <randN>; randN=1 draws a fresh random key per record.
args = new String[]{
"hdfs://master:8020/user/fanzhe/linear_regression.txt",
"hdfs://master:8020/user/fanzhe/shuffle_out",
"1"
} ;
ToolRunner.run(Utils.getConf(),new ShuffleDataJob(),args);
}6.1101,17.592 5.5277,9.1302 8.5186,13.662 。。。Shuffle输出:
public static void main(String[] args) throws Exception {
// <input> <output> <theta0;theta1;alpha> <splitter> // note: the third argument is semicolon-separated
args = new String[]{
"hdfs://master:8020/user/fanzhe/shuffle_out",
"hdfs://master:8020/user/fanzhe/linear_regression",
"1;0;0.01",
","
} ;
ToolRunner.run(Utils.getConf(),new LinearRegressionJob(),args);
}查看输出结果:public static void main(String[] args) throws Exception {
// <input> <output> <theta_path> <splitter> <average|weight>
// "weight" selects inverse-error weighting of the candidate thetas.
args = new String[]{
"hdfs://master:8020/user/fanzhe/shuffle_out",
"hdfs://master:8020/user/fanzhe/single_linear_regression_error",
"hdfs://master:8020/user/fanzhe/linear_regression",
",",
"weight"
} ;
ToolRunner.run(Utils.getConf(),new SingleLinearRegressionError(),args);
}这里设置的合并theta值的方式使用加权,读者可以设置为average,从而使用平均值;public static void main(String[] args) throws Exception {
// <input> <output> <theta_path> <splitter>
args = new String[]{
"hdfs://master:8020/user/fanzhe/shuffle_out",
"hdfs://master:8020/user/fanzhe/last_linear_regression_error",
"hdfs://master:8020/user/fanzhe/single_linear_regression_error",
",",
} ;
ToolRunner.run(Utils.getConf(),new LastLinearRegressionError(),args);
}输出结果为:分享,成长,快乐
转载请注明blog地址:http://blog.csdn.net/fansy1990
标签:mapreduce cdh5 jdk1.8 string bin ati oat etc combine
原文地址:http://blog.csdn.net/fansy1990/article/details/52962863