经验首页 前端设计 程序设计 Java相关 移动开发 数据库/运维 软件/图像 大数据/云计算 其他经验
当前位置:技术经验 » 数据库/运维 » Spark » 查看文章
使用Python的Mock库进行PySpark单元测试
来源:cnblogs  作者:氢氦  时间:2019/3/12 8:51:01  对本文有异议

测试是软件开发中的基础,它经常被数据开发者忽视,但是它很重要。在本文中会展示如何使用Python的uniittest.mock库,对一段PySpark代码进行测试。笔者会从数据科学家的视角来进行工作,这意味着本文将不会深入某些软件开发的细节。

本文链接:https://www.cnblogs.com/hhelibeb/p/10508692.html

英文原文:Stop mocking me! Unit tests in PySpark using Python’s mock library

单元测试和mock是什么?

单元测试是一种测试代码片段的方式,确保代码片段按预期工作。Python中的uniittest.mock库,允许人们将部分代码替换为mock对象,并对人们使用这些mock对象的方式进行断言。“mock”的功能如名字所示——它模仿代码中的对象/变量的属性。

最终目标:测试spark.sql(query)

PySpark中最简单的创建dataframe的方式如下:

  1. df = spark.sql("SELECT * FROM table")

虽然它很简单,但依然应该被测试。

准备代码和问题

假设我们为一家电子商务服装公司服务,我们的目标是创建产品相似度表,用某些条件过滤数据,把它们写入到HDFS中。

假设我们有如下的表:

  1. 1. Products. Columns: item_id”, category_id”.
  1. 2. Product_similarity (unfiltered). Columns: item_id_1”, item_id_2”, similarity_score”.

(假设Product_similarity中的相似度分数在0~1之间,越接近1,就越相似。)

查看一对产品和他们的分数是很简单的:

  1. SELECT
  2. s.item_id_1,
  3. s.item_id_2,
  4. s.similarity_score
  5. FROM product_similarity s
  6. WHERE s.item_id_1 != s.item_id_2

where子句将和自身对比的项目移除。否则的话会得到分数为1的结果,没有意义!

要是我们想要创建一个展示相同目录下的产品的相似度的表呢?要是我们不关心鞋子和围巾的相似度,但是想要比较不同的鞋子与鞋子、围巾与围巾呢?这会有点复杂,需要我们连接“product”和“product_similarity”两个表。

查询语句变为:

  1. SELECT
  2. s.item_id_1,
  3. s.item_id_2,
  4. s.similarity_score
  5. FROM product_similarity s
  6. INNER JOIN products p
  7. ON s.item_id_1 = p.item_id
  8. INNER JOIN products q
  9. ON s.item_id_2 = q.item_id
  10. WHERE s.item_id_1 != s.item_id_2
  11. AND p.category_id = q.category_i

我们也可能想得知与每个产品最相似的N个其它项目,在该情况下,查询语句为:

  1. SELECT
  2. s.item_id_1,
  3. s.item_id_2,
  4. s.similarity_score
  5. FROM (
  6. SELECT
  7. s.item_id_1,
  8. s.item_id_2,
  9. s.similarity_score,
  10. ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num
  11. FROM product_similarity s
  12. INNER JOIN products p
  13. ON s.item_id_1 = p.item_id
  14. INNER JOIN products q
  15. ON s.item_id_2 = q.item_id
  16. WHERE s.item_id_1 != s.item_id_2
  17. AND p.category_id = q.category_id
  18. )
  19. WHERE row_num <= 10

(假设N=10)

现在,要是我们希望跨产品目录比较和在产品目录内比较两种功能成为一个可选项呢?我们可以通过使用名为same_category的布尔变量,它会控制一个字符串变量same_category_q的值,并将其传入查询语句(通过.format())。如果same_category为True,则same_category_q中为inner join的内容,反之,则为空。查询语句如下:

  1. '''
  2. SELECT
  3. s.item_id_1,
  4. s.item_id_2,
  5. s.similarity_score
  6. FROM product_similarity s
  7. {same_category_q}
  8. '''.format(same_category_q='') # Depends on value of same_category boolean

(译注:Python 3.6以上可以使用f-Strings代替format)

