Code Development

DebugTest

UnitTest

Spark jobs must run inside a Spark context. A package called findspark can help with this:

pip install findspark

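Before initializing the context, call findspark.init(). It uses the local SPARK_HOME environment variable to locate Spark's Python dependencies automatically and adds them to sys.path, so that pyspark can be imported.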
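To make the path lookup more concrete, the sketch below approximates what such a lookup involves: it reads SPARK_HOME, then collects the `python` directory and the bundled py4j archive under `python/lib`. The names `spark_python_paths` and `add_spark_to_path` are hypothetical, and this is not findspark's actual code, only an illustration that assumes the standard Spark directory layout.

```python
import glob
import os
import sys


def spark_python_paths(spark_home=None):
    """Return the entries that must be on sys.path to import pyspark.

    Hypothetical helper for illustration; assumes the standard layout
    $SPARK_HOME/python and $SPARK_HOME/python/lib/py4j-*.zip.
    """
    spark_home = spark_home or os.environ.get("SPARK_HOME")
    if not spark_home:
        raise ValueError("SPARK_HOME is not set and no spark_home was given")

    spark_python = os.path.join(spark_home, "python")
    py4j_archives = sorted(
        glob.glob(os.path.join(spark_python, "lib", "py4j-*.zip"))
    )
    if not py4j_archives:
        raise FileNotFoundError("no py4j archive found under " + spark_python)
    return [spark_python, py4j_archives[0]]


def add_spark_to_path(spark_home=None):
    """Prepend the Spark Python paths to sys.path, skipping ones already present."""
    paths = [p for p in spark_python_paths(spark_home) if p not in sys.path]
    sys.path[:0] = paths
    return paths
```

In practice, calling findspark.init() is the simpler option; the sketch only shows why SPARK_HOME has to point at a complete Spark installation.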

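import logging
import unittest  # the standard library module replaces the unittest2 backport on Python 3

import findspark
findspark.init()  # must run before any pyspark import
from pyspark.context import SparkContext


def quiet_logs(sc):
    """Reduce log noise from Spark (JVM side) and py4j (Python side)."""
    sc.setLogLevel("ERROR")
    logging.getLogger("py4j").setLevel(logging.ERROR)


class ExampleTest(unittest.TestCase):

    def setUp(self):
        # a fresh local context with 4 worker threads for every test
        self.sc = SparkContext('local[4]')
        quiet_logs(self.sc)

    def tearDown(self):
        self.sc.stop()

    def test_something(self):
        # start by creating a mock dataset
        data = [(1, 'hello'), (2, 'world'), (3, 'world')]
        # and create an RDD out of it
        rdd = self.sc.parallelize(data)
        # pass it to the transformation you're unit testing
        result = non_trivial_transform(rdd)
        # collect the results; reduceByKey does not guarantee output order,
        # so compare as a dict instead of indexing into the list
        output = dict(result.collect())
        # since it's a unit test, make an assertion
        self.assertEqual(output, {'hello': 1, 'world': 2})


def non_trivial_transform(rdd):
    """A transformation to unit test (word count); defined here for convenience only."""
    return rdd.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)


if __name__ == "__main__":
    unittest.main()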
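Because the order of collected (key, count) pairs is not guaranteed, it can help to compute the expected result in plain Python and compare it with the collected output regardless of order. The sketch below shows one way to do that; `expected_word_counts` and `same_pairs` are hypothetical helper names, not part of Spark or unittest.

```python
from collections import Counter


def expected_word_counts(pairs):
    """Count how often each word (second tuple element) occurs, in plain Python."""
    return dict(Counter(word for _, word in pairs))


def same_pairs(actual, expected):
    """Compare collected (key, count) pairs with an expected dict, ignoring order."""
    return dict(actual) == expected
```

Inside a test method this could be used as `self.assertTrue(same_pairs(result.collect(), expected_word_counts(data)))`, where `data` is the input list passed to `parallelize`.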