Spark Examples
Published: 2019-06-20



Spark is built around distributed datasets that support two types of parallel operations: transformations, which are lazy and yield another distributed dataset (e.g., map, filter, and join), and actions, which force the computation of a dataset and return a result (e.g., count). The following examples show off some of the available operations and features.
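
For instance, nothing in a chain of transformations is computed until an action is invoked. A minimal sketch of this behavior (assuming a SparkContext named spark, as in the examples below):

// textFile and map are lazy transformations: the file is not read yet
val lengths = spark.textFile("hdfs://...").map(line => line.length)
// count() is an action: only now is the file read and the pipeline executed
val total = lengths.count()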

Text Search

In this example, we search through the error messages in a log file:

 

val file = spark.textFile("hdfs://...")
val errors = file.filter(line => line.contains("ERROR"))
// Count all the errors
errors.count()
// Count errors mentioning MySQL
errors.filter(line => line.contains("MySQL")).count()
// Fetch the MySQL errors as an array of strings
errors.filter(line => line.contains("MySQL")).collect()

 

The code fragments such as line => line.contains("ERROR") are Scala function literals (closures) that get passed automatically to the cluster, while filter, count, and collect are Spark operations.
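
Since collect() is an action, it returns the matching lines to the driver as a plain Scala array, which can then be used like any local collection. A small illustrative sketch (the variable name mysqlErrors is ours, not part of the original example):

val mysqlErrors = errors.filter(line => line.contains("MySQL")).collect()
// mysqlErrors is an ordinary Array[String] on the driver, so no Spark operations are needed here
mysqlErrors.take(5).foreach(println)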

In-Memory Text Search

Spark can cache datasets in memory to speed up reuse. In the example above, we can load just the error messages in RAM using:

 

errors.cache()

 

After the first action that uses errors, later ones will be much faster.
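
A minimal sketch of how this plays out (purely illustrative; the actual speedup depends on data size and cluster resources):

errors.cache()
// First action: reads from HDFS, filters the lines, and keeps the result in RAM
errors.count()
// Later actions on errors reuse the in-memory copy instead of re-reading HDFS
errors.filter(line => line.contains("MySQL")).count()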

Word Count

In this example, we use a few more transformations to build a dataset of (String, Int) pairs called counts and then save it to a file.

 

val file = spark.textFile("hdfs://...")
val counts = file.flatMap(line => line.split(" "))
                 .map(word => (word, 1))
                 .reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://...")

 
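Because counts is itself a distributed dataset of (String, Int) pairs, it can be transformed further before or instead of saving it. As a sketch, fetching the ten most frequent words back to the driver might look like this (this step is our addition, not part of the original example):

// Swap to (count, word) pairs, sort by count descending, and take the top ten
val topWords = counts.map { case (word, n) => (n, word) }.sortByKey(ascending = false).take(10)
topWords.foreach { case (n, word) => println(word + ": " + n) }
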

Estimating Pi

Spark can also be used for compute-intensive tasks. This code estimates π by "throwing darts" at a circle. We pick random points in the unit square ((0, 0) to (1, 1)) and see how many fall inside the unit circle. Since the quarter of the unit circle that lies in the square has area π / 4 while the square has area 1, the fraction of points that land inside should be roughly π / 4, and we use this to get our estimate.

 

val count = spark.parallelize(1 to NUM_SAMPLES).map { i =>
  val x = Math.random
  val y = Math.random
  if (x*x + y*y < 1) 1.0 else 0.0
}.reduce(_ + _)
println("Pi is roughly " + 4 * count / NUM_SAMPLES)

 
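The same Monte Carlo estimate can be sanity-checked locally without a cluster. A small pure-Scala sketch (the sample count is an arbitrary illustrative value):

import scala.util.Random

val n = 1000000 // illustrative sample count
val hits = (1 to n).count { _ =>
  val x = Random.nextDouble()
  val y = Random.nextDouble()
  x * x + y * y < 1 // true when the point falls inside the quarter circle
}
println("Pi is roughly " + 4.0 * hits / n)
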

Logistic Regression

This is an iterative machine learning algorithm that seeks to find the best hyperplane that separates two sets of points in a multi-dimensional feature space. It can be used to classify messages into spam vs non-spam, for example. Because the algorithm applies the same MapReduce operation repeatedly to the same dataset, it benefits greatly from caching the input data in RAM across iterations.

 

val points = spark.textFile(...).map(parsePoint).cache()
var w = Vector.random(D) // current separating plane
for (i <- 1 to ITERATIONS) {
  val gradient = points.map(p =>
    (1 / (1 + exp(-p.y*(w dot p.x))) - 1) * p.y * p.x
  ).reduce(_ + _)
  w -= gradient
}
println("Final separating plane: " + w)

 

Note that w gets shipped automatically to the cluster with every map call.
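
The snippet leaves parsePoint, D, ITERATIONS, and the vector type (Vector) undefined; they come from the surrounding program in the original example. A hedged sketch of what such helpers could look like, with illustrative names (Vec here plays the role of the example's Vector) and an assumed input format of D space-separated feature values followed by a +1/-1 label:

import scala.math.exp // used by the gradient expression in the snippet above
import scala.util.Random

val D = 10           // illustrative number of features
val ITERATIONS = 100 // illustrative iteration count

// Illustrative dense vector supporting the operations used above: +, -, dot, and scaling
class Vec(val elems: Array[Double]) extends Serializable {
  def +(o: Vec) = new Vec(elems.zip(o.elems).map { case (a, b) => a + b })
  def -(o: Vec) = new Vec(elems.zip(o.elems).map { case (a, b) => a - b })
  def *(s: Double) = new Vec(elems.map(_ * s))
  def dot(o: Vec): Double = elems.zip(o.elems).map { case (a, b) => a * b }.sum
  override def toString = elems.mkString("(", ", ", ")")
}
object Vec {
  def random(d: Int) = new Vec(Array.fill(d)(Random.nextDouble()))
}
// Lets scalar-times-vector expressions such as (... * p.y) * p.x compile
implicit class Scalar(s: Double) {
  def *(v: Vec): Vec = v * s
}

case class Point(x: Vec, y: Double) // y is the label, +1 or -1

// Parses a line of D space-separated feature values followed by the label
def parsePoint(line: String): Point = {
  val nums = line.split(" ").map(_.toDouble)
  Point(new Vec(nums.take(D)), nums(D))
}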

The graph below compares the performance of this Spark program against a Hadoop implementation on 30 GB of data on an 80-core cluster, showing the benefit of in-memory caching:

(Figure: Logistic regression performance in Spark vs Hadoop)

