Oobspark

Out Of the Box and Spark


  • 首页

  • 归档

  • 标签

ML工作流(Pipelines)

发表于 2018-04-12

ML工作流(Pipelines)中的一些概念

  • DataFrame:使用Spark SQL中的DataFrame作为数据集,它可以容纳各种数据类型。 DataFrame中的列可以是存储的文本,特征向量,真实标签和预测标签等。

  • Transformer:转换器,是一种可以将一个DataFrame转换为另一个DataFrame的算法。比如一个模型就是一个Transformer。

  • Estimator:评估器,基于算法实现了一个fit()方法进行拟合,输入一个DataFrame,产生一个Transformer。

  • PipeLine:管道将多个工作流阶段(转换器和估计器)连接在一起,形成机器学习的工作流。

  • Parameter:用来设置所有转换器和估计器的参数。

Pipelines如何运转

一个工作流被指定为一系列的阶段,每个阶段都是Transformer或Estimator。这些阶段按顺序运行,输入的DataFrame在通过每个阶段时会进行转换。对于Transformer阶段,会在DataFrame上调用transform()方法。对于Estimator阶段,调用fit()方法来拟合生成Transformer(它将成为PipelineModel或拟合管道的一部分),并在DataFrame上调用Transformer的transform()方法。

流水线

上图中,顶行表示具有三个阶段的管道。前两个(Tokenizer和HashingTF)是Transformers(蓝色),第三个(LogisticRegression)是Estimator(红色)。底行表示流经管道的数据,其中圆柱表示DataFrames。在原始DataFrame上调用Pipeline.fit()方法拟合,它具有原始的文本和标签。Tokenizer.transform()方法将原始文本拆分为单词,并向DataFrame添加一个带有单词的新列。 HashingTF.transform()方法将字列转换为特征向量,向这些向量添加一个新列到DataFrame。然后,由于LogisticRegression一个Estimator,Pipeline首先调用LogisticRegression.fit()拟合产生一个LogisticRegressionModel。如果管道有更多的Estimator,则在将DataFrame传递到下一个阶段之前,会先在DataFrame上调用LogisticRegressionModel的transform()方法。

PipeLine本身也是一个Estimator。因而,在工作流的fit()方法运行之后,它产生了一个PipelineModel,它也是一个Transformer。这个管道模型将在测试数据的时候使用。下图展示了这种用法。

流水线模型

在上图中,PipelineModel具有与原始Pipeline相同的阶段数,但是原始Pipeline中的所有估计器Estimators都变为变换器Transformers。当在测试数据集上调用PipelineModel的transform()方法时,数据按顺序通过拟合的管道。每个阶段的transform()方法更新数据集并将其传递到下一个阶段。Pipelines和PipelineModels有助于确保训练数据集和测试数据集通过相同的特征处理步骤。

理解Estimator,Transformer和Param

相关API :Estimator,Transformer,Params

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
from pyspark.ml.linalg import Vectors
from pyspark.ml.classification import LogisticRegression

spark = SparkSession.builder.master("local").appName("Estimator-Transformer-Param").getOrCreate()

# 准备训练数据集(label, features)元组
training = spark.createDataFrame([
(1.0, Vectors.dense([0.0, 1.1, 0.1])),
(0.0, Vectors.dense([2.0, 1.0, -1.0])),
(0.0, Vectors.dense([2.0, 1.3, 1.0])),
(1.0, Vectors.dense([0.0, 1.2, -0.5]))], ["label", "features"])

# 创建一个LogisticRegression示例,也就是Estimator
lr = LogisticRegression(maxIter=10, regParam=0.01)

