Integrating TiSpark into an Existing Spark Cluster: a TiDB Example

spark | 2021-04-02 16:45:14


There are two ways to deploy Spark alongside TiDB: keep the two clusters fully separate, or co-locate the Spark workers with the TiKV nodes. The official position is that co-location speeds up data loading, although in practice the gain is modest. Either way, the application code is exactly the same.

1. Add the TiSpark dependency

<dependency>
      <groupId>com.pingcap.tispark</groupId>
      <artifactId>tispark-assembly</artifactId>
      <version>2.3.10</version>
      <exclusions>
        <exclusion>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive_2.11</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-hive-thriftserver_2.11</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-unsafe_2.11</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
        </exclusion>
        <exclusion>
          <groupId>org.scalatest</groupId>
          <artifactId>scalatest_2.11</artifactId>
        </exclusion>
        <exclusion>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
        </exclusion>
      </exclusions>
    </dependency>
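For sbt-based builds, the same artifact and exclusions can be expressed as follows (a sketch mirroring the Maven block above, not an official snippet):

```scala
// build.sbt sketch: same coordinates and exclusions as the Maven dependency above
libraryDependencies += ("com.pingcap.tispark" % "tispark-assembly" % "2.3.10")
  .exclude("org.apache.spark", "spark-hive_2.11")
  .exclude("org.apache.spark", "spark-hive-thriftserver_2.11")
  .exclude("org.apache.spark", "spark-unsafe_2.11")
  .exclude("org.apache.hadoop", "hadoop-client")
  .exclude("org.scalatest", "scalatest_2.11")
  .exclude("mysql", "mysql-connector-java")
```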

2. Configure the PD addresses in the SparkSession

val spark = SparkSession.builder()
  .config("spark.sql.extensions", "org.apache.spark.sql.TiExtensions")
  .config("spark.tispark.pd.addresses", baseConf.tidbPdAddr)
  .getOrCreate()

These settings can of course also go into the spark-defaults configuration file instead; once configured, reading a TiDB table is exactly the same as reading a Hive table.
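A cluster-wide setup in conf/spark-defaults.conf would look like the following sketch (the PD addresses are placeholders, replace them with your own):

```properties
spark.sql.extensions          org.apache.spark.sql.TiExtensions
spark.tispark.pd.addresses    pd0:2379,pd1:2379,pd2:2379
```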

3. Read a TiDB table

val df = spark.table("tidb.tableName")

Note: when designing your schemas, give TiDB and Hive databases different name prefixes so the two are not mixed up.
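Once the extension is registered, TiDB tables and Hive tables can be mixed freely in the same query, which is the main payoff of the integration. A minimal sketch, assuming a SparkSession `spark` configured as above; all database, table, and column names here are hypothetical:

```scala
// Sketch: join a TiDB table with a Hive table in one DataFrame pipeline.
// `tidb_ods.orders` resolves through TiSpark, `hive_dw.dim_user` through
// the Hive metastore; both names are illustrative only.
val orders = spark.table("tidb_ods.orders")   // read from TiDB via TiSpark
val users  = spark.table("hive_dw.dim_user")  // read from Hive
val joined = orders.join(users, Seq("user_id"), "left")
joined.show()
```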

4. Write a Spark DataFrame to TiDB

Writing through TiSpark to TiKV:

  // build the TiSpark write options
  def getTikvWriteOptions(): Map[String, String] = {
    Map(
      "tidb.addr" -> baseconf.tidbDbAddrs.split(",")(0).split(":")(0),
      "tidb.password" -> baseconf.tidbPassword,
      "tidb.port" -> baseconf.tidbDbAddrs.split(",")(0).split(":")(1),
      "tidb.user" -> baseconf.tidbUser,
      "spark.tispark.pd.addresses" -> baseconf.tidbPdAddr
    )
  }
  // write the DataFrame to TiDB
  df.write.format("tidb")
    .options(getTikvWriteOptions())
    .option("database", dbName)
    .option("table", table)
    .mode(SaveMode.Append)
    .save()
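The `tidb.addr` and `tidb.port` options above come from splitting the first `host:port` entry of a comma-separated address list. That parsing can be factored into a small helper (the helper name is ours, not part of TiSpark):

```scala
// Extract the first "host:port" entry from a comma-separated address list,
// e.g. "10.0.0.1:4000,10.0.0.2:4000" -> ("10.0.0.1", "4000").
// Helper name is illustrative, not a TiSpark API.
def firstHostPort(addrs: String): (String, String) = {
  val first = addrs.split(",")(0)
  val Array(host, port) = first.split(":")
  (host, port)
}
```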

Writing via JDBC:

      df
        // .na.replace("*", ImmutableMap.of(Double.NaN, 0d))
        // replacing NaN here can make rows with special characters fail to write,
        // and it depends on the data, so it is not part of the shared method
        .write.mode(SaveMode.Append)
        .format("jdbc")
        .option("driver", "com.mysql.jdbc.Driver")
        // replace the host and port with yours, and be sure to enable batch rewrites
        .option("url", "jdbc:mysql://" + baseconf.tidbDbAddrs + "/" + dbname + "?rewriteBatchedStatements=true")
        .option("useSSL", "false")
        // as tested, 150 is a good batch size
        .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE, 150)
        .option("dbtable", table) // database name and table name here
        .option("isolationLevel", "NONE") // set isolationLevel to NONE
        .option("user", baseconf.tidbUser) // TiDB user here
        .option("password", baseconf.tidbPassword) // TiDB password here
        .option("nanValue", 0)
        .save()

 
