What impresses me about Spark is that, beyond its well-designed resilient distributed architecture, it is also easy to develop, deploy, and test against, and its APIs for input, output, and computation are complete and rich.
This post records the project code for a Spark program written in Java that reads from and writes to a MySQL database.
1. Spark Java project setup
1.1 Create a MySQL database; the user_test table is defined as follows:
create table user_test (
  id int(11) default null comment 'id',
  name varchar(64) default null comment 'user name',
  password varchar(64) default null comment 'password',
  age int(11) default null comment 'age'
) engine=InnoDB default charset=utf8;

insert into user_test values(12, 'cassie', '123456', 25);
insert into user_test values(11, 'zhangs', '1234562', 26);
insert into user_test values(23, 'zhangs', '2321312', 27);
insert into user_test values(22, 'tom', 'asdfg', 28);
1.2 Create a Maven Java project and add the dependencies
I use IDEA: just go to File -> New Module, select Maven, and click OK.
Without going into detail, here is the pom:
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkSQL</groupId>
    <artifactId>com.sparksql.test</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <!-- Spark core and Spark SQL are required by the code in sections 2 and 3;
             version 1.6.0 with Scala 2.10 is assumed here to match the DataFrame/SQLContext API used. -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.10</artifactId>
            <version>1.6.0</version>
        </dependency>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.24</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>
    </dependencies>
</project>
2. Spark Java code for reading from MySQL
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.Properties;

/**
 * Created by Administrator on 2017/11/6.
 */
public class SparkMysql {
    public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);

    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);
        //read data from MySQL
        readMySQL(sqlContext);
        //stop the SparkContext
        sparkContext.stop();
    }

    private static void readMySQL(SQLContext sqlContext){
        //jdbc.url=jdbc:mysql://localhost:3306/database
        String url = "jdbc:mysql://localhost:3306/test";
        //name of the table to query
        String table = "user_test";
        //set the database user (user), password (password) and the JDBC driver (driver)
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");

        //read the user_test table of the test database via Spark JDBC
        System.out.println("reading the user_test table from the test database");
        //read all rows of the table
        DataFrame jdbcDF = sqlContext.read().jdbc(url,table,connectionProperties).select("*");
        //show the data
        jdbcDF.show();
    }
}
Run result:
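In practice you usually do not want to pull a whole table into Spark just to use a few rows. The JDBC data source also accepts a subquery in place of a table name, so the filter runs inside MySQL before any data reaches Spark. Below is a minimal sketch of the idea; it reuses url and connectionProperties from readMySQL above, and the alias t and the age > 25 condition are only illustrative assumptions:

//push the filter down to MySQL by passing a subquery as the "table"
//assumption: url and connectionProperties are the same as in readMySQL above
String filteredTable = "(select id, name, age from user_test where age > 25) as t";
DataFrame filteredDF = sqlContext.read().jdbc(url, filteredTable, connectionProperties);
filteredDF.show();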
3. Spark Java code for writing to MySQL
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Created by Administrator on 2017/11/6.
 */
public class SparkMysql {
    public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);

    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);
        //the data to write
        JavaRDD<String> personData = sparkContext.parallelize(Arrays.asList("1 tom 5","2 jack 6","3 alex 7"));
        //database connection settings
        String url = "jdbc:mysql://localhost:3306/test";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");
        /**
         * Step 1: build an RDD of Row on top of the raw RDD.
         */
        //turn the RDD into an RDD of Row; a Row can simply be thought of as one row of a table
        JavaRDD<Row> personsRDD = personData.map(new Function<String,Row>(){
            public Row call(String line) throws Exception {
                String[] splited = line.split(" ");
                return RowFactory.create(Integer.valueOf(splited[0]),splited[1],Integer.valueOf(splited[2]));
            }
        });
        /**
         * Step 2: dynamically build the DataFrame metadata.
         */
        List<StructField> structFields = new ArrayList<StructField>();
        structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));
        structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
        structFields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));
        //build the StructType that describes the DataFrame schema
        StructType structType = DataTypes.createStructType(structFields);
        /**
         * Step 3: construct the DataFrame from the schema and the RDD<Row>.
         */
        DataFrame personsDF = sqlContext.createDataFrame(personsRDD,structType);
        /**
         * Step 4: write the data into the person table.
         */
        personsDF.write().mode("append").jdbc(url,"person",connectionProperties);
        //stop the SparkContext
        sparkContext.stop();
    }
}
Run result:
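If you want to confirm the write from Spark itself rather than in the MySQL client, you can read the person table back with the same connection options and print it. This is only a small sketch, assuming it runs in the same main() after the write above and that the append succeeded:

//sanity check: read the freshly written person table back and display it
//assumption: executed in the same main(), reusing url and connectionProperties
DataFrame writtenDF = sqlContext.read().jdbc(url, "person", connectionProperties);
//expect the three rows built from personData: (1, tom, 5), (2, jack, 6), (3, alex, 7)
writtenDF.show();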