# 打印出所以的参数和默认值信息
print("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

# 用训练数据集训练模型,这一步骤会使用到lr中的parameters
model1 = lr.fit(training)

# 现在,model1成为了一个Model(Estimator产生的transformer)
# 我们查看一下拟合过程所用到的parameters
print("Model 1 was fit using parameters: ")
print(model1.extractParamMap())

# 修改参数
paramMap = {lr.maxIter: 20}
paramMap[lr.maxIter] = 30
paramMap.update({lr.regParam: 0.1, lr.threshold: 0.55})

# 合并参数
paramMap2 = {lr.probabilityCol: "myProbability"} # 修改输出列名
paramMapCombined = paramMap.copy()
paramMapCombined.update(paramMap2)

# 现在基于新的参数进行拟合
model2 = lr.fit(training, paramMapCombined)
print("Model 2 was fit using parameters: ")
print(model2.extractParamMap())

# 测试数据集
test = spark.createDataFrame([
(1.0, Vectors.dense([-1.0, 1.5, 1.3])),
(0.0, Vectors.dense([3.0, 2.0, -0.1])),
(1.0, Vectors.dense([0.0, 2.2, -1.5]))], ["label", "features"])

# 使用Transformer.transform()对测试数据进行预测
# LogisticRegression.transform只会使用'features'列,myProbability列既是probability列,之前我们做过更改
prediction = model2.transform(test)
result = prediction.select("features", "label", "myProbability", "prediction").collect()

for row in result:
print("features=%s, label=%s -> prob=%s, prediction=%s" %
(row.features, row.label, row.myProbability, row.prediction))

可参考examples/src/main/python/ml/estimator_transformer_param_example.py

Pipeline

相关API :Pipeline

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer

# 训练数据集(id, text, label)元组.
training = spark.createDataFrame([
(0, "a b c d e spark", 1.0),
(1, "b d", 0.0),
(2, "spark f g h", 1.0),
(3, "hadoop mapreduce", 0.0)
], ["id", "text", "label"])

# 配置pipeline,连接tokenizer,hashingTF和lr
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.001)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# 拟合
model = pipeline.fit(training)

# 测试数据集(id, text)元组
test = spark.createDataFrame([
(4, "spark i j k"),
(5, "l m n"),
(6, "spark hadoop spark"),
(7, "apache hadoop")
], ["id", "text"])

# 预测结果
prediction = model.transform(test)
selected = prediction.select("id", "text", "probability", "prediction")
for row in selected.collect():
rid, text, prob, prediction = row
print("(%d, %s) --> prob=%s, prediction=%f" % (rid, text, str(prob), prediction))

可参考examples/src/main/python/ml/pipeline_example.py

弹性分布式数据集-RDD

发表于 2018-04-09

弹性分布式数据集-RDD

弹性分布式数据集(RDD)是一种具有容错特性的数据集合,能在Spark的各个组件间做出各类转换并无缝传递。

有两种方式创建RDD:并行化数据集合或是外部数据集合(文件,HDFS,HBase等)。

并行化集合(Parallelized Collections)

1
2
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)

外部数据集

  • Bash中创建文件

    1
    echo -e "zhangsan, 23\nlisi, 25\nwanger, 27" > /usr/local/spark/data.txt
  • Pyspark Shell中创建基于文件的RDD

    1
    distFile = sc.textFile("data.txt")

当使用文件名时,所以的工作节点(Worker Nodes)都应该有能力访问到该文件。再次强调,本例基于Standalone。

RDD基本操作

1
2
3
4
5
6
7
8
lines = sc.textFile("data.txt")
lineLengths = lines.map(lambda s: len(s)) # 返回每行的长度作为一个集合
lineLengths.collect() # [12, 8, 10],返回元素集合
lineLengths.first() # 12,返回第一个元素
lineLengths.take(2) # [12, 8],,返回第一个元素的集合
lineLengths.count() # 3,集合元素总数
lineLengths.reduce(lambda a, b: a + b) # 30,聚集函数,求和
lineLengthsFiltered = lineLengths.filter(lambda x: x >= 10) # [12, 10],过滤出长度大于等于10的集合

更多可参考集合转换和操作

Spark中的函数传递

