
嘻道奇闻
- 文章199742
- 阅读14625734
DataFrame数组合并操作详解:从基础到实战案例
哎,你是不是也遇到过这种情况?明明两个数组就在DataFrame里躺着,想把它俩拼一块儿,结果折腾半天不是报错就是数据对不上?今天咱们就来唠唠这个事儿,手把手带你从菜鸟变高手!
基础篇:数组合并的两种基本姿势
??Q:DataFrame里的数组到底怎么合并?总不能手动复制粘贴吧???
嘿,当然不用!Spark早就给你准备好了两把刷子:
??姿势一:concat简单粗暴式拼接??
就像把两列数据用胶水粘起来,??适合不需要去重的场景??。举个栗子:
python复制from pyspark.sql import functions as F df = spark.createDataFrame([ (["苹果","香蕉"], ["橘子"]), (["西瓜"], ["葡萄","芒果"]) ], ["水果A", "水果B"]) # 直接拼接两列数组 df.withColumn("全家福", F.concat(F.col("水果A"), F.col("水果B"))).show(truncate=False) # 输出结果: # +--------+--------+-------------------+ # |水果A |水果B |全家福 | # +--------+--------+-------------------+ # |[苹果,香蕉]|[橘子] |[苹果,香蕉,橘子] | # |[西瓜] |[葡萄,芒果]|[西瓜,葡萄,芒果] | # +--------+--------+-------------------+
??注意点??:
- 遇到null值会直接返回null(这时候得用concat_ws救场)
- 数组元素类型必须一致,别把数字和字符串混着玩
??姿势二:array_union优雅去重法??
这招就像自动筛子,??合并的同时还能去掉重复项??,特别适合搞会员名单合并这种活儿:
python复制df.withColumn("精选水果", F.array_union(F.col("水果A"), F.col("水果B"))).show() # 第二行数据中的"西瓜"和"葡萄""芒果"原本不重复,所以全保留
实战篇:三个真实业务场景破解
??场景一:用户标签合并??
假设咱们有个电商平台的用户数据:
python复制user_tags = spark.createDataFrame([ (1001, ["数码爱好者","吃货"], ["户外运动"]), (1002, ["美妆达人"], ["数码爱好者","图书迷"]) ], ["user_id", "基础标签", "扩展标签"])
??需求??:合并标签并统计出现次数
python复制result = user_tags.withColumn("全量标签", F.concat(F.col("基础标签"), F.col("扩展标签"))) .withColumn("标签统计", F.explode("全量标签")) .groupBy("标签统计").count()
??避坑指南??:
- 合并前先用array_contains检查敏感标签
- 大数据量时别用UDF,直接用内置函数
??场景二:订单商品合并??
遇到跨平台比价的需求时,经常要处理这种结构:
python复制orders = spark.createDataFrame([ ("订单A", ["iPhone15", "AirPods"], ["小米13", "蓝牙耳机"]), ("订单B", ["华为Mate60"], ["三星S23", "无线充电器"]) ], ["订单号", "自营商品", "第三方商品"])
??特殊需求??:保留商品来源信息
这时候就得用struct来套娃了:
python复制merged = orders.withColumn("全部商品", F.concat( F.transform(F.col("自营商品"), lambda x: F.struct(x.alias("商品名"), F.lit("自营").alias("渠道"))), F.transform(F.col("第三方商品"), lambda x: F.struct(x.alias("商品名"), F.lit("第三方").alias("渠道"))) ))
??输出效果??:
[{"商品名":"iPhone15","渠道":"自营"}, {"商品名":"AirPods","渠道":"自营"}, ...]
??场景三:时序数据拼接??
搞物联网的小伙伴肯定常遇到这种:
python复制sensor_data = spark.createDataFrame([ ("设备A", ["2023-08-01 10:00:00", "2023-08-01 11:00:00"], [25, 26]), ("设备B", ["2023-08-01 09:00:00"], [28]) ], ["设备ID", "时间序列", "温度序列"])
??需求??:把时间和温度对齐合并
python复制# 先确保两个数组长度一致 valid_data = sensor_data.filter(F.size("时间序列") == F.size("温度序列")) # 用zip_with函数打包 result = valid_data.withColumn("数据包", F.zip_with("时间序列", "温度序列", lambda time, temp: F.struct(time.alias("时间"), temp.alias("温度"))))
高阶技巧:性能优化三板斧
??第一招:列式存储预处理??
遇到要合并10个以上数组的情况,千万别直接concat!先做这三步:
- 用withColumn逐列添加检查点
- 对每个数组先做filter去null值
- 最后用coalesce合并分区
??第二招:分阶段合并策略??
像处理用户行为日志这种量级的数据,可以这么玩:
python复制# 第一阶段:按小时合并 hourly_merged = raw_data.groupBy("user_id", F.window("event_time", "1 hour")) .agg(F.collect_list("actions").alias("hour_actions")) # 第二阶段:按天合并 daily_merged = hourly_merged.groupBy("user_id", F.date_trunc("day", "window.start")) .agg(F.flatten(F.collect_list("hour_actions")).alias("daily_actions"))
??第三招:内存控制秘籍??
当合并操作导致executor内存爆炸时:
- 设置spark.sql.functions.sizeOfNull=false
- 在concat前先用slice函数分段
- 启用offHeap内存管理
个人观点时间
干了这么多年大数据,我发现很多小伙伴容易陷入两个极端:要么无脑用concat导致数据冗余,要么过早优化搞出一堆复杂UDF。??其实把握住三个原则就够了??:
- 能躺着用内置函数就别站着写UDF
- 合并前先想清楚要不要去重
- 处理超长数组时(超过1万元素)一定要先分治
最后说句大实话,DataFrame的数组操作就像拼乐高——找准接口对齐卡扣,剩下的交给Spark自己发挥就行。下次遇到数组合并的问题,别慌,把这篇文章翻出来对照着搞,保准你事半功倍!