如何评价spark的机器学习框架 和 tensorflow的机器学习系统

谷歌的机器学习系统 TensorFlow 开源了 - 文章 - 伯乐在线
& 谷歌的机器学习系统 TensorFlow 开源了
Recode 中文站 11 月 10 日报道
机器学习作为人工智能的一种类型,可以让软件根据大量的数据来对未来的情况进行阐述或预判。如今,机器学习在硅谷非常流行,并吸引了多家知名企业竞相涉猎该领域。例如,Facebook 就大力投资机器学习;微软不甘示弱,也在加大对机器学习的投资力度;苹果也在悄然涉足这一领域。谷歌当然不甘落后,如今又开始加大在机器学习领域的研究。为了保持自己的创新力——也为了吸引宝贵的人才——谷歌需要维持自己在先进技术方面的领导者地位。
事实上,多年以来,谷歌内部一直在使用一种机器学习系统,代号为“TensorFlow”。如今,谷歌正在将此系统成为开源系统,并将此系统的参数公布给业界工程师、学者和拥有大量编程能力的技术人员。
此举极具谷歌的风格。打个不太恰当的比喻,如今谷歌对待 TensorFlow 系统,有点类似于该公司对待旗下移动操作系统 Android。长期以来,谷歌一直非常积极地参与到机器学习相关的科研事务之中。与之相比,作为谷歌竞争对手的苹果公司就没有这样做,尽管苹果可能会采取类似的方法来寻求类似的目的,例如在语音识别、地图甚至是在可能的汽车制造方面。如果更多的数据科学家开始使用谷歌的系统来从事机器学习方面的研究,那么这将有利于谷歌对日益发展的机器学习行业拥有更多的主导权。
另外,更为重要的是,作为人工智能的一个分支,机器学习正在逐步从学术论文向实际产品转化。谷歌此前已经推出了该公司的第一代机器学习系统,代号为“DistBelief”,以此来识别谷歌应用中的语言和 Photos 中的图片。如今的 TensorFlow 系统也是在那些产品的基础之上而发展起来的,并对早期系统的不足进行了改进。除此之外,谷歌上周还宣布在旗下邮箱应用 Inbox 里推出智能自动回复功能:Smart Reply,它能帮助用户筛选适合语境的回复短句。Smart Reply 正是基于强大的机器学习系统,对海量邮件里的场景、邮件写作风格和写作语气进行分析,从而生成一些场景化极强的回复内容。
除此之外,机器学习还在向纵深领域发展,如今又诞生了“深度学习”业务。作为机器学习的一种,深度学习能够支持像由致幻药引起幻觉的神经网络图像识别等功能。谷歌公司内部目前已经在 1200 多个不同的产品分类和产品代码库对深度学习进行了测试。
谷歌首席执行官桑德·皮采(Sundar Pichai)在最近一次财报电话会议上表示:“机器学习是一种核心的转变方式,通过机器学习,我们再重新思考我们所从事的一切。我们目前正处于初期阶段,但用户将看到谷歌以系统的方式来思考我们将如何把机器学习应用到所有的这些领域。”
谷歌方面在解释 TensorFlow 时声称,“应当有一种真正的工具,能够让研究人员用来尝试他们疯狂的创意。如果那些创意产生作用的话,那么他们将能够直接转化成产品,而不需要研究人员再重新编写代码。”
可能感兴趣的话题
关于伯乐在线博客
在这个信息爆炸的时代,人们已然被大量、快速并且简短的信息所包围。然而,我们相信:过多“快餐”式的阅读只会令人“虚胖”,缺乏实质的内涵。伯乐在线内容团队正试图以我们微薄的力量,把优秀的原创文章和译文分享给读者,为“快餐”添加一些“营养”元素。
新浪微博:
推荐微信号
(加好友请注明来意)
– 好的话题、有启发的回复、值得信赖的圈子
– 分享和发现有价值的内容与观点
– 为IT单身男女服务的征婚传播平台
– 优秀的工具资源导航
– 翻译传播优秀的外文文章
– 国内外的精选文章
– UI,网页,交互和用户体验
– 专注iOS技术分享
– 专注Android技术分享
– JavaScript, HTML5, CSS
– 专注Java技术分享
– 专注Python技术分享
& 2017 伯乐在线&>&Google开源的机器学习框架Tensorflow谷歌官方教程
Google开源的机器学习框架Tensorflow谷歌官方教程
上传大小:5.46MB
Tensorflow官方教程,整理自tensorflow.org
综合评分:4(1位用户评分)
所需积分:
下载个数:90
{%username%}回复{%com_username%}{%time%}\
/*点击出现回复框*/
$(".respond_btn").on("click", function (e) {
$(this).parents(".rightLi").children(".respond_box").show();
e.stopPropagation();
$(".cancel_res").on("click", function (e) {
$(this).parents(".res_b").siblings(".res_area").val("");
$(this).parents(".respond_box").hide();
e.stopPropagation();
/*删除评论*/
$(".del_comment_c").on("click", function (e) {
var id = $(e.target).attr("id");
$.getJSON('/index.php/comment/do_invalid/' + id,
function (data) {
if (data.succ == 1) {
$(e.target).parents(".conLi").remove();
alert(data.msg);
$(".res_btn").click(function (e) {
var q = $("#form1").serializeArray();
console.log(q);
var res_area_r = $.trim($(".res_area_r").val());
if (res_area_r == '') {
$(".res_text").css({color: "red"});
$.post("/index.php/comment/do_comment_reply/", q,
function (data) {
if (data.succ == 1) {
var $target,
evt = e || window.
$target = $(evt.target || evt.srcElement);
var $dd = $target.parents('dd');
var $wrapReply = $dd.find('.respond_box');
console.log($wrapReply);
var mess = $(".res_area_r").val();
var str = str.replace(/{%header%}/g, data.header)
.replace(/{%href%}/g, 'http://' + window.location.host + '/user/' + data.username)
.replace(/{%username%}/g, data.username)
.replace(/{%com_username%}/g, _username)
.replace(/{%time%}/g, data.time)
.replace(/{%id%}/g, data.id)
.replace(/{%mess%}/g, mess);
$dd.after(str);
$(".respond_box").hide();
$(".res_area_r").val("");
$(".res_area").val("");
$wrapReply.hide();
alert(data.msg);
}, "json");
/*删除回复*/
$(".rightLi").on("click",'.del_comment_r', function (e) {
var id = $(e.target).attr("id");
$.getJSON('/index.php/comment/do_comment_del/' + id,
function (data) {
if (data.succ == 1) {
$(e.target).parent().parent().parent().parent().parent().remove();
$(e.target).parents('.res_list').remove()
alert(data.msg);
//填充回复
function KeyP(v) {
$(".res_area_r").val($.trim($(".res_area").val()));
评论共有1条
很好非常感谢。
审核通过送C币
正则表达式学习电子书
文本编辑器转辑
创建者:zhouyue777111
MATLAb Robotic Toolbox 合集
创建者:kroc_kroc
上传者其他资源上传者专辑
Tensorflow教程01Introduction
MNIST Data手写数字图片识别数据集
自编数独解谜助手HTML+Javascript
开发技术热门标签
VIP会员动态
前端开发重难点
17年软考最新真题及解析
物联网全栈开发专题
二十大技术领域优质资源
spring mvc+mybatis+mysql+maven+bootstrap 整合实现增删查改简单实例.zip
CSDN&VIP年卡&4000万程序员的必选
Google开源的机器学习框架Tensorflow谷歌官方教程
会员到期时间:
剩余下载个数:
剩余C币:0
剩余积分:6726
积分不足!
资源所需积分
当前拥有积分
您可以选择
程序员的必选
绿色安全资源
资源所需积分
当前拥有积分
当前拥有C币
(仅够下载10个资源)
全站1200个资源免积分下载
资源所需积分
当前拥有积分
当前拥有C币
全站1200个资源免积分下载
资源所需积分
当前拥有积分
当前拥有C币
您的积分不足,将扣除 10 C币
全站1200个资源免积分下载
你当前的下载分为234。
你还不是VIP会员
开通VIP会员权限,免积分下载
你下载资源过于频繁,请输入验证码
你下载资源过于频繁,请输入验证码
您因违反CSDN下载频道规则而被锁定帐户,如有疑问,请联络:!
若举报审核通过,可奖励20下载分
被举报人:
hangzhouhao
举报的资源分:
请选择类型
资源无法下载
资源无法使用
标题与实际内容不符
含有危害国家安全内容
含有反动色情等内容
含广告内容
版权问题,侵犯个人或公司的版权
*详细原因:
Google开源的机器学习框架Tensorflow谷歌官方教程9360人阅读
【机器学习】(4)
Tensorflow
TensorFlow 是谷歌开源的机器学习框架,相对于其它现有框架来说,其具有比较好的扩展性,但是也牺牲了它的速度。
下面介绍Tensorflow 的基本使用:
1, tensorflow 基本操作:
import tensorflow as tf
import numpy as np
a = tf.placeholder("float") # 创建符号变量
b = tf.placeholder("float")
y = tf.mul(a, b) # 乘法操作,作用在符号变量上。
sess = tf.Session() # 创建会话,计算符号变量表达式
print "%f + %f = %f"%(4, 5,
sess.run(y, feed_dict={a: a1, b: b1})
# 生成训练数据 + 噪声,下面为了拟合 $$ Y = 2X $$
trX = np.linspace(-1, 1, 101)
trY = 2 * trX + np.random.randn(*trX.shape) * 0.33 # y=2x,但是加入了噪声
X = tf.placeholder("float") #输入输出符号变量
Y = tf.placeholder("float")
# 定义模型
def model(X, w):
return tf.mul(X, w) # 线性回归只需要调用乘法操作即可。
# 模型权重 W 用变量表示
w = tf.Variable(0.0, name="weights") # 共享变量
y_model = model(X, w)
# 定义损失函数
cost = (tf.pow(Y-y_model, 2)) # 平方损失函数
# 构建优化器,最小化损失函数。
train_op = tf.train.GradientDescentOptimizer(0.01).minimize(cost)
# 构建会话
sess = tf.Session()
# 初始化所有的符号共享变量
init = tf.initialize_all_variables()
# 运行会话
sess.run(init)
# 迭代训练
for i in range(100):
for (x, y) in zip(trX, trY):
sess.run(train_op, feed_dict={X: x, Y: y})
# 打印权重w
print(sess.run(w))
y=sigmoid(X*W+b)
def init_weights(shape):
return tf.Variable(tf.random_normal(shape, stddev=0.01))
def model(X, w):
return tf.matmul(X, w)
mnist = input_data.read_data_sets("MNIST_data/", one_hot=True)
trX, trY, teX, teY = mnist.train.images, mnist.train.labels, mnist.test.images, mnist.test.labels
X = tf.placeholder("float", [None, 784])
Y = tf.placeholder("float", [None, 10])
w = init_weights([784, 10])
py_x = model(X, w)
cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(py_x, Y))
train_op = tf.train.GradientDescentOptimizer(0.05).minimize(cost)
predict_op = tf.argmax(py_x, 1)
sess = tf.Session()
init = tf.initialize_all_variables()
sess.run(init)
for i in range(100):
for start, end in zip(range(0, len(trX), 128), range(128, len(trX), 128)):
sess.run(train_op, feed_dict={X: trX[start:end], Y: trY[start:end]})
print i, np.mean(np.argmax(teY, axis=1) ==
sess.run(predict_op, feed_dict={X: teX, Y: teY}))
Tensorflow基本教程[
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:653205次
积分:5124
积分:5124
排名:第5682名
原创:77篇
评论:722条
新浪微博:
阅读:6115
阅读:27213
阅读:143065
(2)(1)(3)(3)(3)(4)(7)(3)(12)(4)(9)(3)(2)(5)(3)(2)(7)(5)(1)
(window.slotbydup = window.slotbydup || []).push({
id: '4740887',
container: s,
size: '250,250',
display: 'inlay-fix'苹果/安卓/wp
积分 54739, 距离下一级还需 1156 积分
权限: 自定义头衔, 签名中使用图片, 隐身, 设置帖子权限, 设置回复可见, 签名中使用代码
道具: 彩虹炫, 涂鸦板, 雷达卡, 热点灯, 金钱卡, 显身卡, 匿名卡, 抢沙发, 提升卡, 沉默卡, 千斤顶, 变色卡, 置顶卡
购买后可立即获得
权限: 隐身
道具: 金钱卡, 彩虹炫, 雷达卡, 热点灯, 涂鸦板
TA的文库&&
摘要: 机器学习(尤其是深度学习)最近已经在语音识别、图像识别、自然语言处理和推荐/搜索引擎等方面取得了变革性的成功。这些技术在自动驾驶汽车、数字医疗系统、CRM、广告、物联网等方面的应用非常有前途。当然,资本带 ...
本帖隐藏的内容
09:18:04 上传
支持楼主:、
购买后,论坛将奖励 10 元论坛资金给楼主,以表示您对TA发好贴的支持
载入中......
TensorFlow 具有优势
鼓励积极发帖讨论
总评分:&论坛币 + 20&
谢谢楼主分享!
鼓励积极发帖讨论
总评分:&论坛币 + 20&
看看学习学习
鼓励积极发帖讨论
总评分:&论坛币 + 20&
鼓励积极发帖讨论
总评分:&论坛币 + 20&
ding!!!!!!!!
鼓励积极发帖讨论
总评分:&论坛币 + 20&
鼓励积极发帖讨论
总评分:&论坛币 + 20&
鼓励积极发帖讨论
总评分:&论坛币 + 20&
鼓励积极发帖讨论
总评分:&论坛币 + 20&
初级热心勋章
初级热心勋章
中级热心勋章
中级热心勋章
高级热心勋章
高级热心勋章
特级热心勋章
高级热心勋章
初级信用勋章
初级信用勋章
&nbsp&nbsp|
&nbsp&nbsp|
&nbsp&nbsp|
&nbsp&nbsp|
&nbsp&nbsp|
&nbsp&nbsp|
如有投资本站或合作意向,请联系(010-);
邮箱:service@pinggu.org
投诉或不良信息处理:(010-)
论坛法律顾问:王进律师1746人阅读
Spark机器学习(3)
1 什么是ALS
ALS是交替最小二乘(alternating least squares)的简称。在机器学习中,ALS特指使用交替最小二乘求解的一个协同推荐算法。它通过观察到的所有用户给商品的打分,来推断每个用户的喜好并向用户推荐适合的商品。举个例子,我们看下面一个8*8的用户打分矩阵。
这个矩阵的每一行代表一个用户(u1,u2,…,u8)、每一列代表一个商品(v1,v2,…,v8)、用户的打分为1-9分。这个矩阵只显示了观察到的打分,我们需要推测没有观察到的打分。比如(u6,v5)打分多少?如果以数独的方式来解决这个问题,可以得到唯一的结果。
因为数独的规则很强,每添加一条规则,就让整个系统的自由度下降一个量级。当我们满足所有的规则时,整个系统的自由度就降为1了,也就得出了唯一的结果。对于上面的打分矩阵,如果我们不添加任何条件的话,也即打分之间是相互独立的,我们就没法得到(u6,v5)的打分。
所以在这个用户打分矩阵的基础上,我们需要提出一个限制其自由度的合理假设,使得我们可以通过观察已有打分来猜测未知打分。
ALS的核心就是这样一个假设:打分矩阵是近似低秩的。换句话说,就是一个m*n的打分矩阵可以由分解的两个小矩阵U(m*k)和V(k*n)的乘积来近似,即A=UVT,k&=m,n。这就是ALS的矩阵分解方法。这样我们把系统的自由度从O(mn)降到了O((m+n)k)。
那么ALS的低秩假设为什么是合理的呢?我们描述一个人的喜好经常是在一个抽象的低维空间上进行的,并不需要一一列出他喜好的事物。例如,我喜好看侦探影片,可能代表我喜欢《神探夏洛特》、《神探狄仁杰》等。这些影片都符合我对自己喜好的描述,也就是说他们在这个抽象的低维空间的投影和我的喜好相似。
再抽象一些来描述这个问题,我们把某个人的喜好映射到了低维向量ui上,同时将某个影片的特征映射到了维度相同的向量vj上,那么这个人和这个影片的相似度就可以表述成这两个向量之间的内积uTivj 。
我们把打分理解成相似度,那么打分矩阵A就可以由用户喜好矩阵和产品特征矩阵的乘积UVT来近似了。
低维空间的选取是一个问题。这个低维空间要能够很好的区分事物,那么就需要一个明确的可量化目标,这就是重构误差。在ALS中我们使用F范数来量化重构误差,就是每个元素重构误差的平方和。这里存在一个问题,我们只观察到部分打分,A中的大量未知元是我们想推断的,所以这个重构误差是包含未知数的。
解决方案很简单:只计算已知打分的重构误差。
后面的章节我们将从原理上讲解spark中实现的ALS模型。
2 spark中ALS的实现原理
Spark利用交换最小二乘解决矩阵分解问题分两种情况:数据集是显式反馈和数据集是隐式反馈。由于隐式反馈算法的原理是在显示反馈算法原理的基础上作的修改,所以我们在此只会具体讲解数据集为隐式反馈的算法。
算法实现所依据的文献见参考文献【1】。
从广义上讲,推荐系统基于两种不同的策略:基于内容的方法和基于协同过滤的方法。Spark中使用协同过滤的方式。协同过滤分析用户以及用户相关的产品的相关性,用以识别新的用户-产品相关性。协同过滤系统需要的唯一信息是用户过去的行为信息,比如对产品的评价信息。协同过滤是领域无关的,所以它可以方便解决基于内容方法难以解决的许多问题。
推荐系统依赖不同类型的输入数据,最方便的是高质量的显式反馈数据,它们包含用户对感兴趣商品明确的评价。例如,Netflix收集的用户对电影评价的星星等级数据。但是显式反馈数据不一定总是找得到,因此推荐系统可以从更丰富的隐式反馈信息中推测用户的偏好。
隐式反馈类型包括购买历史、浏览历史、搜索模式甚至鼠标动作。例如,购买同一个作者许多书的用户可能喜欢这个作者。
许多研究都集中在处理显式反馈,然而在很多应用场景下,应用程序重点关注隐式反馈数据。因为可能用户不愿意评价商品或者由于系统限制我们不能收集显式反馈数据。在隐式模型中,一旦用户允许收集可用的数据,在客户端并不需要额外的显式数据。文献中的系统避免主动地向用户收集显式反馈信息,所以系统仅仅依靠隐式信息。
了解隐式反馈的特点非常重要,因为这些特质使我们避免了直接调用基于显式反馈的算法。最主要的特点有如下几种:
没有负反馈。通过观察用户行为,我们可以推测那个商品他可能喜欢,然后购买,但是我们很难推测哪个商品用户不喜欢。这在显式反馈算法中并不存在,因为用户明确告诉了我们哪些他喜欢哪些他不喜欢。
隐式反馈是内在的噪音。虽然我们拼命的追踪用户行为,但是我们仅仅只是猜测他们的偏好和真实动机。例如,我们可能知道一个人的购买行为,但是这并不能完全说明偏好和动机,因为这个商品可能作为礼物被购买而用户并不喜欢它。
显示反馈的数值值表示偏好(preference),隐式回馈的数值值表示信任(confidence)。基于显示反馈的系统用星星等级让用户表达他们的喜好程度,例如一颗星表示很不喜欢,五颗星表示非常喜欢。基于隐式反馈的数值值描述的是动作的频率,例如用户购买特定商品的次数。一个较大的值并不能表明更多的偏爱。但是这个值是有用的,它描述了在一个特定观察中的信任度。
一个发生一次的事件可能对用户偏爱没有用,但是一个周期性事件更可能反映一个用户的选择。
评价隐式反馈推荐系统需要合适的手段。
2.2 显式反馈模型
潜在因素模型由一个针对协同过滤的交替方法组成,它以一个更加全面的方式发现潜在特征来解释观察的ratings数据。我们关注的模型由奇异值分解(SVD)推演而来。一个典型的模型将每个用户u(包含一个用户-因素向量ui)和每个商品v(包含一个用户-因素向量vj)联系起来。
预测通过内积rij=uTivj来实现。另一个需要关注的地方是参数估计。许多当前的工作都应用到了显式反馈数据集中,这些模型仅仅基于观察到的rating数据直接建模,同时通过一个适当的正则化来避免过拟合。公式如下:
在公式(2.1)中,lambda是正则化的参数。正规化是为了防止过拟合的情况发生,具体参见文献【3】。这样,我们用最小化重构误差来解决协同推荐问题。我们也成功将推荐问题转换为了最优化问题。
2.3 隐式反馈模型
在显式反馈的基础上,我们需要做一些改动得到我们的隐式反馈模型。首先,我们需要形式化由rij变量衡量的信任度的概念。我们引入了一组二元变量pij ,它表示用户u对商品v的偏好。pij的公式如下:
换句话说,如果用户购买了商品,我们认为用户喜欢该商品,否则我们认为用户不喜欢该商品。然而我们的信念(beliefs)与变化的信任(confidence)等级息息相关。首先,很自然的,pij的值为0和低信任有关。用户对一个商品没有得到一个正的偏好可能源于多方面的原因,并不一定是不喜欢该商品。例如,用户可能并不知道该商品的存在。
另外,用户购买一个商品也并不一定是用户喜欢它。因此我们需要一个新的信任等级来显示用户偏爱某个商品。一般情况下,rij越大,越能暗示用户喜欢某个商品。因此,我们引入了一组变量cij,它衡量了我们观察到pij的信任度。cij一个合理的选择如下所示:
按照这种方式,我们存在最小限度的信任度,并且随着我们观察到的正偏向的证据越来越多,信任度也会越来越大。
我们的目的是找到用户向量ui以及商品向量vj来表明用户偏好。这些向量分别是用户因素(特征)向量和商品因素(特征)向量。本质上,这些向量将用户和商品映射到一个公用的隐式因素空间,从而使它们可以直接比较。这和用于显式数据集的矩阵分解技术类似,但是包含两点不一样的地方:
(1)我们需要考虑不同的信任度,(2)最优化需要考虑所有可能的u,v对,而不仅仅是和观察数据相关的u,v对。因此,通过最小化下面的损失函数来计算相关因素(factors)。
2.4 求解最小化损失函数
考虑到损失函数包含m*n个元素,m是用户的数量,n是商品的数量。一般情况下,m*n可以到达几百亿。这么多的元素应该避免使用随机梯度下降法来求解,因此,spark选择使用交替最优化方式求解。
公式(2.1)和公式(2.4)是非凸函数,无法求解最优解。但是,固定公式中的用户-特征向量或者商品-特征向量,公式就会变成二次方程,可以求出全局的极小值。交替最小二乘的计算过程是:交替的重新计算用户-特征向量和商品-特征向量,每一步都保证降低损失函数的值,直到找到极小值。
交替最小二乘法的处理过程如下所示:
3 ALS在spark中的实现
在spark的源代码中,ALS算法实现于org.apache.spark.ml.recommendation.ALS.scala文件中。我们以官方文档中的例子为起点,来分析ALS算法的分布式实现。下面是官方的例子:
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =&
Rating(user.toInt, item.toInt, rate.toDouble)
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)
从代码中我们知道,训练模型用到了ALS.scala文件中的train方法,下面我们将详细介绍train方法的实现。在此之前,我们先了解一下train方法的参数表示的含义。
def train(
ratings: RDD[Rating[ID]],
rank: Int = 10,
numUserBlocks: Int = 10,
numItemBlocks: Int = 10,
maxIter: Int = 10,
regParam: Double = 1.0,
implicitPrefs: Boolean = false,
alpha: Double = 1.0,
nonnegative: Boolean = false,
intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
checkpointInterval: Int = 10,
seed: Long = 0L): MatrixFactorizationModel
以上定义中,ratings指用户提供的训练数据,它包括用户id集、商品id集以及相应的打分集。rank表示隐含因素的数量,也即特征的数量。numUserBlocks和numItemBlocks分别指用户和商品的块数量,即分区数量。maxIter表示迭代次数。regParam表示最小二乘法中lambda值的大小。
implicitPrefs表示我们的训练数据是否是隐式反馈数据。Nonnegative表示求解的最小二乘的值是否是非负,根据Nonnegative的值的不同,spark使用了不同的求解方法。
下面我们分步骤分析train方法的处理流程。
(1) 初始化ALSPartitioner和LocalIndexEncoder。
ALSPartitioner实现了基于hash的分区,它根据用户或者商品id的hash值来进行分区。LocalIndexEncoder对(blockid,localindex)即(分区id,分区内索引)进行编码,并将其转换为一个整数,这个整数在高位存分区ID,在低位存对应分区的索引,在空间上尽量做到了不浪费。
同时也可以根据这个转换的整数分别获得blockid和localindex。这两个对象在后续的代码中会用到。
val userPart = new ALSPartitioner(numUserBlocks)
val itemPart = new ALSPartitioner(numItemBlocks)
val userLocalIndexEncoder = new LocalIndexEncoder(userPart.numPartitions)
val itemLocalIndexEncoder = new LocalIndexEncoder(itemPart.numPartitions)
class HashPartitioner(partitions: Int) extends Partitioner {
def numPartitions: Int = partitions
def getPartition(key: Any): Int = key match {
case null =& 0
case _ =& Utils.nonNegativeMod(key.hashCode, numPartitions)
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =&
h.numPartitions == numPartitions
override def hashCode: Int = numPartitions
private[recommendation] class LocalIndexEncoder(numBlocks: Int) extends Serializable {
private[this] final val numLocalIndexBits =
math.min(java.lang.Integer.numberOfLeadingZeros(numBlocks - 1), 31)
private[this] final val localIndexMask = (1 && numLocalIndexBits) - 1
def encode(blockId: Int, localIndex: Int): Int = {
(blockId && numLocalIndexBits) | localIndex
def blockId(encoded: Int): Int = {
encoded &&& numLocalIndexBits
def localIndex(encoded: Int): Int = {
encoded & localIndexMask
(2) 根据nonnegative参数选择解决矩阵分解的方法。
如果需要解的值为非负,即nonnegative为true,那么用非负最小二乘(NNLS)来解,如果没有这个限制,用乔里斯基(Cholesky)分解来解。
val solver = if (nonnegative) new NNLSSolver else new CholeskySolver
乔里斯基分解分解是把一个对称正定的矩阵表示成一个上三角矩阵U的转置和其本身的乘积的分解。在ml代码中,直接调用封装的dppsv方法实现。
lapack.dppsv(“u”, k, 1, ne.ata, ne.atb, k, info)
可以深入dppsv代码(Fortran代码)了解更深的细节。我们分析的重点是非负正则化最小二乘的实现,因为在某些情况下,方程组的解为负数是没有意义的。虽然方程组可以得到精确解,但却不能取负值解。在这种情况下,其非负最小二乘解比方程的精确解更有意义。`NNLS在最优化模块会作详细讲解。
(3) 将ratings数据转换为分区的格式。
将ratings数据转换为分区的形式,即((用户分区id,商品分区id),分区数据集blocks))的形式,并缓存到内存中。其中分区id的计算是通过ALSPartitioner的getPartitions方法获得的,分区数据集由RatingBlock组成,
它表示(用户分区id,商品分区id )对所对应的用户id集,商品id集,以及打分集,即(用户id集,商品id集,打分集)。
val blockRatings = partitionRatings(ratings, userPart, itemPart)
.persist(intermediateRDDStorageLevel)
val numPartitions = srcPart.numPartitions * dstPart.numPartitions
ratings.mapPartitions { iter =&
val builders = Array.fill(numPartitions)(new RatingBlockBuilder[ID])
iter.flatMap { r =&
val srcBlockId = srcPart.getPartition(r.user)
val dstBlockId = dstPart.getPartition(r.item)
val idx = srcBlockId + srcPart.numPartitions * dstBlockId
val builder = builders(idx)
builder.add(r)
if (builder.size &= 2048) {
builders(idx) = new RatingBlockBuilder
Iterator.single(((srcBlockId, dstBlockId), builder.build()))
Iterator.empty
builders.view.zipWithIndex.filter(_._1.size & 0).map { case (block, idx) =&
val srcBlockId = idx % srcPart.numPartitions
val dstBlockId = idx / srcPart.numPartitions
((srcBlockId, dstBlockId), block.build())
}.groupByKey().mapValues { blocks =&
val builder = new RatingBlockBuilder[ID]
blocks.foreach(builder.merge)
builder.build()
}.setName("ratingBlocks")
(4)获取inblocks和outblocks数据。
获取inblocks和outblocks数据是数据处理的重点。我们知道,通信复杂度是分布式实现一个算法时要重点考虑的问题,不同的实现可能会对性能产生很大的影响。我们假设最坏的情况:即求解商品需要的所有用户特征都需要从其它节点获得。
如下图3.1所示,求解v1需要获得u1,u2,求解v2需要获得u1,u2,u3等,在这种假设下,每步迭代所需的交换数据量是O(m*rank),其中m表示所有观察到的打分集大小,rank表示特征数量。
从图3.1中,我们知道,如果计算v1和v2是在同一个分区上进行的,那么我们只需要把u1和u2一次发给这个分区就好了,而不需要将u2分别发给v1,v2,这样就省掉了不必要的数据传输。
图3.2描述了如何在分区的情况下通过U来求解V,注意节点之间的数据交换量减少了。使用这种分区结构,我们需要在原始打分数据的基础上额外保存一些信息。
在Q1中,我们需要知道和v1相关联的用户向量及其对应的打分,从而构建最小二乘问题并求解。这部分数据不仅包含原始打分数据,还包含从每个用户分区收到的向量排序信息,在代码里称作InBlock。在P1中,我们要知道把u1,u2 发给Q1。我们可以查看和u1相关联的所有产品来确定需要把u1发给谁,但每次迭代都扫一遍数据很不划算,所以在spark的实现中只计算一次这个信息,然后把结果通过RDD缓存起来重复使用。这部分数据我们在代码里称作OutBlock。
所以从U求解V,我们需要通过用户的OutBlock信息把用户向量发给商品分区,然后通过商品的InBlock信息构建最小二乘问题并求解。从V求解U,我们需要商品的OutBlock信息和用户的InBlock信息。所有的InBlock和OutBlock信息在迭代过程中都通过RDD缓存。打分数据在用户的InBlock和商品的InBlock各存了一份,但分区方式不同。这么做可以避免在迭代过程中原始数据的交换。
下面介绍获取InBlock和OutBlock的方法。下面的代码用来分别获取用户和商品的InBlock和OutBlock。
val (userInBlocks, userOutBlocks) =makeBlocks("user", blockRatings,
userPart, itemPart,intermediateRDDStorageLevel)
val swappedBlockRatings = blockRatings.map {
case ((userBlockId, itemBlockId), RatingBlock(userIds, itemIds, localRatings)) =&
((itemBlockId, userBlockId), RatingBlock(itemIds, userIds, localRatings))
val (itemInBlocks, itemOutBlocks) =makeBlocks("item", swappedBlockRatings,
itemPart, userPart,intermediateRDDStorageLevel)
我们会以求商品的InBlock以及用户的OutBlock为例来分析makeBlocks方法。因为在第(5)步中构建最小二乘的讲解中,我们会用到这两部分数据。
下面的代码用来求商品的InBlock信息。
val inBlocks = ratingBlocks.map {
case ((srcBlockId, dstBlockId), RatingBlock(srcIds, dstIds, ratings)) =&
val start = System.nanoTime()
val dstIdSet = new OpenHashSet[ID](1 && 20)
dstIds.foreach(dstIdSet.add)
val sortedDstIds = new Array[ID](dstIdSet.size)
var pos = dstIdSet.nextPos(0)
while (pos != -1) {
sortedDstIds(i) = dstIdSet.getValue(pos)
pos = dstIdSet.nextPos(pos + 1)
Sorting.quickSort(sortedDstIds)
val dstIdToLocalIndex = new OpenHashMap[ID, Int](sortedDstIds.length)
while (i & sortedDstIds.length) {
dstIdToLocalIndex.update(sortedDstIds(i), i)
val dstLocalIndices = dstIds.map(dstIdToLocalIndex.apply)
(srcBlockId, (dstBlockId, srcIds, dstLocalIndices, ratings))
}.groupByKey(new ALSPartitioner(srcPart.numPartitions))
.mapValues { iter =&
val builder =
new UncompressedInBlockBuilder[ID](new LocalIndexEncoder(dstPart.numPartitions))
iter.foreach { case (dstBlockId, srcIds, dstLocalIndices, ratings) =&
builder.add(dstBlockId, srcIds, dstLocalIndices, ratings)
builder.build().compress()
}.setName(prefix + "InBlocks")
.persist(storageLevel)
这段代码首先对ratingBlocks数据集作map操作,将ratingBlocks转换成(商品分区id,(用户分区id,商品集合,用户id在分区中相对应的位置,打分)这样的集合形式。然后对这个数据集作groupByKey操作,以商品分区id为key值,处理key对应的值,将数据集转换成(商品分区id,InBlocks)的形式。
这里值得我们去分析的是输入块(InBlock)的结构。为简单起见,我们用图3.2为例来说明输入块的结构。
以Q1为例,我们需要知道关于v1和v2的所有打分:(v1, u1, r11),(v2, u1, r12), (v1, u2, r21), (v2, u2, r22), (v2, u3, r32),把这些项以Tuple的形式存储会存在问题,第一,Tuple有额外开销,每个Tuple实例都需要一个指针,而每个Tuple所存的数据不过是两个ID和一个打分;
第二,存储大量的Tuple会降低垃圾回收的效率。所以spark实现中,是使用三个数组来存储打分的,如([v1, v2, v1, v2, v2], [u1, u1, u2, u2, u3], [r11, r12, r21, r22, r32])。这样不仅大幅减少了实例数量,还有效地利用了连续内存。
但是,光这么做并不够,spark代码实现中,并没有存储用户的真实id,而是存储的使用LocalIndexEncoder生成的编码,这样节省了空间,格式为UncompressedInBlock:(商品id集,用户id集对应的编码集,打分集),
如,([v1, v2, v1, v2, v2], [ui1, ui1, ui2, ui2, ui3], [r11, r12, r21, r22, r32])。这种结构仍旧有压缩的空间,spark调用compress方法将商品id进行排序(排序有两个好处,除了压缩以外,后文构建最小二乘也会因此受益),
并且转换为(不重复的有序的商品id集,商品位置偏移集,用户id集对应的编码集,打分集)的形式,以获得更优的存储效率(代码中就是将矩阵的coo格式转换为csc格式,你可以更进一步了解矩阵存储,以获得更多信息)。
以这样的格式修改([v1, v2, v1, v2, v2], [ui1, ui1, ui2, ui2, ui3], [r11, r12, r21, r22, r32]),得到的结果是([v1, v2], [0, 2, 5], [ui1, ui2, ui1, ui2, ui3], [r11, r21, r12, r22, r32])。其中[0, 2]指v1对应的打分的区间是[0, 2],[2, 5]指v2对应的打分的区间是[2, 5]。
Compress方法利用spark内置的Timsort算法将UncompressedInBlock进行排序并转换为InBlock。代码如下所示:
def compress(): InBlock[ID] = {
val sz = length
val uniqueSrcIdsBuilder = mutable.ArrayBuilder.make[ID]
val dstCountsBuilder = mutable.ArrayBuilder.make[Int]
var preSrcId = srcIds(0)
uniqueSrcIdsBuilder += preSrcId
var curCount = 1
while (i & sz) {
val srcId = srcIds(i)
if (srcId != preSrcId) {
uniqueSrcIdsBuilder += srcId
dstCountsBuilder += curCount
preSrcId = srcId
curCount = 0
curCount += 1
dstCountsBuilder += curCount
val uniqueSrcIds = uniqueSrcIdsBuilder.result()
val numUniqueSrdIds = uniqueSrcIds.length
val dstCounts = dstCountsBuilder.result()
val dstPtrs = new Array[Int](numUniqueSrdIds + 1)
var sum = 0
while (i & numUniqueSrdIds) {
sum += dstCounts(i)
dstPtrs(i) = sum
InBlock(uniqueSrcIds, dstPtrs, dstEncodedIndices, ratings)
private def sort(): Unit = {
val sz = length
val sortId = Utils.random.nextInt()
val sorter = new Sorter(new UncompressedInBlockSort[ID])
sorter.sort(this, 0, length, Ordering[KeyWrapper[ID]])
下面的代码用来求用户的OutBlock信息。
val outBlocks = inBlocks.mapValues { case InBlock(srcIds, dstPtrs, dstEncodedIndices, _) =&
val encoder = new LocalIndexEncoder(dstPart.numPartitions)
val activeIds = Array.fill(dstPart.numPartitions)(mutable.ArrayBuilder.make[Int])
val seen = new Array[Boolean](dstPart.numPartitions)
while (i & srcIds.length) {
var j = dstPtrs(i)
ju.Arrays.fill(seen, false)
while (j & dstPtrs(i + 1)) {
val dstBlockId = encoder.blockId(dstEncodedIndices(j))
if (!seen(dstBlockId)) {
activeIds(dstBlockId) += i
seen(dstBlockId) = true
activeIds.map { x =&
x.result()
}.setName(prefix + "OutBlocks")
.persist(storageLevel)
这段代码中,inBlocks表示用户的输入分区块,格式为(用户分区id,(不重复的用户id集,用户位置偏移集,商品id集对应的编码集,打分集))。
activeIds表示商品分区中涉及的用户id集,也即上文所说的需要发送给确定的商品分区的用户信息。activeIds是一个二维数组,第一维表示分区,第二维表示用户id集。用户OutBlocks的最终格式是(用户分区id,OutBlocks)。
通过用户的OutBlock把用户信息发给商品分区,然后结合商品的InBlock信息构建最小二乘问题,我们就可以借此解得商品的极小解。反之,通过商品OutBlock把商品信息发送给用户分区,然后结合用户的InBlock信息构建最小二乘问题,我们就可以解得用户解。
第(6)步会详细介绍如何构建最小二乘。
(5)初始化用户特征矩阵和商品特征矩阵。
交换最小二乘算法是分别固定用户特征矩阵和商品特征矩阵来交替计算下一次迭代的商品特征矩阵和用户特征矩阵。通过下面的代码初始化第一次迭代的特征矩阵。
var userFactors = initialize(userInBlocks, rank, seedGen.nextLong())
var itemFactors = initialize(itemInBlocks, rank, seedGen.nextLong())
初始化后的userFactors的格式是(用户分区id,用户特征矩阵factors),其中factors是一个二维数组,第一维的长度是用户数,第二维的长度是rank数。初始化的值是异或随机数的F范式。itemFactors的初始化与此类似。
(6)利用inblock和outblock信息构建最小二乘。
构建最小二乘的方法是在computeFactors方法中实现的。我们以商品inblock信息结合用户outblock信息构建最小二乘为例来说明这个过程。代码首先用用户outblock与userFactor进行join操作,然后以商品分区id为key进行分组。
每一个商品分区包含一组所需的用户分区及其对应的用户factor信息,格式即(用户分区id集,用户分区对应的factor集)。紧接着,用商品inblock信息与merged进行join操作,得到商品分区所需要的所有信息,即(商品inblock,(用户分区id集,用户分区对应的factor集))。
有了这些信息,构建最小二乘的数据就齐全了。详细代码如下:
val srcOut = srcOutBlocks.join(srcFactorBlocks).flatMap {
case (srcBlockId, (srcOutBlock, srcFactors)) =&
srcOutBlock.view.zipWithIndex.map { case (activeIndices, dstBlockId) =&
(dstBlockId, (srcBlockId, activeIndices.map(idx =& srcFactors(idx))))
val merged = srcOut.groupByKey(new ALSPartitioner(dstInBlocks.partitions.length))
dstInBlocks.join(merged)
我们知道求解商品值时,我们需要通过所有和商品关联的用户向量信息来构建最小二乘问题。这里有两个选择,第一是扫一遍InBlock信息,同时对所有的产品构建对应的最小二乘问题;
第二是对于每一个产品,扫描InBlock信息,构建并求解其对应的最小二乘问题。第一种方式复杂度较高,具体的复杂度计算在此不作推导。spark选取第二种方法求解最小二乘问题,同时也做了一些优化。
做优化的原因是二种方法针对每个商品,都会扫描一遍InBlock信息,这会浪费较多时间,为此,将InBlock按照商品id进行排序(前文已经提到过),我们通过一次扫描就可以创建所有的最小二乘问题并求解。
构建代码如下所示:
while (j & dstIds.length) {
ls.reset()
var i = srcPtrs(j)
var numExplicits = 0
while (i & srcPtrs(j + 1)) {
val encoded = srcEncodedIndices(i)
val blockId = srcEncoder.blockId(encoded)
val localIndex = srcEncoder.localIndex(encoded)
val srcFactor = sortedSrcFactors(blockId)(localIndex)
val rating = ratings(i)
ls.add(srcFactor, rating)
numExplicits += 1
dstFactors(j) = solver.solve(ls, numExplicits * regParam)
到了这一步,构建显式反馈算法的最小二乘就结束了。隐式反馈算法的实现与此类似,不同的地方是它将YtY这个值预先计算了(可以参考文献【1】了解更多信息),而不用在每次迭代中都计算一遍。代码如下:
val YtY = if (implicitPrefs) Some(computeYtY(srcFactorBlocks, rank)) else None
if (implicitPrefs) {
ls.merge(YtY.get)
if (implicitPrefs) {
val c1 = alpha * math.abs(rating)
if (rating & 0) {
numExplicits += 1
ls.add(srcFactor, (c1 + 1.0) / c1, c1)
后面的问题就如何求解最小二乘了。
&&相关文章推荐
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:963759次
积分:12769
积分:12769
排名:第1122名
原创:330篇
转载:66篇
译文:15篇
评论:411条
文章:12篇
阅读:33462
文章:27篇
阅读:39367
阅读:10723
阅读:2052
阅读:4789
阅读:7377
文章:53篇
阅读:134434
阅读:12985
文章:55篇
阅读:160236
阅读:12698
文章:20篇
阅读:112582
文章:103篇
阅读:172830
文章:11篇
阅读:15469
文章:19篇
阅读:76477}

我要回帖

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信