让我们把它写得更清楚点,用function包装一下,

  1. def make_query(same_category, table_paths):
  2. if same_category is True:
  3. same_category_q = '''
  4. INNER JOIN {product_table} p
  5. ON s.item_id_1 = p.item_id
  6. INNER JOIN {product_table} q
  7. ON s.item_id_2 = q.item_id
  8. WHERE item_id_1 != item_id_2
  9. AND p.category_id = q.category_id
  10. '''.format(product_table=table_paths["products"]["table"])
  11. else:
  12. same_category_q = ''
  13.  
  14. return same_category_q

到目前为止,很不错。我们输出了same_category_q,因此可以通过测试来确保它确实返回了所需的值。

回忆我们的目标,我们需要将dataframe写入HDFS,我们可以通过如下方法来测试函数:

  1. def create_new_table(spark, table_paths, params, same_category_q):
  2. similarity_table = table_paths["product_similarity"]["table"]
  3. created_table = spark.sql(create_table_query.format(similarity_table=similarity_table,
  4. same_category_q=same_category_q,
  5. num_items=params["num_items"]))
  6. # Write table to some path
  7. created_table.coalesce(1).write.save(table_paths["created_table"]["path"],
  8. format="orc", mode="Overwrite")

添加查询的第一部分和一个主方法,完成我们的脚本,得到:

  1. import pyspark
  2. from pyspark.sql import SparkSession
  3. create_table_query = '''
  4. SELECT
  5. item_id_1,
  6. item_id_2
  7. FROM (
  8. SELECT
  9. item_id_1,
  10. item_id_2,
  11. ROW_NUMBER() OVER(PARTITION BY item_id_1 ORDER BY similarity_score DESC) as row_num
  12. FROM {similarity_table} s
  13. {same_category_q}
  14. )
  15. WHERE row_num <= {num_items}
  16. '''
  17.  
  18. def create_new_table(spark, table_paths, params, from_date, to_date, same_category_q):
  19. similarity_table = table_paths["product_similarity"]["table"]
  20. created_table = spark.sql(create_table_query.format(similarity_table=similarity_table,
  21. same_category_q=same_category_q,
  22. num_items=params["num_items"]))
  23. # Write table to some path
  24. created_table.coalesce(1).write.save(table_paths["created_table"]["path"],
  25. format="orc", mode="Overwrite")
  26. def make_query(same_category, table_paths):
  27. if same_category is True:
  28. same_category_q = '''
  29. INNER JOIN {product_table} p
  30. ON s.item_id_1 = p.item_id
  31. INNER JOIN {product_table} q
  32. ON s.item_id_2 = q.item_id
  33. WHERE item_id_1 != item_id_2
  34. AND p.category_id = q.category_id
  35. '''.format(product_table=table_paths["product_table"]["table"])
  36. else:
  37. same_category_q = ''
  38.  
  39. return same_category_q
  40. if __name__ == "__main__":
  41. spark = (SparkSession
  42. .builder
  43. .appName("testing_tutorial")
  44. .enableHiveSupport()
  45. .getOrCreate())
  46. same_category = True # or False
  47. table_paths = foo # Assume paths are in some JSON
  48. params = bar
  49. same_category_q, target_join_q = make_query(same_category, table_paths)
    create_new_table(spark, table_paths, params, same_category_q)

这里的想法是,我们需要创建为脚本中的每个函数创建function,名字一般是test_name_of_function()。需要通过断言来验证function的行为符合预期。

测试查询-make_query

首先,测试make_query。make_query有两个输入参数:一个布尔变量和某些表路径。它会基于布尔变量same_category返回不同的same_category_q。我们做的事情有点像是一个if-then语句集:

  1. 1. If same_category is True, then same_category_q = INNER JOIN …”
  1. 2. If same_category is False, then same_category_q = “” (empty)

我们要做的是模拟make_query的参数,把它们传递给function,接下来测试是否得到期望的输出。因为test_paths是个目录,我们无需模拟它。测试脚本如下,说明见注释:

  1. def test_make_query_true(mocker):
  2. # Create some fake table paths
  3. test_paths = {
  4. "product_table": {
  5. "table": "products",
  6. },
  7. "similarity_table": {
  8. "table": "product_similarity"
  9. }
  10. }
  11. # Call the function with our paths and "True"
  12. same_category_q = make_query(True, test_paths)
  13. # We want same_category_q to be non-empty
  14. assert same_category_q != ''
  15.  
  16. def test_make_query_false(mocker):
  17. # As above, create some fake paths
  18. test_paths = {
  19. "product_table": {
  20. "table": "products",
  21. },
  22. "similarity_table": {
  23. "table": "product_similarity"
  24. }
  25. }
  26. same_category_q = make_query(False, test_paths)
  27. # This time, we want same_category_q to be empty
  28. assert same_category_q == ''

