脚本之家,脚本语言编程技术及教程分享平台!
分类导航

Python|VBS|Ruby|Lua|perl|VBA|Golang|PowerShell|Erlang|autoit|Dos|bat|shell|

服务器之家 - 脚本之家 - Python - 基于PySpark SQL的媒体浏览日志ETL作业

基于PySpark SQL的媒体浏览日志ETL作业

2023-11-27 17:26口袋大数据 Python

pyspark除了官方的文档,网上的教程资料一直很少,但基于调度平台下,使用pyspark编写代码非常高效,程序本身是提交到spark集群中,性能上也是毫无问题的

pyspark除了官方的文档,网上的教程资料一直很少,但基于调度平台下,使用pyspark编写代码非常高效,程序本身是提交到spark集群中,性能上也是毫无问题的,在本文中,我们将深入探讨基于Spark的媒体浏览日志ETL(提取、转换、加载)流水线的详细实现,在展示如何使用PySpark SQL处理大规模的媒体浏览日志数据,包括IP地址转换、数据清洗、时间维度补充、码表关联等关键步骤。

基于PySpark SQL的媒体浏览日志ETL作业

一、环境配置

首先,我们需要创建一个SparkSession并导入必要的库和设置默认参数,包括与IP-to-Location数据库的交互以及其他相关的配置。

如果pyspark仅仅是本地运行而不是提交集群时,可以使用findspark库,它能够帮助我们快速初始化Spark环境。在开始之前,确保您已经成功安装了findspark库,并已经下载并解压了Spark二进制文件。将Spark的安装路径和Python解释器路径指定为变量。

import findspark

# 指定Spark的安装路径
spark_home = "/usr/local/spark"

# 指定用于Spark的Python解释器路径
python_path = "/home/hadoop/.conda/envs/sparkbox/bin/python3.6"

# 使用findspark.init方法初始化Spark环境
findspark.init(spark_home, python_path)

findspark.init方法将帮助设置PYSPARK_PYTHON和SPARK_HOME环境变量,确保正确的Spark库和配置文件被加载。其简化了Spark环境的初始化过程,避免手动配置环境变量。

二、数据处理

接下来,我们定义了一个NewsEtl类,用于执行数据处理和转换的各个步骤。这包括从HDFS中读取媒体浏览日志数据,进行IP地址转、换,清洗数据,添加时间维度,补充码表信息等。

在spark_function中,我们详细说明了数据处理的逻辑。这包括读取媒体浏览日志数据、进行IP地址转换、添加时间维度、补充码表信息、数据清洗和最终写入HDFS等步骤。

1.数据读取

首先,我们使用PySpark的read方法从HDFS中读取媒体浏览日志数据。我们指定了数据的schema,以确保正确地解析每一列。

df = spark.read.schema(schema).parquet(
"hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter(
"dt in ({})".format(",".join(["'{}'".format(partition) for partition in latest_partitions])))

2.IP地址转换

接下来,我们通过iptranslate函数将IP地址转换为地理位置信息。这使用了XdbSearcher类,该类负责读取xdb文件并执行IP地址的二分查找。

# 根据IP地址获取地点信息
from_ip_get_place_udf = udf(action.iptranslate, struct_schema)
df = df.withColumn('country', from_ip_get_place_udf(col('ip'), lit('country')))
df = df.withColumn("place", from_ip_get_place_udf(col('ip')))
df = df.withColumn("country", df["place"]["country"])
df = df.withColumn("city", df["place"]["city"])
df = df.withColumn("province", df["place"]["province"])
df = df.drop('place')

3.时间维度添加

我们生成当前时间的时间戳,并添加各种时间格式的列,包括年、季度、月、日、小时等。

