How to resolve AnalysisException: resolved attribute(s) in Spark

2022-08-31 22:32:17
val rdd = sc.parallelize(Seq(
  ("vskp", Array(2.0, 1.0, 2.1, 5.4)),
  ("hyd",  Array(1.5, 0.5, 0.9, 3.7)),
  ("hyd",  Array(1.5, 0.5, 0.9, 3.2)),
  ("tvm",  Array(8.0, 2.9, 9.1, 2.5))
))
val df1= rdd.toDF("id", "vals")
val rdd1 = sc.parallelize(Seq(("vskp","ap"),("hyd","tel"),("bglr","kkt")))
val df2 = rdd1.toDF("id", "state")
val df3 = df1.join(df2,df1("id")===df2("id"),"left")

The join works fine, but when I reuse df2 I get a resolved attribute(s) error:

val rdd2 = sc.parallelize(Seq(("vskp", "Y"),("hyd", "N"),("hyd", "N"),("tvm", "Y")))
val df4 = rdd2.toDF("id","existance")
val df5 = df4.join(df2,df4("id")===df2("id"),"left")

ERROR: org.apache.spark.sql.AnalysisException: resolved attribute(s) id#426


Answer 1

As mentioned in my comment, this is related to https://issues.apache.org/jira/browse/SPARK-10925 and, more specifically, to https://issues.apache.org/jira/browse/SPARK-14948. Reusing the same DataFrame reference creates ambiguity in attribute naming, so you have to clone the DataFrame; see the last comment in https://issues.apache.org/jira/browse/SPARK-14948 for an example.
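The clone workaround can be sketched as follows (a minimal sketch based on the JIRA discussion, applied to the question's DataFrames; re-creating df2 through toDF assigns its columns fresh attribute IDs, so the second join no longer collides with the attributes already used in df3):

```scala
// Clone df2: same data and column names, but new attribute IDs (id#xxx),
// which avoids the ambiguity caused by reusing the same logical plan.
val df2Clone = df2.toDF(df2.columns: _*)

// The second join now resolves against the clone's fresh attributes.
val df5 = df4.join(df2Clone, df4("id") === df2Clone("id"), "left")
```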


Answer 2

If you have df1, and df2 derived from df1, try renaming all the columns in df2 so that no two columns share a name after the join. So instead of df1.join(df2..., do this before the join:

# Step 1 rename shared column names in df2.
df2_renamed = df2.withColumnRenamed('columna', 'column_a_renamed').withColumnRenamed('columnb', 'column_b_renamed')

# Step 2 do the join on the renamed df2 such that no two columns have same name.
df1.join(df2_renamed)
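Applied to the question's DataFrames, the same rename approach might look like this in Scala (a sketch; it assumes id is the only column name shared between df4 and df2):

```scala
// Rename the shared column in df2 before joining so the joined
// result contains no duplicate column names.
val df2Renamed = df2.withColumnRenamed("id", "state_id")

val df5 = df4.join(df2Renamed, df4("id") === df2Renamed("state_id"), "left")
```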