就是这么简单!

测试表创建

下一步,我们需要测试create_new_table的行为。逐步观察function,我们可以看到它做了几件事,有几个地方可以进行断言和模拟。注意,无论何时,只要程序中有某些类似df.write.save.something.anotherthing的内容,我们就需要模拟每个操作和它们的输出。

  1. 这个function使用spark作为参数,这需要被模拟。
  2. 通过调用spark.sql(create_table_query.format(**some_args))来创建created_table。我们需要断言spark.sql()只被调用了一次。我们也需要模拟spark.sql()的输出。
  3. Coalesce created_table。保证调用coalesce()时的参数是1。模拟输出。
  4. 写coalesced table,我们需要模拟.write,模拟调用它的输出。
  5. 将coalesced table保存到一个路径。确保它的调用带有正确的参数。

和前面一样,测试脚本如下:

  1. ef test_create_new_table(mocker):
  2. # Mock all our variables
  3. mock_spark = mock.Mock()
  4. mock_category_q = mock.Mock()
  5. mock_created_table = mock.Mock()
  6. mock_created_table_coalesced = mock.Mock()
  7. # Calling spark.sql with create_table_query returns created_table - we need to mock it
  8. mock_spark.sql.side_effect = [mock_created_table]
  9. # Mock the output of calling .coalesce on created_table
  10. mock_created_table.coalesce.return_value = mock_created_table_coalesced
  11. # Mock the .write as well
  12. mock_write = mock.Mock()
  13. # Mock the output of calling .write on the coalesced created table
  14. mock_created_table_coalesced.write = mock_write
  15. test_paths = {
  16. "product_table": {
  17. "table": "products",
  18. },
  19. "similarity_table": {
  20. "table": "product_similarity"
  21. },
  22. "created_table": {
  23. "path": "path_to_table",
  24. }
  25. }
  26. test_params = {
  27. "num_items": 10,
  28. }
  29. # Call our function with our mocks
  30. create_new_table(mock_spark, test_paths, test_params, mock_category_q)
  31. # We only want spark.sql to have been called once, so assert that
  32. assert 1 == mock_spark.sql.call_count
  33. # Assert that we did in fact call created_table.coalesce(1)
  34. mock_created_table.coalesce.assert_called_with(1)
  35. # Assert that the table save path was passed in properly
  36. mock_write.save.assert_called_with(test_paths["created_table"]["path"],
  37. format="orc", mode="Overwrite")

最后,把每样东西保存在一个文件夹中,如果你想的话,你需要从相应的模块中导入function,或者把所有东西放在同一个脚本中。

为了测试它,在命令行导航到你的文件夹(cd xxx),然后执行:

  1. python -m pytest final_test.py.

你可以看到类似下面的输出,

serena@Comp-205:~/workspace$ python -m pytest testing_tutorial.py
============================= test session starts ==============================
platform linux -- Python 3.6.4, pytest-3.3.2, py-1.5.2, pluggy-0.6.0
rootdir: /home/serena/workspace/Personal,
inifile: plugins: mock-1.10.0 collected 3 items testing_tutorial.py ...
[100%]
=========================== 3 passed in 0.01 seconds ===========================

结语

以上是全部内容。希望你觉得有所帮助。当我试图弄明白如何mock的时候,我希望可以遇到类似这样一篇文章。

现在就去做吧,就像Stewie所说的那样,(don’t) stop mocking me (functions)!

 

原文链接:http://www.cnblogs.com/hhelibeb/p/10508692.html

 友情链接:直通硅谷  点职佳  北美留学生论坛

本站QQ群:前端 618073944 | Java 606181507 | Python 626812652 | C/C++ 612253063 | 微信 634508462 | 苹果 692586424 | C#/.net 182808419 | PHP 305140648 | 运维 608723728

W3xue 的所有内容仅供测试,对任何法律问题及风险不承担任何责任。通过使用本站内容随之而来的风险与本站无关。
关于我们  |  意见建议  |  捐助我们  |  报错有奖  |  广告合作、友情链接(目前9元/月)请联系QQ:27243702 沸活量
皖ICP备17017327号-2 皖公网安备34020702000426号