Prophet是微软开源的时序预测算法,自动化程度极高,支持R和Python接口,能自动考虑异常值、缺失值、突变点和节假日等影响因素。

本场景是每天定时预测第二天各县市的iptv播放用户数,即每天对全国的省县市的时序数据进行训练并预测出第二天288个5分钟对应的播放用户数,用以进行下一个环节的用户数实时突降预警。如果在客户端上单机上对一千多个县市直接套用for循环进行预测,耗时1h30min,实际部署的话将浪费了1h30min的时序特征,且挤占了很多的计算资源(如果想利用python多线程加速的话),因此在依托yarn集群的spark平台上融合prophet进行预测才是一个比较合理的方法,实测训练预测的流程运行耗时降至15分钟以内。

首先遇到的问题是spark和python版本都太老旧,python版本为2.7,spark版本为2.2;第二个问题是由于prophet模型依赖pandas库,大多数案例都是使用spark2.3.x结合pyspark的pandas udf(https://towardsdatascience.com/pyspark-forecasting-with-pandas-udf-and-fb-prophet-e9d70f86d802),这样对group by后的数据一一应用prophet模型的接口时,能大大减少运行时间。但是笔者使用的大数据平台为很多人共用,每分钟都运行着其他人数不清的spark任务,为了业务的稳定运行不太方便更换spark版本,因此只能舍弃pandas_udf的高性能。好在spark支持提交虚拟环境的python zip包,才使得python3+prophet可以在这么老旧的数据平台上发挥作用。

一.python虚拟环境的打包

推荐使用anaconda建立虚拟环境,并导出。
首先创建虚拟环境

conda create -n ml_env python=3.6 (-n 指定给虚拟环境命名,python=3.6指定Python版本)```

然后在激活python环境的前提下,下载依赖包
```shell 
source activate ml_env
pip install pandas
pip install fbprophet

最后在anaconda安装目录的envs文件夹下进行打包

zip -r envs.zip ml_env

由于本案例pyspark使用的是yarn模式,因此需要上传到HDFS上

 hadoop fs  -put envs.zip =hdfs:///user/XXXX/spark/

二、prophet运行代码开发

具体样例代码

三、提交py脚本

#py_envs表示,这个文件被解压后指定的文件名称。(路径不一定和本样例一模一样)

spark-submit \
--master yarn-cluster \
--queue queue_iptvqoe  \
--num-executors 10 \
--driver-memory 4g \
--executor-cores 6 \
--executor-memory 9g \
--conf spark.driver.port=50001 \
--conf spark.yarn.dist.archives=hdfs:///user/iptvqoe/XXX/envs.zip#py_envs \
--conf spark.yarn.appMasterEnv.PYSPARK_PYTHON=py_envs/ml_env/py3.6/bin/python\
 /slview/qoezy/lushun1/area_al/prophet_area.py