码迷,mamicode.com
首页 > 其他好文 > 详细

hadoop1-构建电影推荐系统

时间:2014-06-25 09:46:57      阅读:329      评论:0      收藏:0      [点我收藏+]

标签:style   class   blog   code   java   http   

问题导读:
1. 推荐系统概述;
2. 推荐系统指标设计;
3. Hadoop并行算法;
4. 推荐系统架构;
5. MapReduce程序实现。
 
前言
Netflix电影推荐的百万美金比赛,把“推荐”变成了时下最热门的数据挖掘算法之一。也正是由于Netflix的比赛,让企业界和学科界有了更深层次的技术碰撞。引发了各种网站“推荐”热,个性时代已经到来。

一、 推荐系统概述
电子商务网站是个性化推荐系统重要地应用的领域之一,亚马逊就是个性化推荐系统的积极应用者和推广者,亚马逊的推荐系统深入到网站的各类商品,为亚马逊带来了至少30%的销售额。
不光是电商类,推荐系统无处不在。QQ,人人网的好友推荐;新浪微博的你可能感觉兴趣的人;优酷,土豆的电影推荐;豆瓣的图书推荐;大从点评的餐饮推荐;世纪佳缘的相亲推荐;天际网的职业推荐等。

推荐算法分类:

按数据使用划分:
协同过滤算法:UserCF, ItemCF, ModelCF
基于内容的推荐: 用户内容属性和物品内容属性
社会化过滤:基于用户的社会网络关系

按模型划分:
最近邻模型:基于距离的协同过滤算法
Latent Factor Mode(SVD):基于矩阵分解的模型
Graph:图模型,社会网络图模型

基于用户的协同过滤算法UserCF
基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲就是:给用户推荐和他兴趣相似的其他用户喜欢的物品。
用例说明:
bubuko.com,布布扣
算法实现及使用介绍,请参考文章:Mahout推荐算法API详解

基于物品的协同过滤算法ItemCF
基于item的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐。简单来讲就是:给用户推荐和他之前喜欢的物品相似的物品。
用例说明:

算法实现及使用介绍,请参考文章:Mahout推荐算法API详解
注:基于物品的协同过滤算法,是目前商用最广泛的推荐算法。

协同过滤算法实现,分为2个步骤
  • 1. 计算物品之间的相似度
  • 2. 根据物品的相似度和用户的历史行为给用户生成推荐列表
有关协同过滤的另一篇文章,请参考:RHadoop实践系列之三 R实现MapReduce的协同过滤算法


二、 需求分析:推荐系统指标设计

下面我们将从一个公司案例出发来全面的解释,如何进行推荐系统指标设计。

案例介绍
Netflix电影推荐百万奖金比赛,http://www.netflixprize.com/
Netflix官方网站:www.netflix.com

Netflix,2006年组织比赛是的时候,是一家以在线电影租赁为生的公司。他们根据网友对电影的打分来判断用户有可能喜欢什么电影,并结合会员看过的电影以及口味偏好设置做出判断,混搭出各种电影风格的需求。
收集会员的一些信息,为他们指定个性化的电影推荐后,有许多冷门电影竟然进入了候租榜单。从公司的电影资源成本方面考量,热门电影的成本一般较高,如果Netflix公司能够在电影租赁中增加冷门电影的比例,自然能够提升自身盈利能力。
Netflix 公司曾宣称60%左右的会员根据推荐名单定制租赁顺序,如果推荐系统不能准确地猜测会员喜欢的电影类型,容易造成多次租借冷门电影而并不符合个人口味的会 员流失。为了更高效地为会员推荐电影,Netflix一直致力于不断改进和完善个性化推荐服务,在2006年推出百万美元大奖,无论是谁能最好地优化 Netflix推荐算法就可获奖励100万美元。到2009年,奖金被一个7人开发小组夺得,Netflix随后又立即推出第二个百万美金悬赏。这充分说 明一套好的推荐算法系统是多么重要,同时又是多么困难。
bubuko.com,布布扣
上图为比赛的各支队伍的排名!

补充说明:
1. Netflix的比赛是基于静态数据的,就是给定“训练级”,匹配“结果集”,“结果集”也是提前就做好的,所以这与我们每天运营的系统,其实是不一样的。
2. Netflix用于比赛的数据集是小量的,整个全集才666MB,而实际的推荐系统都要基于大量历史数据的,动不动就会上GB,TB等

所以,我们在真实的环境中设计推荐的时候,要全面考量数据量,算法性能,结果准确度等的指标。

推荐算法选型:基于物品的协同过滤算法ItemCF,并行实现
数据量:基于Hadoop架构,支持GB,TB,PB级数据量
算法检验:可以通过 准确率,召回率,覆盖率,流行度 等指标评判。
结果解读:通过ItemCF的定义,合理给出结果解释

