MapReduce提供了以下的主要功能:
1)数据划分和计算任务调度:
系统自动将一个作业(Job)待处理的大数据划分为很多个数据块,每个数据块对应于一个计算任务(Task),并自动 调度计算节点来处理相应的数据块。作业和任务调度功能主要负责分配和调度计算节点(Map节点或Reduce节点),同时负责监控这些节点的执行状态,并 负责Map节点执行的同步控制。
2)数据/代码互定位:
为了减少数据通信,一个基本原则是本地化数据处理,即一个计算节点尽可能处理其本地磁盘上所分布存储的数据,这实现了代码向 数据的迁移;当无法进行这种本地化数据处理时,再寻找其他可用节点并将数据从网络上传送给该节点(数据向代码迁移),但将尽可能从数据所在的本地机架上寻 找可用节点以减少通信延迟。
3)系统优化:
为了减少数据通信开销,中间结果数据进入Reduce节点前会进行一定的合并处理;一个Reduce节点所处理的数据可能会来自多个 Map节点,为了避免Reduce计算阶段发生数据相关性,Map节点输出的中间结果需使用一定的策略进行适当的划分处理,保证相关性数据发送到同一个 Reduce节点;此外,系统还进行一些计算性能优化处理,如对最慢的计算任务采用多备份执行、选最快完成者作为结果。
4)出错检测和恢复:
以低端商用服务器构成的大规模MapReduce计算集群中,节点硬件(主机、磁盘、内存等)出错和软件出错是常态,因此 MapReduce需要能检测并隔离出错节点,并调度分配新的节点接管出错节点的计算任务。同时,系统还将维护数据存储的可靠性,用多备份冗余存储机制提 高数据存储的可靠性,并能及时检测和恢复出错的数据。
Hadoop开发要点
1.Hadoop将运行一次Map/Reduce作业叫做运行一个Job
2.Hadoop需要计算的源数据都存储在HDFS中。
3. Map阶段计算结果存储在本地文件系统中。
4.Hadoop最终计算的结果也存储在HDFS中。
5. Map/Reduce框架的运作完全基于<key,value>对,即数据的输入是一批<key,value>对,生成的结果也是一批<key,value>对。
6.默认情况下,Key与value之间用\t分隔。
Hadoop开发方式:
1.基于java的开发
--得天独厚的程序语言优势
准备计算用的源数据
将源数据上传到HDFS中
分别编写map/reduce脚本
编写运行脚本
开始运行
查看结果
A:终端信息反馈
B:http://JobTrackerAdress:50030/
获取结果
2.Streaming开发
--不限制语言的优势:允许用户创建和运行任何可执行程序
•特点:
hadoop streaming使用标准输入将数据传递给map/red程序, map/red程序使用标准输出将数据返回给hadoop;
需要将问题转化为(key,value)对的形式,并将问题划分为一个或多个(map,red)阶段,默认情况下,key与value之间是\t分隔的。
具体过程如下:
Streaming开发实例:
数据格式:
2011-05-01T00:00:16.884085+08:00 IZP-HN-ZSQ-4 [info] <adc_packet_0>[20624]:|rpt_cad|73750076302860878|800208978|125.34.145.21|99990011|4db0e4cf|frame|2011-04-30 23:55:14|0|0|0|1|default|0|0|dota1.luren.cc|dota1.luren.cc/|
(其中,第7个字段为每个广告的id值)
任务说明:获取每个广告的pv
本地数据:/home/gaohui/data_view/ad/ad_day_pv/rpt_cad.log
Hadoop数据:将本地数据放置到hadoop集群上,hadoop命令:hadoop dfs –put /home/gaohui/data_view/ad/ad_day_pv/rpt_cad.log /data/test_in/
文件名:cat ad_day_pv_map.py(存储内容)
#!/usr/bin/python import sys for eachLine in sys.stdin: #从标准输入获取数据 eachLine = eachLine.strip() Seg = eachLine.split(‘|’) #用\t来分割开各个字段 if len(Seg) == 19 and Seg[1] == 'rpt_cad' and Seg[6] != '': ad_id = Seg[6] print ‘%s\t%d’ % (ad_id, 1) #将map结果送到标准输出 else: pass
文件名:ad_day_pv_red.py(reduce操作)
#!/usr/bin/python LastLine = '' LastCount = 0 import sys for eachLine in sys.stdin: #从标准输入获取数据 eachLine = eachLine.strip() Seg = eachLine.split(‘\t’) #用\t来分割开key和value if len(Seg) == 2 and Seg[0] != '': ad_id = Seg[0] if ad_id == LastLine: LastCount = LastCount + 1 #对于同一个key的值进行累加操作 else: if LastLine != '': print ‘%s\t%d’ % (LastLine, LastCount) #将结果送到标准输出 LastLine = ad_id LastCount = 1 else: pass print '%s\t%d' % (LastLine, LastCount)
1.程序本身的健壮性
2.注意-mapper和-reducer的路径要用绝对路径
3.map/reduce脚本要有可执行属性。
4.map/reduce脚本的简单语法层面的调试:
cat input.txt|./map.py|sort|./red.py
5. map/reduce脚本的线上小部分数据进行简单测试。
6.关注hadoop的Job输出结果和Job运行界面。
7.保存Job输出结果的完美解决方案(nohup 命令字符串 &)。
8. mapper或reducer程序在遇到从标准输入读到EOF,读标准输入异常,pipe broken异常,写标准输出异常,写标准错误异常时,应该主动退出。
9. 处理中文,源输入文件必须是utf-8格式(平台硬编码问题)。