Spark API中对函数传递有很大的依赖,主要有三种方式

  • Lambda表达式

    1
    2
    3
    def doStuff(self, rdd):
    field = self.field
    return rdd.map(lambda s: field + s)
  • 内部定义的函数

    1
    2
    3
    4
    5
    6
    7
    if __name__ == "__main__":
    def myFunc(s):
    words = s.split(" ")
    return len(words)

    sc = SparkContext(...)
    sc.textFile("file.txt").map(myFunc)
  • 模块中定义的函数

Spark MLlib概述

发表于 2018-04-08

Spark MLlib概述

Spark的MLlib,其目标是让机器学习实践更加简单且具可扩展性。提供的特性如下:

  • 机器学习算法:分类,回归,聚类和协同过滤
  • 特征提取:特征提取、转化,将维和选择
  • 工作流(Pipelines):构建,评估和调整机器学习工作流的工具
  • 数据持久化:持久化/加载模型,算法和工作流
  • 工具集:线性代数,统计学,数据操作的工具支持

MLlib的主要API

值得注意的是,进入了Spark2.0版本之后,spark.mllib中基于RDD的API( RDD-based APIs)也进入了维护模式,取而代之的是spark.ml中基于DataFrame的API(DataFrame-based API)。

RDD-DataFrame

依赖

  • 线性代数库Breeze
  • 基础线性代数子程序库Intel MKL或OpenBLAS
  • NumPy >=1.4

搭建一个最简单的Spark集群

发表于 2018-04-07

Spark集群的工作原理

集群工作原理

SparkContext可以连接到几种类型的集群管理器(Cluster Managers), 一旦连接,Spark就会获取集群中节点(Worker Node)上的执行者(Executor),这些执行者是运行计算并为应用程序存储数据的进程。 然后,它将写好的应用程序代码(JAR包或Python文件)发送给执行者。 最后,SparkContext发送任务给执行者运行。

集群类型

  1. Standalone
    Spark框架本身也自带了完整的资源调度管理服务,可以独立部署到一个集群中,而不需要依赖其他系统来为其提供资源管理调度服务。

  2. Apache Mesos
    Mesos是一种资源调度管理框架,可以为运行在它上面的Spark提供服务。Spark程序所需要的各种资源,都由Mesos负责调度。目前,Spark官方推荐采用这种模式,所以,许多公司在实际应用中也采用该模式。

  3. Hadoop YARN
    Spark可运行于YARN之上,与Hadoop进行统一部署,资源管理和调度依赖YARN,分布式存储依赖HDFS。

  4. Kubernetes
    Kubernetes(k8s)是自动化容器操作的开源平台,这些操作包括部署,调度和节点集群间扩展。

  5. 同时也有第三方的集群模式,但尚未得到官方的支持,比如Nomad。

之后本教程将主要使用Standalone模式。

Standalone模式

  • 启动集群的master服务
    ./sbin/start-master.sh

  • 启动集群的Workers
    ./sbin/start-slave.sh <master-spark-URL>,此处的master-spark-URL就是刚刚启动好的master服务地址,协议头为spark:://,你可以通过http://localhost:8080查看。本例中为spark://ubuntu:7077或spark://127.0.0.1:7077。

  • jps,可以查看集群信息(Master/Worker的进程ID)

  • 停止集群的脚本也在./sbin下。

Spark组件

发表于 2018-04-06

Spark组件

  • Spark Core:Spark Core包含Spark的基本功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等。Spark建立在统一的抽象RDD之上,使其可以以基本一致的方式应对不同的大数据处理场景。

  • Spark SQL:Spark SQL允许开发人员直接处理RDD,同时也可查询Hive、HBase等外部数据源。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行查询,并进行更复杂的数据分析。

  • Spark Streaming:Spark Streaming支持高吞吐量、可容错处理的实时流数据处理。

  • MLlib(机器学习):MLlib提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习的工作;

  • GraphX(图计算):GraphX是Spark中用于图计算的API。

