大家好,好久没没更新Spark类容了,主要是最近考试比较多。今天先给大家展现1个实战案例,这个案例是我在今年参加第1届高校云计算利用
创新大赛时技能赛第4题――莎士比亚文集词频统计并行化算加法。PS:感谢师兄辉哥,真大神!
题目是这样的(这里截图展现出来):
是要好优化还是需要花费1点心思。在这里停词表的作用是对在该表中的单词不予以统计,1般而言停词表中的单词是出现频率比较高的单词(如the)。这个案例比较简单,但
有的人的思路多是这样的:先对莎士比亚文集进行wordcount操作统计出各个单词的出现频率,然后对wordcount中的结果过滤掉在停词表
中出现的单词,最后找到出现频率最高的100个便可。这类方式可行,但效力略低。大家知道wordcount包括shuffle操作,shuffle所带来的IO是spark
性能的瓶颈。我们在写程序的时候应当尽量的较少shuffle IO,那末如何减少shuffle IO呢,在这里我们可以尽可能减少要参与shuffle操作的数据。
所以,优化的思路是对莎士比亚文集进行单词分片后就进行过滤操作,过滤掉在停词表中得单词,然落后行wordcount操作。这样1来我们可以
过滤掉大量出现频率很高的辞汇从而减少了主要shuffle IO。可能有的同学会问那这里的filter操作岂不是比上面的思路中filter操作需要处理的单词书更
多,确切是这样。但是对性能没有任何影响,为何这么说?大家知道spark的1个良好的特点就是它的pipeline(血统),我们的处理在每个shuffle
操作之 前都会算作1个同1个stage,在这个satge中的计算都是在最后的action时才进行的,血统就是具有这1良好特性。那末对每个partiton上的
文本进行单词切割落后行filter操作是否是具有pipeline的特性?是否是这两个操作就像血液1样瞬间流过你的血管中的两个细胞?是否是几近是同时发
生?是否是没有任何性能影响?
另外,我们还可以将范围较小的停词表放在1个hash表中,hash查找的效力几近为单位时间(大家1定要多关注hash的原理,头几天百度面试包
含了很多hash类容)。
说了这么多,下面贴出源码:
- import org.apache.spark.SparkContext._
- import org.apache.spark.SparkConf
- import org.apache.spark._
- object Shakespear {
- def main(args: Array[String]) {
- if (args.length != 3) {
- println("USage:<Shakespear> <Stopwords> <Out>")
- }
- //initial SparkConf and SparkContext
- val sc = new SparkContext()
- //To get Shakespear'paper
- val papers = sc.textFile(args(0))
- //To get stopwords
- val stopWords = sc.textFile(args(1)).map(_.trim).collect().toSet + ""
- //To parse papers into words and find the words statisfy the requirement
- val words = papers.flatMap(_.split("[^a-zA-Z]")).map(_.toLowerCase).filter(!stopWords(_)).map((_,1)).
- reduceByKey(_ + _).map(line=>(line._2, line._1)).top(100).map(line=>(line._2, line._1))
- val result = sc.parallelize(words)
- //To write the result into hdfs
- result.saveAsTextFile(args(2))
- }
- }
在后面我会提供包括技能赛第3题和其他的案例详解。希望大家共同学习讨论。(by老杨,转载请注明出处)
上一篇 scala简要:包