三、 算法模型:Hadoop并行算法
这里我使用”Mahout In Action”书里,第一章第六节介绍的分步式基于物品的协同过滤算法进行实现。Chapter 6: Distributing recommendation computations
测试数据集:small.csv

  1. 1,101,5.0
  2. 1,102,3.0
  3. 1,103,2.5
  4. 2,101,2.0
  5. 2,102,2.5
  6. 2,103,5.0
  7. 2,104,2.0
  8. 3,101,2.0
  9. 3,104,4.0
  10. 3,105,4.5
  11. 3,107,5.0
  12. 4,101,5.0
  13. 4,103,3.0
  14. 4,104,4.5
  15. 4,106,4.0
  16. 5,101,4.0
  17. 5,102,3.0
  18. 5,103,2.0
  19. 5,104,4.0
  20. 5,105,3.5
  21. 5,106,4.0
复制代码


每行3个字段,依次是用户ID,电影ID,用户对电影的评分(0-5分,每0.5为一个评分点!)
算法的思想:
1. 建立物品的同现矩阵
2. 建立用户对物品的评分矩阵
3. 矩阵计算推荐结果

1). 建立物品的同现矩阵
按用户分组,找到每个用户所选的物品,单独出现计数及两两一组计数。

  1.         [101] [102] [103] [104] [105] [106] [107]
  2. [101]    5      3      4      4       2       2      1
  3. [102]    3      3      3      2       1       1      0
  4. [103]    4      3      4      3       1       2      0
  5. [104]    4      2      3      4       2       2      1
  6. [105]    2      1      1      2       2       1      1
  7. [106]    2      1      2      2       1       2      0
  8. [107]    1      0      0      1       1       0      1
复制代码



2). 建立用户对物品的评分矩阵
按用户分组,找到每个用户所选的物品及评分
     
  1.         U3
  2. [101] 2.0
  3. [102] 0.0
  4. [103] 0.0
  5. [104] 4.0
  6. [105] 4.5
  7. [106] 0.0
  8. [107] 5.0
复制代码



3). 矩阵计算推荐结果
同现矩阵*评分矩阵=推荐结果
bubuko.com,布布扣
图片摘自”Mahout In Action”

MapReduce任务设计
bubuko.com,布布扣
图片摘自”Mahout In Action”

解读MapRduce任务:
步骤1: 按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵
步骤2: 对物品组合列表进行计数,建立物品的同现矩阵
步骤3: 合并同现矩阵和评分矩阵
步骤4: 计算推荐结果列表

四、 架构设计:推荐系统架构







上图中,左边是Application业务系统,右边是Hadoop的HDFS, MapReduce。

  • 业务系统记录了用户的行为和对物品的打分
  • 设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid,itemid,value,time)。
  • 完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。
  • 完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。
五、 程序开发:MapReduce程序实现
  注意:原有参考是在一个job中完成所有步子,在这里将其分解了,每一步都是一个完整的job,(扩展,可以使用Mapreduce链实现)
