启动spark shell
spark-shell # 默认按本地单线程模式启动
或者
spark-shell --master local[2] # 指定线程数
Spark小试牛刀:一个简单的词频统计例子
val rdd = sc.textFile("testfile").flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b).collectAsMap()
RDD基本操作
创建RDD
如果spark中运行在分布式部署的hadoop上,则读入hdfs的当前用户目录下的文件。如果spark独立运行,则读取本地文件当前目录下的文件,
使用sc.textFile生产RDD时,并没有真正读取文件,只有执行某个action时才会读取文件。所以,定义RDD时即使sc.textFile中给一个不存在的文件也不会报错,只有当运行action时才会报找不到文件。
val rdd=sc.textFile("README.md") //从文件创建一个RDD
val rdd=sc.parallerlize(List(1,2,4,5,6)) //从Scala数组或列表转换成RDD
rdd.collect()
# 查看RDD内容。
rdd.persist()
# 将RDD持久化,避免每次执行action时都重新生成
对RDD执行action操作
rdd.count()
# 返回RDD中item的数目,对于用sc.textFile()
创建的RDD就是文本文件的行数
rdd.first()
#返回RDD的第一个item,对于文本文件就是第一行.
rdd.take(2)
#返回RDD的前2个item
rdd.saveAsTextFile("...")
#保持到本地或hdfs等文件系统。
通过一个transformation返回一个新的RDD
为了实现filter,无外乎告诉哪些item保留,哪些不保留,也即要提供一个item留或者不留的函数。
filter方法接受一个函数(其参数为RDD中item的类型,返回类型为boolean)作为参数。
line => line.contains("Spark")
定义了一个匿名函数,其参数为line,后面的line.contains("Spark")为函数体,其调用了String的contains方法,返回一个boolean的值
组合的RDD操作
第一个flatmap创建一个新的RDD,将原来的行item变成以单词为单位的item,flatmap是一对多的map,将原RDD中的每个item应用函数返回多个item(类型与原rdd可以不一样)。
第二个map将单词变成(string,int)的元组
reducebykey对(string,int)按key值合并
第一个map创建了一个新的RDD,其item为整数。map函数和上面的filter类似,也是接收一个函数作为参数。该函数的参数名为line,使用了string类的split函数返回一个string数组,再用size返回该数组的长度。
reduce作用在上面创建的RDD。reduce函数同样接收一个二元函数作为参数。假设RDD的元素类型为T,则该函数的原型为(T,T)=>T,在该例中用匿名函数的方式定义了一个函数,其包含两个整型参数a和b,再返回一个整型的数值。reduce函数对RDD的元素依次按交换律和结合律两两执行提供给它的参数函数(如果rdd有A,B,C,D四个元素,假设二元操作用+表示,则reduce的作用是(A+B+C+D),由于+满足交换律和结合律,所以可以并行执行),最后返回某个item。
rdd.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b) //找到所有行中最多的单词数目
textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)
rdd.flatMap(line=>line.split(" ")).countByValue()//词频统计方法二
补充:Scala匿名函数
spark监控
官方参考资料http://spark.apache.org/docs/latest/monitoring.html
# 每个spark application都有一个对应的web 监控界面,端口号默认从4040开始,有多个app时,依次用占用后面的4041、4042端口。该web服务在应用程序结束时(例如spark shell退出)即中止,如果需要在app结束后访问,需要设置spark history server。
spark history server 设置。
在spark配置中将spark.eventLog.enabled
设为true
。打开event日志选项记录spark运行过程的各项信息。
envetLog默认保存在/tmp/spark-events
目录下面,手动建立该目录,并设置其权限为当前用户。如果要保存到其它目标,同时修改配置项spark.eventLog.dir
和spark.history.fs.logDirectory
到相同的目录。
<spark directory>/sbin/start-history-server.sh
启动历史服务器,默认端口为18080
,这样在spark程序结束后,可以通过查看spark的运行情况。如果要修改端口号,修改配置项spark.history.ui.port
到指定端口
RDD分区
每个RDD都有固定数目的分区,分区数目决定了在RDD上执行操作的并行度。可以用rdd.partitions.size
查看分区的数目。
rdd.textFile("...",4)
# 手动设置分区数目。实际的分区数目可能大于或等于设定的数值,它依赖于dfs中block的大小已经一些其他的参数,还依赖于文件本身的大小。
sbt打包spark的scala程序
name := "Simple Project"version := "1.0"scalaVersion := "2.10.5"libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.2"
maven打包spark的java程序
<project>
<modelVersion>4.0.0</modelVersion>
<groupId>java-spark-app</groupId>
<artifactId>java-spark-app</artifactId>
<version>1.0</version>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.6.2</version>
</dependency>
</dependencies></project>
提交用sbt或maven打包好的spark应用程序
spark-submit --class "hellospark" java-spark-app-1.0.jar
# 提交到spark执行,--class
后接java或scala主类的名字。
如何有多个属性需要设置,可以将这些属性写到一个配置文件中,如下例,然后再用--properties-file
选项指定该配置文件, 例如该配置文件保存在当前文件夹下spark.conf文件中,则提交语句可以为spark-submit --properties-file ./spark.conf --class "hellospark" java-spark-app-1.0.jar
spark.master local[*]spark.app.name My_spark_applicationspark.driver.memory 60gspark.executor.memory 60gspark.logConf truespark.eventLog.enabled true