随着近年来视频拍摄设备与视频处理技术的高速发展,对网络上海量视频的分析越来越受到关注与重视。本实验希望通过使用 Hadoop 实验数据集 —— Dataset for "Statistics and Social Network of YouTube Videos" 进行简明的分析实验,从而加深对大数据分析的体会与认识、对 Hadoop、MapReduce 等的理解与应用。  


本实验将在虚拟机上 (CentOS 7.5 64bits),基于 Hadoop 2.7.5 (64bits),使用 MapReduce 实现对部分 YouTube 视频数据集的分析,以统计出某一时期内 YouTube 用户已上传视频类型的总数量,并对可视化后的结果进行简要的比较与分析。


三、实验过程

3.1 数据集预处理与上传

本实验所用的 YouTube 数据集下载自 School of Computing Science Simon Fraser University。数据集中,每行都代表一条用户上传视频数据,每行中各字段由 Tab 字符 ('\t') 分隔开,各字段含义按顺序如下表所示:

部分数据集如下所示。其中,每行数据从左往右依次表示:video ID、uploader、age、category、length、views、rate、ratings、comments、related IDs。而本实验所关心的、将使用到的是 category 这项 (蓝框)。

 实验所用的数据集采样自2007年2月22至5月18日中的35天的YouTube用户上传视频信息。由于下载得到的数据集共有35个压缩包(解压后是4-5个txt文本),对应35天的采样数据,为此,我们需要先在本地主机上合并它们。合并方法有很多种,此处将主要描述方法一,概述方法二。

3.1.1 合并数据集——方法一

先把解压后的所有txt数据文本放置在桌面的DataforUpload文件夹中,然后打开命令行程序,按路径 cd 进入DataforUpload文件夹,执行 copy *.* all.txt 对文件夹下所有数据文本执行复制合并的批处理操作,生成汇总数据集文件 all.txt 共计895.5MB,含数据约一百万条以上,如下所示:

然后连接VPN,登陆虚拟机,运行Hadoop进程,再使用upload程序(见附录)将本地的数据集all.txt上传至HDFS的 "/hadoop2/videoinput2/" 路径下。通过 HDFS Web UI 验证文件上传成功,如下所示:

3.1.2 合并数据集方法二

可以先将本地的含有所有分散文本数据的 DataforUpload 文件夹上传至 HDFS (的hadoop2/inputVideo1/ 路径下),再使用hdfs dfs -getmerge <源路径 目的路径 >   命令将 HDFS 中 DataforUpload 文件夹下所有数据文本合并下载到虚拟机系统本地 (的 /opt/bigdata/hadoop/text/ 路径下) 并命名为 temp。然后cd进入temp所在目录,使用命令du -sh * 查看目录下所有的目录和文件大小,可见869M的 temp,如下所示:

最后,使用命令 hdfs dfs -put<源路径 目的路径 >  上传虚拟机本地文件 —— 汇总数据集 temp 到 HDFS (的hadoop2/inputvideo2/  路径下) 即可。

        通过 HDFS Web UI 验证文件上传成功,如下所示:

3.2 程序建立与数据处理

由3.1节已知数据集的格式及其含义,为此,我们先设计Mapper类的代码实现。

简单来说,一个map函数就是对一些独立元素组成的概念上的列表 (词频统计中每行数据形成的列表) 的每个或特定元素进行指定的操作 (把每行数据拆分成不同的单词并对特定单词计数为1) 。在这里,Hadoop首先从输入数据集文件逐行读取数据,然后对每行调用一次map函数。随后,每个map函数都会解析该行,并将接收到的每一行中category提取出来作为输出的key,而value都设定为1,表示“出现次数为1”。每行提取出的 <key,value> = <category,1> 都会写入context作为中间结果。

以下是 JAVA 代码实现:

    // 定义一个Map类继承Mapper父类  // 定义输入key为每一行的起始偏移量(LongWritable),输入value为每行文本内容(Text)public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {  // 构造Map输出的Text类型key、IntWritable类型value=1对象  private Text category = new Text();  private final static IntWritable one = new IntWritable(1);  public void map(LongWritable key, Text value, Context context)  throws IOException, InterruptedException {  // 将每行数据转为字符串后,以“\ t”作为分隔符来分割每行字符串  // 然后将分割后的各元素存储在字符串数组中  String[] line = value.toString().split(“\t”);  // 只记录有意义的数据  if (line.length > 6) {  // 滤除数据集中不符合要求的信息category.set(line[3]);  // 指定视频类型(category)为Map输出的key  }  // 将Map输出<key, value>=<category,one>写入context (即Reduce)  context.write(category, one);  }  }  

Hadoop获取map函数输出的 <key,value> 对,然后在 shuffle 阶段根据 key 进行排序(字母顺序),然后输入 Reduce,对相同的 key(category) 进行计数,并根据 key(category) 再次以 <key,value> 的形式输出,写入 context(HDFS) 中。