win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目
新建Java类:
Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵
package recommend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Step1 {

	public static class Map extends Mapper<Object, Text, IntWritable, Text>{
		private static IntWritable k = new IntWritable();
		private static Text v = new Text();
		
		protected void map(Object key, Text value, Context context) 
				throws java.io.IOException ,InterruptedException {
			String[] splits = value.toString().split(",");
			if(splits.length != 3){
				return;
			}
			int userId = Integer.parseInt(splits[0]);
			String itemId = splits[1];
			String pref = splits[2];
			//解析出用户ID和关联的商品ID与打分。并输出
			k.set(userId);
			v.set(itemId+":"+pref);
			context.write(k, v);
		};
	}
	public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{
		private static StringBuilder sub = new StringBuilder(256);
		private static Text v = new Text();
		
		protected void reduce(IntWritable key, Iterable<Text> values, Context context) 
				throws java.io.IOException ,InterruptedException {
			//合并用户的关联商品ID,作为一个组
for (Text v : values) {
				sub.append(v.toString()+",");
			}
			v.set(sub.toString());
			context.write(key, v);
			sub.delete(0, sub.length());
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 2){
			System.err.println("Usage:Step1");
			System.exit(2);
		}
		Job job = new Job(conf,"Step1");
		job.setJarByClass(Step1.class);
		
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 计算结果:

[root@hadoop ~]# hadoop dfs -cat /step1/*

1       102:3.0,103:2.5,101:5.0,

2       101:2.0,102:2.5,103:5.0,104:2.0,

3       107:5.0,101:2.0,104:4.0,105:4.5,

4       101:5.0,103:3.0,104:4.5,106:4.0,

5       101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0,

Step2.java,对物品组合列表进行计数,建立物品的同现矩阵

使用Step1的输出结果作为输入的数据文件

程序代码:
  
package recommend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

//使用Step1的输出结果作为输入的数据文件
public class Step2 {

	public static class Map extends Mapper<Object, Text, Text, IntWritable>{
		private static Text k = new Text();
		private static IntWritable v = new IntWritable(1);
		
		protected void map(Object key, Text value, Context context) 
				throws java.io.IOException ,InterruptedException {
			String[] tokens = value.toString().split("\t")[1].split(",");
			
			//商品的相关是相互的,当然也包含自己。
			String item1Id = null;
			String item2Id = null;
			for (int i = 0; i < tokens.length; i++) {
				item1Id = tokens[i].split(":")[0];
				for (int j = 0; j < tokens.length; j++) {
					item2Id = tokens[j].split(":")[0];
					k.set(item1Id+":"+item2Id);
					context.write(k, v);
				}
			}
		};
	}
	public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable>{
		private static IntWritable v = new IntWritable();
		
		protected void reduce(Text key, Iterable<IntWritable> values, Context context) 
				throws java.io.IOException ,InterruptedException {
			int count = 0;
			//计算总的次数
			for (IntWritable temp : values) {
				count++;
			}
			v.set(count);
			context.write(key, v);
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 2){
			System.err.println("Usage:Step2");
			System.exit(2);
		}
		Job job = new Job(conf,"Step2");
		job.setJarByClass(Step2.class);
		
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(IntWritable.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

 计算结果:

[root@hadoop ~]# hadoop dfs -cat /step2/*

101:101 5

101:102 3

101:103 4

101:104 4

101:105 2

101:106 2

101:107 1

102:101 3

102:102 3

102:103 3

102:104 2

102:105 1

102:106 1

103:101 4

103:102 3

103:103 4

103:104 3

103:105 1

103:106 2

104:101 4

104:102 2

104:103 3

104:104 4

104:105 2

104:106 2

104:107 1

105:101 2

105:102 1

105:103 1

105:104 2

105:105 2

105:106 1

105:107 1

106:101 2

106:102 1

106:103 2

106:104 2

106:105 1

106:106 2

107:101 1

107:104 1

107:105 1

107:107 1

Step3.java,合并同现矩阵和评分矩阵

(忽略了原有参考的step3_2这一步,因为他的输出是和step2的输出是一样的。)

package recommend;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Step3 {

	public static class Map extends Mapper<LongWritable, Text, IntWritable, Text>{
		private static IntWritable k = new IntWritable();
		private static Text v = new Text();
		
		protected void map(LongWritable key, Text value, Context context) 
				throws java.io.IOException ,InterruptedException {
			String[] tokens = value.toString().split("\t");
			String[] vector = tokens[1].split(",");
			for (String s : vector) {
				String[] t = s.split(":");
				//设置商品ID
				k.set(Integer.parseInt(t[0]));
				//设置用户ID:评分
				v.set(tokens[0]+":"+t[1]);
				context.write(k, v);
			}
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 2){
			System.err.println("Usage:Step3");
			System.exit(2);
		}
		Job job = new Job(conf,"Step3");
		job.setJarByClass(Step3.class);
		
		job.setMapperClass(Map.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		
		FileInputFormat.addInputPath(job, new Path(args[0]));
		FileOutputFormat.setOutputPath(job, new Path(args[1]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

计算结果:

[root@hadoop ~]# hadoop dfs -cat /step3_1/*

101     5:4.0

101     1:5.0

101     2:2.0

101     3:2.0

101     4:5.0

102     1:3.0

102     5:3.0

102     2:2.5

103     2:5.0

103     5:2.0

103     1:2.5

103     4:3.0

104     2:2.0

104     5:4.0

104     3:4.0

104     4:4.5

105     3:4.5

105     5:3.5

106     5:4.0

106     4:4.0

107     3:5.0

Step4.java,计算推荐结果列表(将step2和step3_1的输出作为map的输入文件)
程序代码:
package recommend;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Step4 {

	public static class Map extends Mapper<Object, Text, IntWritable, Text>{
		private static IntWritable k = new IntWritable();
		private static Text v = new Text();
		
		private static java.util.Map<Integer, List<Coocurence>> matrix = new HashMap<Integer, List<Coocurence>>();
		
		protected void map(Object key, Text value, Context context) 
				throws java.io.IOException ,InterruptedException {
			//文件一格式、101		5:4.0   文件二格式、101:101		5
			String[] tokens = value.toString().split("\t");
			String[] v1 = tokens[0].split(":");
			String[] v2 = tokens[1].split(":");
			//文件二 101:101 	   5
			if(v1.length > 1){
				int itemId1 = Integer.parseInt(v1[0]);
				int itemId2 = Integer.parseInt(v1[1]);
				int num = Integer.parseInt(tokens[1]);
				
				List<Coocurence> list = null;
				if(matrix.containsKey(itemId1)){
					list = matrix.get(itemId1);
				}else{
					list = new ArrayList<Coocurence>();
				}
				list.add(new Coocurence(itemId1, itemId2, num));
				matrix.put(itemId1,list);
			}
			//文件一 101		5:4.0
			if(v2.length > 1){
				int itemId = Integer.parseInt(tokens[0]);
				int userId = Integer.parseInt(v2[0]);
				double pref = Double.parseDouble(v2[1]);
				
				k.set(userId);
				for (Coocurence c : matrix.get(itemId)) {
					v.set(c.getItemId2()+","+pref*c.getNum());
					context.write(k, v);
				}
			}
		};
	}
	public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{
		private static Text v = new Text();
		
		protected void reduce(IntWritable key, Iterable<Text> values, Context context) 
				throws java.io.IOException ,InterruptedException {
			java.util.Map<String, Double> result = new HashMap<String, Double>();
			for (Text t : values) {
				String[] str = t.toString().split(",");
				if(result.containsKey(str[0])){
					result.put(str[0], result.get(str[0])+Double.parseDouble(str[1]));
				}else {
					result.put(str[0], Double.parseDouble(str[1]));
				}
			}
			Iterator<String> iter = result.keySet().iterator();
			while (iter.hasNext()){
				String itemId = iter.next();
				double score = result.get(itemId);
				v.set(itemId+","+score);
				context.write(key, v);
			}
		};
	}
	public static void main(String[] args) throws Exception {
		Configuration conf = new Configuration();
		String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
		if(otherArgs.length != 3){
			System.err.println("Usage:Step4");
			System.exit(2);
		}
		Job job = new Job(conf,"Step4");
		job.setJarByClass(Step4.class);
		
		job.setMapperClass(Map.class);
		job.setReducerClass(Reduce.class);
		
		job.setMapOutputKeyClass(IntWritable.class);
		job.setMapOutputValueClass(Text.class);
		job.setOutputKeyClass(IntWritable.class);
		job.setOutputValueClass(Text.class);
		//设置step2和step3_1两个输入目录作为输入,所以系统需要三个参数配置
		FileInputFormat.addInputPath(job,new Path(args[0]));
		FileInputFormat.addInputPath(job,new Path(args[1]));
		FileOutputFormat.setOutputPath(job, new Path(args[2]));
		
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
class Coocurence{
	private int itemId1;
	private int itemId2;
	private int num;
	
	public Coocurence(int itemId1, int itemId2, int num) {
		super();
		this.itemId1 = itemId1;
		this.itemId2 = itemId2;
		this.num = num;
	}
	public int getItemId1() {
		return itemId1;
	}
	public void setItemId1(int itemId1) {
		this.itemId1 = itemId1;
	}
	public int getItemId2() {
		return itemId2;
	}
	public void setItemId2(int itemId2) {
		this.itemId2 = itemId2;
	}
	public int getNum() {
		return num;
	}
	public void setNum(int num) {
		this.num = num;
	}
}

 计算结果:

[root@hadoop ~]# hadoop dfs -cat /output/*

1       107,5.0

1       106,18.0

1       105,15.5

1       104,33.5

1       103,39.0

1       102,31.5

1       101,44.0

2       107,4.0

2       106,20.5

2       105,15.5

2       104,36.0

2       103,41.5

2       102,32.5

2       101,45.5

3       107,15.5

3       106,16.5

3       105,26.0

3       104,38.0

3       103,24.5

3       102,18.5

3       101,40.0

4       107,9.5

4       106,33.0

4       105,26.0

4       104,55.0

4       103,53.5

4       102,37.0

4       101,63.0

5       107,11.5

5       106,34.5

5       105,32.0

5       104,59.0

5       103,56.5

5       102,42.5

5       101,68.0

--------------------------------------------------------------------------------

参考:http://www.aboutyun.com/thread-8155-1-1.html

这样我们就自己编程实现了MapReduce化基于物品的协同过滤算法。
RHadoop的实现方案,请参考文章:RHadoop实践系列之三 R实现MapReduce的协同过滤算法
Mahout的实现方案,请参考文章:Mahout分步式程序开发 基于物品的协同过滤ItemCF
我已经把整个MapReduce的实现都放到了github上面:
https://github.com/bsspirit/maven_hadoop_template/releases/tag/recommend

hadoop1-构建电影推荐系统,布布扣,bubuko.com

hadoop1-构建电影推荐系统

标签:style   class   blog   code   java   http   

原文地址:http://www.cnblogs.com/jsunday/p/3807320.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!