Spark ML indexer cannot resolve DataFrame column name with dots?
2022-09-03 08:12:41
I have a DataFrame with a column named a.b. When I specify a.b as the input column name of a StringIndexer, I get an AnalysisException with the message "cannot resolve 'a.b' given input columns a.b". I'm using Spark 1.6.0.
I'm aware that older versions of Spark had problems with dots in column names, but in more recent versions, backticks can be used around column names in the Spark shell and in SQL queries. For example, that's the resolution to another question, How to escape column names with hyphen in Spark SQL. Some of these problems were reported as SPARK-6898, Special chars in column names is broken, but that was resolved as of 1.4.0.
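Indeed, backtick escaping seems to work fine for me in ordinary DataFrame operations and SQL queries against the same frame. Here's a rough sketch (assuming a sqlContext and a DataFrame df with the a.b column, set up as in the full examples below; the temp table name t is just for illustration):

// Both of these resolve the dotted column without complaint in 1.6.0,
// as long as the name is wrapped in backticks.
df.select(df.col("`a.b`")).show();

df.registerTempTable("t");
sqlContext.sql("SELECT `a.b` FROM t").show();

So backtick escaping itself appears to work everywhere except the ML indexer.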
Here's a minimal example and the stack trace:
import java.util.Collections;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkMLDotColumn {
    public static void main(String[] args) {
        // Get the contexts
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("test")
                .set("spark.ui.enabled", "false"); // http://permalink.gmane.org/gmane.comp.lang.scala.spark.user/21385
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        // Create a schema with a single string column named "a.b"
        StructType schema = new StructType(new StructField[] {
                DataTypes.createStructField("a.b", DataTypes.StringType, false)
        });

        // Create an empty RDD and DataFrame
        JavaRDD<Row> rdd = sparkContext.parallelize(Collections.<Row>emptyList());
        DataFrame df = sqlContext.createDataFrame(rdd, schema);

        // Fails during fit with an AnalysisException (stack trace below)
        StringIndexer indexer = new StringIndexer()
                .setInputCol("a.b")
                .setOutputCol("a.b_index");
        df = indexer.fit(df).transform(df);
    }
}
Now, it's worth trying the same example with backticked column names, because we get some strange results. Here's an example with the same schema, but this time there's data in the frame. Before attempting any indexing, we'll copy the a.b column to a column named a_b. That requires the use of backticks, and it works without a problem. Then we'll try to index the a_b column, which also works fine. But something really strange happens when we try to index the a.b column using backticks: we get no error, but we don't get any result either:
import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.ml.feature.StringIndexer;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

public class SparkMLDotColumn {
    public static void main(String[] args) {
        // Get the contexts
        SparkConf conf = new SparkConf()
                .setMaster("local[*]")
                .setAppName("test")
                .set("spark.ui.enabled", "false");
        JavaSparkContext sparkContext = new JavaSparkContext(conf);
        SQLContext sqlContext = new SQLContext(sparkContext);

        // Create a schema with a single string column named "a.b"
        StructType schema = new StructType(new StructField[] {
                DataTypes.createStructField("a.b", DataTypes.StringType, false)
        });

        // Create an RDD with two rows of data and a DataFrame over it
        List<Row> rows = Arrays.asList(RowFactory.create("foo"), RowFactory.create("bar"));
        JavaRDD<Row> rdd = sparkContext.parallelize(rows);
        DataFrame df = sqlContext.createDataFrame(rdd, schema);

        // Copy the "a.b" column to "a_b"; the backticks work fine here
        df = df.withColumn("a_b", df.col("`a.b`"));

        // Indexing the copied "a_b" column works as expected
        StringIndexer indexer0 = new StringIndexer();
        indexer0.setInputCol("a_b");
        indexer0.setOutputCol("a_bIndex");
        df = indexer0.fit(df).transform(df);

        // Indexing the backticked "a.b" column fails silently:
        // no error, but no "abIndex" column in the output either
        StringIndexer indexer1 = new StringIndexer();
        indexer1.setInputCol("`a.b`");
        indexer1.setOutputCol("abIndex");
        df = indexer1.fit(df).transform(df);

        df.show();
    }
}
+---+---+--------+
|a.b|a_b|a_bIndex| // where's the abIndex column?
+---+---+--------+
|foo|foo| 0.0|
|bar|bar| 1.0|
+---+---+--------+
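For what it's worth, the only reliable way I've found to avoid the problem entirely is to get rid of the dot before indexing. Here's a sketch against the frame from the first example (assuming it's acceptable to rename the column; withColumnRenamed appears to match on the raw schema field name, so no backticks are needed):

// Rename the dotted column first, then index the renamed column.
DataFrame renamed = df.withColumnRenamed("a.b", "a_b");
StringIndexer indexer = new StringIndexer()
        .setInputCol("a_b")
        .setOutputCol("a_bIndex");
renamed = indexer.fit(renamed).transform(renamed);

That works, but I'd like to understand why the backticked name fails silently in the indexer.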
Stack trace from the first example:
Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'a.b' given input columns a.b;
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:60)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:57)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:319)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:53)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:318)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:316)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:316)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:265)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:305)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:316)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:107)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:117)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:121)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
at scala.collection.AbstractTraversable.map(Traversable.scala:105)
at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:121)
at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:125)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$class.foreach(Iterator.scala:727)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
at scala.collection.AbstractIterator.to(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:125)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:57)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:105)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:50)
at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:44)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:34)
at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:133)
at org.apache.spark.sql.DataFrame.org$apache$spark$sql$DataFrame$$withPlan(DataFrame.scala:2165)
at org.apache.spark.sql.DataFrame.select(DataFrame.scala:751)
at org.apache.spark.ml.feature.StringIndexer.fit(StringIndexer.scala:84)
at SparkMLDotColumn.main(SparkMLDotColumn.java:38)