1. The problem
When joining two DataFrames in Spark, the following exception is thrown:
User class threw exception: org.apache.spark.sql.AnalysisException: Resolved attribute(s) subjectId#518 missing from subjectiveList
It roughly says the column cannot be resolved, yet when I show() both sides of the join, the column is present in each of them.
2. The cause
Both of my DataFrames were themselves produced by joins and other transformations, and their lineage is not a simple linear chain: the dependency graph between the two sides is somewhat tangled, so at first I suspected that was the problem.
After some searching I found it is a known Spark bug: https://issues.apache.org/jira/browse/SPARK-14948
The JIRA ticket also gives a scenario that reproduces the issue:
StructField[] fields = new StructField[2];
fields[0] = new StructField("F1", DataTypes.StringType, true, Metadata.empty());
fields[1] = new StructField("F2", DataTypes.StringType, true, Metadata.empty());
JavaRDD<Row> rdd =
    sparkClient.getJavaSparkContext().parallelize(Arrays.asList(RowFactory.create("a", "b")));
DataFrame df = sparkClient.getSparkHiveContext().createDataFrame(rdd, new StructType(fields));
sparkClient.getSparkHiveContext().registerDataFrameAsTable(df, "t1");
DataFrame aliasedDf = sparkClient.getSparkHiveContext().sql("select F1 as asd, F2 from t1");
sparkClient.getSparkHiveContext().registerDataFrameAsTable(aliasedDf, "t2");
sparkClient.getSparkHiveContext().registerDataFrameAsTable(df, "t3");
DataFrame join = aliasedDf.join(df, aliasedDf.col("F2").equalTo(df.col("F2")), "inner");
DataFrame select = join.select(aliasedDf.col("asd"), df.col("F1"));
select.collect();
This closely matches my own logic, which explains why I hit the same exception.
3. Workarounds
First, the suggestions from the JIRA ticket:
- This issue is related to the data type of the fields of the initial DataFrame: if the data type is not String, it works fine.
- It also works fine if the DataFrames are registered as temporary tables and the join is written in SQL (select a.asd, b.F1 from t2 a inner join t3 b on a.F2=b.F2).
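The SQL workaround from the second bullet can be sketched as follows. This is a sketch only: it assumes a Spark 2.x `SparkSession` named `spark`, and uses `createOrReplaceTempView` in place of the older `registerDataFrameAsTable` seen in the JIRA snippet.

```scala
// Register both sides as temporary views and express the join in SQL.
// Resolving columns by name through the SQL parser sidesteps the stale
// attribute references that trigger the AnalysisException.
aliasedDf.createOrReplaceTempView("t2")
df.createOrReplaceTempView("t3")

val result = spark.sql(
  "select a.asd, b.F1 from t2 a inner join t3 b on a.F2 = b.F2")
result.show()
```

Because the SQL analyzer binds `a.F2` and `b.F2` to the views rather than to the original DataFrame objects, the ambiguous attribute IDs never enter the plan.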
My approach: rename the join key on one side to work around the bug.
// Rename the join key on the right-hand side before joining,
// so the analyzer can resolve it unambiguously.
val df1Renamed = df1.select($"relationId".as("relationId2"), $"otherField")
val df3 = df2.join(
  df1Renamed,
  df2("relationId") === df1Renamed("relationId2"),
  "left"
)
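An equivalent sketch using `withColumnRenamed`, which renames only the join key without having to list the remaining columns in a select (`relationId2` is just an illustrative name, and this assumes the join key is the only column shared between the two frames):

```scala
// Rename only the join key on one side; all other columns pass through.
val df1Renamed = df1.withColumnRenamed("relationId", "relationId2")
val df3 = df2.join(
  df1Renamed,
  df2("relationId") === df1Renamed("relationId2"),
  "left"
)
```

This keeps the workaround local to the join and avoids maintaining an explicit column list as the schema evolves.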