Spark编程模型

Rick

SparkContext类和SparkConf类

代码中初始化

我们可通过如下方式调用 SparkContext 的简单构造函数,以默认的参数值来创建相应的对象。
val sc = new SparkContext("local[4]", "Test Spark App")

这段代码会创建一个4线程的 SparkContext 对象 。

appName 参数是你程序的名字,它会显示在 cluster UI 上,上面代码相应的任务命名为 Test Spark APP。

master 是 Spark, Mesos 或 YARN 集群的 URL,或运行在本地模式时,使用专用字符串 “local”。在实践中,当应用程序运行在一个集群上时,你并不想要把 master 硬编码到你的程序中,你可以用 spark-submit 启动你的应用程序的时候传递它。然而,你可以在本地测试和单元测试中使用 “local” 运行 Spark 进程。

[SparkContext]

[pyspark.SparkContext(master=None, appName=None, sparkHome=None, pyFiles=None, environment=None, batchSize=0, serializer=PickleSerializer(), conf=None, gateway=None, jsc=None, profiler_cls=<class 'pyspark.profiler.BasicProfiler'>)]

shell中初始化

在 Spark shell 中,有一个专有的 SparkContext 已经为你创建好。在变量中叫做 sc。你自己创建的 SparkContext 将无法工作。可以用 --master 参数来设置 SparkContext 要连接的集群,用 --jars 来设置需要添加到 classpath 中的 JAR 包,如果有多个 JAR 包使用逗号分割符连接它们。例如:在一个拥有 4 核的环境上运行 bin/spark-shell,使用:

$ ./bin/spark-shell --master local[4]

或在 classpath 中添加 code.jar,使用:

$ ./bin/spark-shell --master local[4] --jars code.jar

执行 spark-shell --help 获取完整的选项列表。在这之后,调用 spark-shell 会比 spark-submit 脚本更为普遍。

SQLContext

操作sql相关函数的类,需要这样才能使用:

 

sc = SparkContext(master='local[8]', appName='kmeans')
sql_ctx = SQLContext(sc)


 

Spark:弹性分布式数据集RDD及spark转换和操作                    

共享变量

Spark 的共享变量能被运行在并行计算中。默认情况下,当 Spark 运行一个并行函数时,这个并行函数会作为一个任务集在不同的节点上运行,它会把函数里使用的每个变量都复制搬运到每个任务中。有时,一个变量需要被共享到交叉任务中或驱动程序和任务之间。

Spark 支持 2 种类型的共享变量:广播变量(broadcast variables),用来在所有节点的内存中缓存一个值;累加器(accumulators),仅仅只能执行“添加(added)”操作,例如:记数器(counters)和求和(sums)。

广播变量broadcast variable

  • class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None)

  • A broadcast variable created with SparkContext.broadcast().Access its value through value.

    Examples:

    >>> from pyspark.context import SparkContext
    >>> sc = SparkContext('local', 'test')
    >>> b = sc.broadcast([1, 2, 3, 4, 5])
    >>> b.value
    [1, 2, 3, 4, 5]
    >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
    [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
    >>> b.unpersist()
    >>> large_broadcast = sc.broadcast(range(10000))
    • destroy()

    • Destroy all data and metadata related to this broadcast variable.Use this with caution; once a broadcast variable has been destroyed,it cannot be used again. This method blocks until destroy hascompleted.

    • dump(value, f)

    • load(path)

    • unpersist(blocking=False)

    • Delete cached copies of this broadcast on the executors. If thebroadcast is used after this is called, it will need to bere-sent to each executor.

      Parameters:blocking – Whether to block until unpersisting has completed
    • value

    • Return the broadcasted value

广播变量允许程序员缓存一个只读的变量在每台机器上面,而不是每个任务保存一份拷贝,它由运行 SparkContext 的驱动程序创建后发送给会参与计算的节点。例如,利用广播变量,我们能够以一种更有效率的方式将一个大数据量输入集合的副本分配给每个节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,这非常有用。(Broadcast variables allow theprogrammer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.They can be used, for example,to give every node a copy of a large input dataset in an efficient manner.)Spark也尝试着利用有效的广播算法去分配广播变量,以减少通信的成本。

一个广播变量可以通过调用SparkContext.broadcast(v)方法从一个初始变量v中创建。广播变量是v的一个包装变量,它的值可以通过value方法访问。

val broadcastAList = sc.broadcast(List("a", "b", "c", "d", "e"))
终端的输出表明,广播变量存储在内存中。

广播变量也可以被非驱动程序所在的节点(即工作节点)访问,访问的方法是调用该变量的value 方法:
sc.parallelize(List("1", "2", "3")).map(x => broadcastAList.value ++ x).collect

     RDD分布                                  各节点map操作                                       合并到主节点

广播变量创建以后,我们就能够在集群的任何函数中使用它来代替变量v,这样我们就不需要再次传递变量v到每个节点上。另外,为了保证所有的节点得到广播变量具有相同的值,对象v不能在广播之后被修改。


累加器

顾名思义,累加器是一种只能通过关联操作进行“加”操作的变量,因此它能够高效的应用于并行操作中。它们能够用来实现counters和sums。Spark原生支持数值类型的累加器,开发者可以自己添加支持的类型。如果创建了一个具名的累加器,它可以在spark的UI中显示。这对于理解运行阶段(running stages)的过程有很重要的作用。(注意:这在python中还不被支持)

明星学员

曾*浩 就职于广东众望通科技股份有限公司

转正7000

谭* 就职于GEA国际集团(锆德教育资讯(深圳)有限公司广州分公司

转正8000

刘* 就职于睿盟计算机科技有限公司

转正8000

林*勇 就职于南方人才市场

转正6000

李*达 就职于云景科技

8000

吴* 就职于北京易诚互动网络技术有限公司

转正6000