A case study: integrating TiSpark into an existing Spark cluster to work with TiDB
Spark and TiDB can be deployed together in two ways: fully separated clusters, or with Spark workers co-located on the TiKV nodes. The official documentation suggests co-location speeds up data loading (though in practice the gain is modest). Either way, the application code is exactly the same.
1. Add the TiSpark dependency
<dependency>
    <groupId>com.pingcap.tispark</groupId>
    <artifactId>tispark-assembly</artifactId>
    <version>2.3.10</version>
    <exclusions>
        <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive-thriftserver_2.11</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-unsafe_2.11</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_2.11</artifactId>
        </exclusion>
        <exclusion>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
        </exclusion>
    </exclusions>
</dependency>
2. Configure the PD addresses in the SparkSession
val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
  .config("spark.tispark.pd.addresses", baseConf.tidbPdAddr)
  .getOrCreate()
These settings can also go into the spark-defaults.conf file. Once configured, reading a TiDB table works exactly like reading a Hive table.
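For reference, the equivalent spark-defaults.conf entries might look like the following sketch (the PD addresses are placeholders; substitute your own cluster's PD endpoints):

```properties
# enable the TiSpark catalog extension cluster-wide
spark.sql.extensions          org.apache.spark.sql.TiExtensions
# comma-separated PD endpoints of the TiDB cluster (placeholder hosts)
spark.tispark.pd.addresses    pd0:2379,pd1:2379,pd2:2379
```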
3. Read a TiDB table
val df = spark.table("tidb.tableName")
Note: when designing your schemas, give TiDB and Hive databases different name prefixes so the two catalogs are never confused.
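Because TiDB and Hive tables live in the same Spark catalog namespace, they can even be joined in one query. A minimal sketch, assuming a hypothetical `tidb_orders.payments` TiDB table and `hive_ods.orders` Hive table:

```scala
// Join a TiDB table with a Hive table in plain Spark SQL;
// TiSpark resolves tidb_orders.* through PD, Spark resolves hive_ods.* through the metastore.
val result = spark.sql(
  """SELECT o.order_id, o.user_id, p.amount
    |FROM hive_ods.orders o
    |JOIN tidb_orders.payments p ON o.order_id = p.order_id""".stripMargin)
result.show()
```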
4. Write a Spark DataFrame to TiDB
Writing through TiKV with TiSpark
// Build the TiKV write options
def getTikvWriteOptions() = {
  Map(
    "tidb.addr" -> baseconf.tidbDbAddrs.split(",")(0).split(":")(0),
    "tidb.port" -> baseconf.tidbDbAddrs.split(",")(0).split(":")(1),
    "tidb.user" -> baseconf.tidbUser,
    "tidb.password" -> baseconf.tidbPassword,
    "spark.tispark.pd.addresses" -> baseconf.tidbPdAddr
  )
}
// Write the DataFrame to TiDB
df.write
  .format("tidb")
  .options(getTikvWriteOptions())
  .option("database", dbName)
  .option("table", table)
  .mode(SaveMode.Append)
  .save()
Writing through JDBC
df
  // .na.replace("*", ImmutableMap.of(Double.NaN, 0d)) // breaks writes containing special characters and is data-specific, so it is not part of the shared method
  .write.mode(saveMode = "append")
  .format("jdbc")
  .option("driver", "com.mysql.jdbc.Driver")
  // replace the host and port with yours, and be sure to enable batch rewriting
  .option("url", "jdbc:mysql://" + baseconf.tidbDbAddrs + "/" + dbname + "?rewriteBatchedStatements=true")
  .option("useSSL", "false")
  // in our tests, 150 is a good batch size
  .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150)
  .option("dbtable", table) // database and table name
  .option("isolationLevel", "NONE") // set isolationLevel to NONE
  .option("user", baseconf.tidbUser) // TiDB user
  .option("password", baseconf.tidbPassword) // TiDB password
  .option("nanValue", 0)
  .save()