首页 > 社会 > 正文内容

DataFrame数组合并操作详解:从基础到实战案例

社会2025-05-28 03:30:46

哎,你是不是也遇到过这种情况?明明两个数组就在DataFrame里躺着,想把它俩拼一块儿,结果折腾半天不是报错就是数据对不上?今天咱们就来唠唠这个事儿,手把手带你从菜鸟变高手!


基础篇:数组合并的两种基本姿势

??Q:DataFrame里的数组到底怎么合并?总不能手动复制粘贴吧???
嘿,当然不用!Spark早就给你准备好了两把刷子:

??姿势一:concat简单粗暴式拼接??
就像把两列数据用胶水粘起来,??适合不需要去重的场景??。举个栗子:

python复制
from pyspark.sql import functions as F

df = spark.createDataFrame([
    (["苹果","香蕉"], ["橘子"]),
    (["西瓜"], ["葡萄","芒果"])
], ["水果A", "水果B"])

# 直接拼接两列数组
df.withColumn("全家福", F.concat(F.col("水果A"), F.col("水果B"))).show(truncate=False)

# 输出结果:
# +--------+--------+-------------------+
# |水果A   |水果B   |全家福             |
# +--------+--------+-------------------+
# |[苹果,香蕉]|[橘子]  |[苹果,香蕉,橘子]    |
# |[西瓜]   |[葡萄,芒果]|[西瓜,葡萄,芒果]    |
# +--------+--------+-------------------+

??注意点??:

  • 遇到null值会直接返回null(这时候得用concat_ws救场)
  • 数组元素类型必须一致,别把数字和字符串混着玩

??姿势二:array_union优雅去重法??
这招就像自动筛子,??合并的同时还能去掉重复项??,特别适合搞会员名单合并这种活儿:

python复制
df.withColumn("精选水果", F.array_union(F.col("水果A"), F.col("水果B"))).show()
# 第二行数据中的"西瓜"和"葡萄""芒果"原本不重复,所以全保留

实战篇:三个真实业务场景破解

??场景一:用户标签合并??
假设咱们有个电商平台的用户数据:

python复制
user_tags = spark.createDataFrame([
    (1001, ["数码爱好者","吃货"], ["户外运动"]),
    (1002, ["美妆达人"], ["数码爱好者","图书迷"])
], ["user_id", "基础标签", "扩展标签"])

??需求??:合并标签并统计出现次数

python复制
result = user_tags.withColumn("全量标签", F.concat(F.col("基础标签"), F.col("扩展标签")))
           .withColumn("标签统计", F.explode("全量标签"))
           .groupBy("标签统计").count()

??避坑指南??:

  • 合并前先用array_contains检查敏感标签
  • 大数据量时别用UDF,直接用内置函数

??场景二:订单商品合并??
遇到跨平台比价的需求时,经常要处理这种结构:

python复制
orders = spark.createDataFrame([
    ("订单A", ["iPhone15", "AirPods"], ["小米13", "蓝牙耳机"]),
    ("订单B", ["华为Mate60"], ["三星S23", "无线充电器"])
], ["订单号", "自营商品", "第三方商品"])

??特殊需求??:保留商品来源信息
这时候就得用struct来套娃了:

python复制
merged = orders.withColumn("全部商品", 
    F.concat(
        F.transform(F.col("自营商品"), lambda x: F.struct(x.alias("商品名"), F.lit("自营").alias("渠道"))),
        F.transform(F.col("第三方商品"), lambda x: F.struct(x.alias("商品名"), F.lit("第三方").alias("渠道")))
    ))

??输出效果??:

[{"商品名":"iPhone15","渠道":"自营"}, {"商品名":"AirPods","渠道":"自营"}, ...]

??场景三:时序数据拼接??
搞物联网的小伙伴肯定常遇到这种:

python复制
sensor_data = spark.createDataFrame([
    ("设备A", ["2023-08-01 10:00:00", "2023-08-01 11:00:00"], [25, 26]),
    ("设备B", ["2023-08-01 09:00:00"], [28])
], ["设备ID", "时间序列", "温度序列"])

??需求??:把时间和温度对齐合并

python复制
# 先确保两个数组长度一致
valid_data = sensor_data.filter(F.size("时间序列") == F.size("温度序列"))

# 用zip_with函数打包
result = valid_data.withColumn("数据包", 
    F.zip_with("时间序列", "温度序列", 
        lambda time, temp: F.struct(time.alias("时间"), temp.alias("温度"))))

高阶技巧:性能优化三板斧

??第一招:列式存储预处理??
遇到要合并10个以上数组的情况,千万别直接concat!先做这三步:

  1. 用withColumn逐列添加检查点
  2. 对每个数组先做filter去null值
  3. 最后用coalesce合并分区

??第二招:分阶段合并策略??
像处理用户行为日志这种量级的数据,可以这么玩:

python复制
# 第一阶段:按小时合并
hourly_merged = raw_data.groupBy("user_id", F.window("event_time", "1 hour"))
                   .agg(F.collect_list("actions").alias("hour_actions"))

# 第二阶段:按天合并
daily_merged = hourly_merged.groupBy("user_id", F.date_trunc("day", "window.start"))
                   .agg(F.flatten(F.collect_list("hour_actions")).alias("daily_actions"))

??第三招:内存控制秘籍??
当合并操作导致executor内存爆炸时:

  • 设置spark.sql.functions.sizeOfNull=false
  • 在concat前先用slice函数分段
  • 启用offHeap内存管理

个人观点时间

干了这么多年大数据,我发现很多小伙伴容易陷入两个极端:要么无脑用concat导致数据冗余,要么过早优化搞出一堆复杂UDF。??其实把握住三个原则就够了??:

  1. 能躺着用内置函数就别站着写UDF
  2. 合并前先想清楚要不要去重
  3. 处理超长数组时(超过1万元素)一定要先分治

最后说句大实话,DataFrame的数组操作就像拼乐高——找准接口对齐卡扣,剩下的交给Spark自己发挥就行。下次遇到数组合并的问题,别慌,把这篇文章翻出来对照着搞,保准你事半功倍!

搜索