以下是具体实现:

    // 定义一个Reduce类继承Reducer父类  // Reduce类的输入与Mapper类的输出<key,value>对类型必须一致 public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> { // Reduce接收Map传入的 (被写入context的) <key,value>对  // 输入Reduce前,Shuffle阶段已经根据key的字母进行排序了  public void reduce(Text key, Iterable<IntWritable> values, Context context)  throws IOException, InterruptedException {  int counter = 0;  // 初始化counter  // 遍历key(category)相同的values  for (IntWritable val : values) {  counter += val.get(); }  // 累计同一category的数量  // 将统计结果写入context (即HDFS)  context.write(key, new IntWritable(counter));  }  }  

3.3 处理过程理解与示意

MapReduce 处理数据流程 <key,value> 对变化示意图如下所示(以4条数据为例):


四、结果分析

4.1 运行日志分析

上图为运行日志的一部分,如图可知,map 运行7次共计用时107s,平均每次15.3s;reduce 运行用时13s。这验证了895.5M 的数据集被分成大小为 128M 的7个 Block,对应7个 Split,每个 Split 对应1个 map task,所以一共运行了7次 map。

注意,Block 意为数据块,Split 意为分片,两者数目相同,而后者是 MapReduce 中 map task 的最小输入单位。作为Mapper 的输入,分片是很重要的环节,它主要是将 HDFS 上数据块进行map计算之前,根据作业的 InputFormat 把输入数据集重新进行逻辑上的划分,再将划分得到的每个逻辑 InputSplit 实例分别分发给每个 Mapper。

这里 HDFS 的数据块为128M。若某文件大于128M,则会将该“大文件”分解为若干个数据块;若某文件小于128M,则按它的实际大小组块存储。

这也有助于理解 map task 分片大小(Split Size) 默认和 HDFS 的数据块(Block Size)大小一致的原因。一方面,由 map task 数量 = 输入文件总大小(Total Size) / 分片尺寸(Split Size),可知分片尺寸越大,map task 数量就越少,从而系统管理与处理分片的开销就越小。另一方面,若分片尺寸太大,以至于一个分片要跨越多个 HDFS 数据块,则一个 map task 就需要经由多个块的网络传输。所以设置分片尺寸的上限为 HDFS 数据块大小,将有助于减少网络开销。综上所述,令分片尺寸等于数据块大小最合适。

 

4.2 实验结果分析

        通过 HDFS shell 查看实验结果,如下所示:

使用Excel对实验结果进行排序并可视化,得到用户上传数据类型统计图,如下所示:

上图表明:YouTube用户在2007年2月22至5月18日,对音乐(Music)的兴趣相对特别高,对娱乐(Entertainment)的兴趣次高,对喜剧(Comedy) 的兴趣稍低些,然后依次对运动(Sports)、电影&动画(Film&Animals)、人物&博客(People&Blogs) 等较感兴趣,对其他类别则兴趣相对较低。

总体而言,数据从视频类型上传量这个角度反映出该时间段的YouTube视频用户以休闲娱乐为主要偏好或目的,对新闻政治(News & Politics)的兴趣介于中等水平,而对出行工具、DIY、动物、旅行其他内容等兴趣相对较低。当然,若要更为切实客观地分析用户偏好,还需要综合考虑浏览量、评分、评论等多重因素。


一是大数据的数据量大,获取不易,特别是适时的、核心的、有意义的数据。我本来想找诸如近期的淘宝用户数据、知名网站访问IP数据等看起来有意义且时效性强的数据,但是找了很久却一无所获。最后有人找到了这个经典的 YouTube 数据集,正好是专门用来进行大数据实验的,就只好用它了,唯一缺点就是时效性不足。

二是大数据虽然数据量大,但是有价值、有意义的比例并不很高,而且需要预处理。YouTube 数据集就是一个典型的、显式的例子,我们所关心的内容仅仅占总数据集的一小部分。当然,更深入的还有分析潜在的语义相关性、时间相关性等,对应也需要更强大的数据挖掘分析技术等。

三是 HDFS 适合处理海量大数据,不适合处理海量小数据,因为时间、内存、网络开销等“不划算”。至于原因前面也有提过,总得来说就是小数据并没有因为内存小而各种减少开销。


参考文献

https://blog.csdn.net/itbiggod/article/details/80901357

https://blog.csdn.net/oxuzhenyi/article/details/77389144


附录

A. 上传本地数据集至 HDFS