本教程将专注于MLlib组件的学习。

Spark环境搭建

发表于 2018-04-04

目标:

  • 搭建Spark环境
  • 写出第一个Spark程序并运行。

运行环境

  • ubuntu 16.04
  • python3
  • Java 1.8
  • Spark 2.1.0
  • Hadoop 2.7.5

ubuntu 16.04

  • 基于vagrant
  • vagrant box add ubuntu/xenial https://cloud-images.ubuntu.com/xenial/current/xenial-server-cloudimg-amd64-vagrant.box
  • vagrant init ubuntu/xenial
  • 增加ip地址解析config.vm.network "public_network", ip: "192.168.100.110"
  • 增加内存上限config.vm.provider "virtualbox" do |vb| vb.memory = "5248" end
  • vagrant up
  • vagrant ssh

python3

  • curl "https://bootstrap.pypa.io/get-pip.py" -o "get-pip.py"
  • python3 get-pip.py
  • sudo pip install ipython
  • sudo apt install python3-pip
  • sudo pip3 install ipython
  • sudo pip3 install numpy

Java 1.8

  • sudo apt-get update
  • sudo apt-get install -y default-jre default-jdk

Hadoop 2.7.5

  • wget http://mirrors.tuna.tsinghua.edu.cn/apache/hadoop/common/hadoop-2.7.5/hadoop-2.7.5.tar.gz
  • tar zxvf hadoop-2.7.5.tar.gz
  • sudo mv hadoop-2.7.5 /usr/local/hadoop
  • sudo chown vagrant /usr/local/hadoop

Spark 2.3.0

  • wget https://www.apache.org/dyn/closer.lua/spark/spark-2.3.0/spark-2.3.0-bin-without-hadoop.tgz
  • tar zxvf spark-2.3.0-bin-without-hadoop.tgz
  • sudo mv spark-2.3.0-bin-without-hadoop /usr/local/spark
  • sudo chown vagrant /usr/local/spark
  • 在文件/usr/local/spark/conf/spark-env.sh文件开头增加:export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoop classpath)

PATH

  • vi ~/.bashrc,添加以下代码
    1
    2
    3
    4
    5
    6
    7
    8
    export PYSPARK_DRIVER_PYTHON=ipython
    export JAVA_HOME=/usr/lib/jvm/default-java
    export HADOOP_HOME=/usr/local/hadoop
    export SPARK_HOME=/usr/local/spark
    export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
    export PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.6-src.zip:$PYTHONPATH
    export PYSPARK_PYTHON=python3
    export PATH=$HADOOP_HOME/bin:$SPARK_HOME/bin:$HADOOP_HOME/sbin:$PATH

Spark 初体验

  • cd /usr/local/spark
  • 执行./bin/pyspark,进入Spark shell。没有报错,表明安装正确。
  • 运行Spark自带的例子。./bin/spark-submit examples/src/main/python/pi.py 10

第一个Spark程序

  • vi /usr/local/spark/code/wordCount.py

    1
    2
    3
    4
    5
    from pyspark import SparkContext
    sc = SparkContext( 'local', 'test')
    textFile = sc.textFile("file:///usr/local/spark/README.md")
    wordCount = textFile.flatMap(lambda line: line.split(" ")).map(lambda word: (word,1)).reduceByKey(lambda a, b : a + b)
    wordCount.foreach(print)

至此,本文也到了该完结的时候,在敲下这个命令之后:python3 code/wordCount.py

Spark由浅入深

发表于 2018-04-02

本系列教程由浅入深介绍Spark在大数据中的实践和应用,让大家对大数据时代能有更清晰,更真切的认识。

123
oobspark

oobspark

Out Of the Box and Spark

27 日志
51 标签
GitHub
© 2018 - 2020 oobspark
由 Hexo 强力驱动
主题 - NexT.Muse