# 生成当前时间的时间戳
df = df.withColumn("current_timestamp", from_unixtime(df["operation_time"] / 1000))
# 添加各种时间格式的列
df = df.withColumn("year", date_format("current_timestamp", "yyyy"))
df = df.withColumn("quarter", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("month", date_format("current_timestamp", "yyyy-MM"))
df = df.withColumn("day", date_format("current_timestamp", "dd"))
df = df.withColumn("hour_time", date_format("current_timestamp", "yyyy-MM-dd HH"))
df = df.withColumn("dt", date_format("current_timestamp", "yyyy-MM-dd"))
df = df.withColumn("hour", date_format("current_timestamp", "HH"))
df = df.drop('current_timestamp')

4.数据清洗

最后,我们对数据进行清洗,包括将空值替换为默认值、字符串去除空格、数据类型转换等。

# 数据清洗
newdf = newdf.withColumn("media_type", when(col("media_type").isNull(), 0).otherwise(col("media_type")))
newdf = newdf.withColumn("news_type", when(col("news_type").isNull(), 99).otherwise(col("news_type")))
newdf = newdf.withColumn("original_type", when(col("original_type").isNull(), 99).otherwise(col("original_type")))
# ...

5.最终写入HDFS

最终,我们将处理后的数据写入HDFS,采用分区方式存储,以便更高效地管理和查询。

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
newdf.write.partitionBy("dt", "hour").mode("overwrite").option('user', 'hive').parquet(
"hdfs://xxxx:8020/user/hive/warehouse/xxx.db/dwd_media_browse_log")

通过这一系列步骤,我们完成了对媒体浏览日志数据的全面处理,包括数据转换、地理位置信息的添加、时间维度的补充和数据清洗等关键步骤。

三、结论

通过详细的实现步骤,深入解析了基于Spark的媒体浏览日志ETL任务的构建过程。这个任务可以根据具体需求进行调整和扩展,为大规模数据处理任务提供了一种高效而灵活的解决方案。

原文地址:https://mp.weixin.qq.com/s?__biz=MzI0MjYwNTUyMw==&mid=2247484160&idx=1&sn=c5123f3086252737c36983e5595e8c6a

延伸 · 阅读

精彩推荐
  • Pythonselenium在执行phantomjs的API并获取执行结果的方法

    selenium在执行phantomjs的API并获取执行结果的方法

    今天小编就为大家分享一篇selenium在执行phantomjs的API并获取执行结果的方法,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    Bendawang9422021-04-30
  • PythonPython实现将文本生成二维码的方法示例

    Python实现将文本生成二维码的方法示例

    这篇文章主要介绍了Python实现将文本生成二维码的方法,结合完整实例形式分析了Python生成二维码操作的具体步骤与相关实现技巧,需要的朋友可以参考下...

    酷小孩6242020-11-27
  • Pythonpython 循环结构练习题

    python 循环结构练习题

    这篇文章主要给大家分享的是python 循环结构练习题,求两个数最大公约数、整数反转:如12345,输出54321等多个练习题,需要的朋友可以参考一下...

    全菜小能手9872022-02-26
  • PythonPython判断文件和字符串编码类型的实例

    Python判断文件和字符串编码类型的实例

    下面小编就为大家分享一篇Python判断文件和字符串编码类型的实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧...

    浅醉樱花雨8122020-12-27
  • PythonFlask Web开发入门之文件上传(八)

    Flask Web开发入门之文件上传(八)

    这篇文章主要为大家详细介绍了Flask Web开发入门之文件上传的相关资料,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    __HelloWorld__10022021-03-28
  • Python详解pandas映射与数据转换

    详解pandas映射与数据转换

    这篇文章主要介绍了pandas映射与数据转换的相关资料,帮助大家更好的利用python进行数据分析,感兴趣的朋友可以了解下...

    元小疯9932021-08-28
  • Pythonpython机器学习之神经网络(三)

    python机器学习之神经网络(三)

    这篇文章主要为大家详细介绍了python机器学习之神经网络第三篇,具有一定的参考价值,感兴趣的小伙伴们可以参考一下...

    Jeffrey_Cui4562020-12-26
  • PythonPython编程在flask中模拟进行Restful的CRUD操作

    Python编程在flask中模拟进行Restful的CRUD操作

    今天小编就为大家分享一篇关于Python编程在flask中模拟进行Restful的CRUD操作,小编觉得内容挺不错的,现在分享给大家,具有很好的参考价值,需要的朋友一...

    liumiaocn6782021-05-09