package Upload;  
import java.io.BufferedReader;  
import java.io.InputStreamReader;  
import java.sql.Date;  
import java.util.Properties;  
import org.apache.hadoop.conf.Configuration;  
import org.apache.hadoop.fs.BlockLocation;  
import org.apache.hadoop.fs.FSDataInputStream;  
import org.apache.hadoop.fs.FSDataOutputStream;  
import org.apache.hadoop.fs.FileStatus;  
import org.apache.hadoop.fs.FileSystem;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.hdfs.DistributedFileSystem;  
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;  public class Upload {  public static void main(String[] args) throws Exception {  Properties properties = System.getProperties();  properties.setProperty("HADOOP_USER_NAME", "bduser");  // 获取环境变量  Configuration conf = new Configuration();    // 设置环境参数  conf.set("fs.defaultFS", "hdfs://10.40.2.147:9000");   conf.set("fs.hdfs.impl","org.apache.hadoop.hdfs.DistributedFileSystem");  // 创建文件系统实例对象 hdfs  FileSystem hdfs = FileSystem.get(conf);     // 调用Upload函数上传本地文件至HDFS  Upload(hdfs);    
}  public static void Upload(FileSystem hdfs) {  // 定义本地路径  String localdir="C:\\Users\\lenovo\\Desktop\\DataforUpload";    // 定义HDFS路径  String hdfsdir="/hadoop2/videoinput2/";    try {  // 实例化路径对象  Path localpath = new Path(localdir);  Path hdfspath = new Path(hdfsdir);  // 上传文件  hdfs.copyFromLocalFile(localpath, hdfspath);  }   // 当无法上传本地文件至HDFS时捕获异常  catch(Exception e) {    e.printStackTrace();  }  
}  
}  

B. 对数据集进行关键词频统计

package VideoTypeCount;  
import org.apache.hadoop.conf.*;  
import org.apache.hadoop.fs.Path;  
import org.apache.hadoop.io.*;  
import org.apache.hadoop.mapreduce.*;  
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;  
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;  
import java.io.IOException;  
import java.util.Properties;  public class VideoTypeCount {  // 定义一个Map类继承Mapper父类  public static class Map extends Mapper<LongWritable, Text, Text, IntWritable> {  // 构造Map输出的Text类型key、IntWritable类型value=1对象  private Text category = new Text();  private final static IntWritable one = new IntWritable(1);  public void map(LongWritable key, Text value, Context context)  throws IOException, InterruptedException {  // 将每行数据转为字符串后,以“\ t”作为分隔符来分割每行字符串  // 然后将分割后的各元素存储在字符串数组中  String[] line = value.toString().split(“\t”);  // 只记录有意义的数据  if (line.length > 7) {  // 滤除数据集中不符合要求的log信息category.set(line[3]);  // 指定视频类型(category)为Map输出的key  }  // 将Map输出<key, value>=<category,one>写入context (即Reduce)  context.write(category, one);  }  }  // 定义一个Reduce类继承Reducer父类  public static class Reduce extends Reducer<Text, IntWritable, Text, IntWritable> {  // Reduce接收Map传入的 (被写入context的) <key,value>对  // 输入Reduce前,Shuffle阶段已经根据key的字母进行排序了  public void reduce(Text key, Iterable<IntWritable> values, Context context)  throws IOException, InterruptedException {  int counter = 0;  // 初始化counter  // 遍历key(category)相同的values  for (IntWritable val : values) {  counter += val.get();  // 累计同一category的数量  }  // 将统计结果写入context (即HDFS)  context.write(key, new IntWritable(counter));  }  }  
}  // 主函数public static void main(String[] args) throws Exception {  Properties properties = System.getProperties();  properties.setProperty("HADOOP_USER_NAME", "bduser");  // 获取并配置环境变量  Configuration conf = new Configuration();  conf.set("fs.defaultFS", "hdfs://10.40.2.147:9000");  // 实例化Job,设置一个用户定义的job名称  Job job = Job.getInstance(conf, "categories");  // 指定导出的Jar包路径    // 设置生产Jar包所使用的类  job.setJarByClass(VideoTypeCount.class);  // 设置Map类输出Key/Value类  job.setMapOutputKeyClass(Text.class);  job.setMapOutputValueClass(IntWritable.class);  // 设置Reduce类输出Key/Value类     job.setOutputKeyClass(Text.class);  job.setOutputValueClass(IntWritable.class);  // 设置Mapper/Reducer类  job.setMapperClass(Map.class);  job.setReducerClass(Reduce.class);  // 指定格式化用的类型  job.setInputFormatClass(TextInputFormat.class);  job.setOutputFormatClass(TextOutputFormat.class);  // 指定要分析的数据集路径  FileInputFormat.addInputPath(job, new Path("/hadoop2/videoinput2")); // 指定分析结果文件输出路径  FileOutputFormat.setOutputPath(job, new Path("/hadoop2/output2"));  // 设定job完成的输出信息  boolean flag = job.waitForCompletion(true);  // 运行job  System.out.println("统计结束"+flag);  System.exit(flag ? 0 : 1);  System.out.println();  }