Hadoop 入门笔记 二十一 : MapReduce DB操作

一. 背景知识 通常组织会使用关系型数据库来存储业务相关的数据,但随着数据的规模越来越大,尤其是像MySQL这种,在单表超过5千万条记录时,尽管对表使用了特定的存储引擎和索引优化,但依然不可避免地存在性能下降问题。
此时,我们可以通过使用MapReduce从MySQL中定期迁移使用频率较低的历史数据到HDFS中,一方面可以降低对MySQL的存储和计算负载,另一方面,通过分布式计算引擎可以更加高效地处理过去的历史数据。
对于MapReduce框架来说,使用InputFormat进行数据读取操作,读取的数据首先由mapper处理,然后交给reducer处理,最终使用OutputFormat进行数据的输出操作。默认情况下,输入输出的组件实现都是针对文本数据处理的,分别是TextInputFormat、TextOutputFormat。
为了方便 MapReduce 直接访问关系型数据库(Mysql,Oracle),Hadoop提供了DBInputFormat和DBOutputFormat两个类。其中DBInputFormat负责从数据库中读取数据,而DBOutputFormat负责把数据最终写入数据库中。
二. 读取数据库操作 1. 需求
在mysql中itcast_shop数据库下创建表itheima_goods并加载数据到表中。要求使用MapReduce程序将表中的数据导出存放在指定的文件下。
数据库:
链接:https://pan.baidu.com/s/1ImrI...
提取码:pz9b
因为涉及到java操作mysql,因此需要在pom依赖中额外添加mysql-jdbc驱动。

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.32</version>
</dependency>

2. DBInputFormat类
DBInputFormat类用于从SQL表读取数据。DBInputFormat底层一行一行读取表中的数据,返回键值对。其中k是LongWritable类型,表示表中数据的记录行号,从0开始;v是DBWritable类型,表示该行数据对应的对象类型。
此外还需要使用setInput方法设置SQL查询的语句相关信息。
Hadoop 入门笔记 二十一 : MapReduce DB操作
文章图片

3. 代码实现 1. 编写GoodsBean类 定义GoodsBean的实体类,用于封装查询返回的结果(如果要查询表的所有字段,那么属性就跟表的字段一一对应即可)。并且需要实现序列化机制Writable。
此外,从数据库读取/写入数据库的对象应实现DBWritable。 DBWritable与Writable相似,区别在于write(PreparedStatement)方法采用PreparedStatement,而readFields(ResultSet)采用ResultSet。
package com.uuicon.sentiment_upload.db; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib.db.DBWritable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class GoodsBean implements Writable, DBWritable { private Long goodsId; private String goodsSn; private String goodsName; private Double marketPrice; private Double shopPrice; private Long saleNum; @Override public String toString() { return goodsId + "\t" + goodsSn + '\t' + goodsName + "\t" + marketPrice + "\t" + shopPrice + "\t" + saleNum; }public Long getGoodsId() { return goodsId; }public void setGoodsId(Long goodsId) { this.goodsId = goodsId; }public String getGoodsSn() { return goodsSn; }public void setGoodsSn(String goodsSn) { this.goodsSn = goodsSn; }public String getGoodsName() { return goodsName; }public void setGoodsName(String goodsName) { this.goodsName = goodsName; }public Double getMarketPrice() { return marketPrice; }public void setMarketPrice(Double marketPrice) { this.marketPrice = marketPrice; }public Double getShopPrice() { return shopPrice; }public void setShopPrice(Double shopPrice) { this.shopPrice = shopPrice; }public Long getSaleNum() { return saleNum; }public void setSaleNum(Long saleNum) { this.saleNum = saleNum; }public GoodsBean() {}public GoodsBean(Long goodsId, String goodsSn, String goodsName, Double marketPrice, Double shopPrice, Long saleNum) { this.goodsId = goodsId; this.goodsSn = goodsSn; this.goodsName = goodsName; this.marketPrice = marketPrice; this.shopPrice = shopPrice; this.saleNum = saleNum; }public void set(Long goodsId, String goodsSn, String goodsName, Double marketPrice, Double shopPrice, Long saleNum) { this.goodsId = goodsId; this.goodsSn = goodsSn; this.goodsName = goodsName; this.marketPrice = marketPrice; this.shopPrice = shopPrice; this.saleNum = saleNum; }@Override public void write(DataOutput out) throws 
IOException { out.writeLong(goodsId); out.writeUTF(goodsSn); out.writeUTF(goodsName); out.writeDouble(marketPrice); out.writeDouble(shopPrice); out.writeLong(saleNum); }@Override public void readFields(DataInput in) throws IOException { this.goodsId = in.readLong(); this.goodsSn = in.readUTF(); this.goodsName = in.readUTF(); this.marketPrice = in.readDouble(); this.shopPrice = in.readDouble(); this.saleNum = in.readLong(); }@Override public void write(PreparedStatement ps) throws SQLException { ps.setLong(1, goodsId); ps.setString(2, goodsSn); ps.setString(3, goodsName); ps.setDouble(4, marketPrice); ps.setDouble(5, shopPrice); ps.setLong(6, saleNum); }@Override public void readFields(ResultSet resultSet) throws SQLException { this.goodsId = resultSet.getLong(1); this.goodsSn = resultSet.getString(2); this.goodsName = resultSet.getString(3); this.marketPrice = resultSet.getDouble(4); this.shopPrice = resultSet.getDouble(5); this.saleNum = resultSet.getLong(6); } }

2. 编写Mapper类
package com.uuicon.sentiment_upload.db; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class ReadDBMapper extends Mapper { Text outValue = https://www.it610.com/article/new Text(); @Override protected void map(LongWritable key, GoodsBean value, Context context) throws IOException, InterruptedException { outValue.set(value.toString()); context.write(key, outValue); } }

3. 创建程序驱动类
package com.uuicon.sentiment_upload.db; import com.uuicon.sentiment_upload.covidtopn.CovidTopNDriver; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapred.lib.db.DBInputFormat; import org.apache.hadoop.mapreduce.Job; import java.io.File; import java.io.IOException; public class ReadDBDriver { public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException { Configuration conf = new Configuration(); //数据库信息 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/itcast_goods", "root", "root" ); // 创建作业类 Job job = Job.getInstance(conf, ReadDBDriver.class.getSimpleName()); //设置mr 驱动类 job.setJarByClass(ReadDBDriver.class); //设置mapper 类 job.setMapperClass(ReadDBMapper.class); job.setOutputKeyClass(LongWritable.class); job.setOutputValueClass(Text.class); // todo 设置输入组件 FileOutputFormat.setOutputPath(job, new Path("E:\\ml\\hadoop\\mysqlout")); //设置Reducer 类 ,todo 本例不需要reduce ,操作方式是将tasknumber 设置为 0 job.setNumReduceTasks(0); // todo 设置输入组件 job.setInputFormatClass(DBInputFormat.class); //添加读取数据库相关参数DBInputFormat.setInput( job, GoodsBean.class, "select goodsId ,goodsSn,goodsName,marketPrice ,shopPrice , saleNum from itheima_goods", "select count(goodsId) from itheima_goods" ); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }

4. 运行程序 直接在驱动类中右键运行main方法,使用MapReduce的本地模式执行。也可以将程序使用maven插件打包成jar包,提交到yarn上进行分布式运行。
Hadoop 入门笔记 二十一 : MapReduce DB操作
文章图片

【Hadoop 入门笔记 二十一 : MapReduce DB操作】Hadoop 入门笔记 二十一 : MapReduce DB操作
文章图片

3. 输出到数据库操作
1. 需求 有一份结构化的数据文件,数据内容对应着mysql中一张表的内容,要求使用MapReduce程序将文件的内容读取写入到mysql中。
就以上例的输出文件作为结构化文件,下面在mysql中创建对应的表结构。
表结构:
-- Target table for the MapReduce write job; columns mirror GoodsBean fields.
CREATE TABLE `itheima_goods_mr_write` (
  `goodsId`     bigint(11)    NOT NULL AUTO_INCREMENT COMMENT '商品id',
  `goodsSn`     varchar(20)   NOT NULL                COMMENT '商品编号',
  `goodsName`   varchar(200)  NOT NULL                COMMENT '商品名称',
  `marketPrice` decimal(11,2) NOT NULL DEFAULT '0.00' COMMENT '市场价',
  `shopPrice`   decimal(11,2) NOT NULL DEFAULT '0.00' COMMENT '门店价',
  `saleNum`     int(11)       NOT NULL DEFAULT '0'    COMMENT '总销售量',
  PRIMARY KEY (`goodsId`)
) ENGINE=InnoDB AUTO_INCREMENT=115909 DEFAULT CHARSET=utf8;

2. DBOutputFormat类 DBOutputFormat用于将reduce输出发送到SQL表。DBOutputFormat接受键值对,其中key必须具有扩展DBWritable的类型。
此外还需要使用setOutput方法设置SQL插入语句相关信息,比如表、字段等。
3. 代码实现 1. 编写GoodsBean类 定义GoodsBean的实体类,用于封装插入表中的数据(对象属性跟表的字段一一对应即可)。并且需要实现序列化机制Writable。
此外,从数据库读取/写入数据库的对象应实现DBWritable。 DBWritable与Writable相似,区别在于write(PreparedStatement)方法采用PreparedStatement,而readFields(ResultSet)采用ResultSet。
package com.uuicon.sentiment_upload.db; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.lib.db.DBWritable; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; public class GoodsBean implements Writable, DBWritable { private Long goodsId; private String goodsSn; private String goodsName; private Double marketPrice; private Double shopPrice; private Long saleNum; @Override public String toString() { return goodsId + "\t" + goodsSn + '\t' + goodsName + "\t" + marketPrice + "\t" + shopPrice + "\t" + saleNum; }public Long getGoodsId() { return goodsId; }public void setGoodsId(Long goodsId) { this.goodsId = goodsId; }public String getGoodsSn() { return goodsSn; }public void setGoodsSn(String goodsSn) { this.goodsSn = goodsSn; }public String getGoodsName() { return goodsName; }public void setGoodsName(String goodsName) { this.goodsName = goodsName; }public Double getMarketPrice() { return marketPrice; }public void setMarketPrice(Double marketPrice) { this.marketPrice = marketPrice; }public Double getShopPrice() { return shopPrice; }public void setShopPrice(Double shopPrice) { this.shopPrice = shopPrice; }public Long getSaleNum() { return saleNum; }public void setSaleNum(Long saleNum) { this.saleNum = saleNum; }public GoodsBean() {}public GoodsBean(Long goodsId, String goodsSn, String goodsName, Double marketPrice, Double shopPrice, Long saleNum) { this.goodsId = goodsId; this.goodsSn = goodsSn; this.goodsName = goodsName; this.marketPrice = marketPrice; this.shopPrice = shopPrice; this.saleNum = saleNum; }public void set(Long goodsId, String goodsSn, String goodsName, Double marketPrice, Double shopPrice, Long saleNum) { this.goodsId = goodsId; this.goodsSn = goodsSn; this.goodsName = goodsName; this.marketPrice = marketPrice; this.shopPrice = shopPrice; this.saleNum = saleNum; }@Override public void write(DataOutput out) throws 
IOException { out.writeLong(goodsId); out.writeUTF(goodsSn); out.writeUTF(goodsName); out.writeDouble(marketPrice); out.writeDouble(shopPrice); out.writeLong(saleNum); }@Override public void readFields(DataInput in) throws IOException { this.goodsId = in.readLong(); this.goodsSn = in.readUTF(); this.goodsName = in.readUTF(); this.marketPrice = in.readDouble(); this.shopPrice = in.readDouble(); this.saleNum = in.readLong(); }@Override public void write(PreparedStatement ps) throws SQLException { ps.setLong(1, goodsId); ps.setString(2, goodsSn); ps.setString(3, goodsName); ps.setDouble(4, marketPrice); ps.setDouble(5, shopPrice); ps.setLong(6, saleNum); }@Override public void readFields(ResultSet resultSet) throws SQLException { this.goodsId = resultSet.getLong(1); this.goodsSn = resultSet.getString(2); this.goodsName = resultSet.getString(3); this.marketPrice = resultSet.getDouble(4); this.shopPrice = resultSet.getDouble(5); this.saleNum = resultSet.getLong(6); } }

2. Mapper 类
package com.uuicon.sentiment_upload.db; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Counter; import org.apache.hadoop.mapreduce.Mapper; import java.io.IOException; public class WriteDBMapper extends Mapper { NullWritable outKey = NullWritable.get(); GoodsBean outValue = https://www.it610.com/article/new GoodsBean(); @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { Counter sc = context.getCounter("mr_to_sql", "SUCCESS"); Counter fc = context.getCounter("mr_to_sql", "FAILED"); String[] fields = value.toString().split("\\s+"); if (fields.length > 6) { // 正常数据 outValue.set( Long.parseLong(fields[1]), fields[2], fields[3], Double.parseDouble(fields[4]), Double.parseDouble(fields[5]), Long.parseLong(fields[6]) ); context.write(outKey, outValue); sc.increment(1); } else { // 异常数据 fc.increment(1); } } }

3. Reducer 类
package com.uuicon.sentiment_upload.db;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Pass-through reducer for the DB write job.
 *
 * <p>DBOutputFormat persists only the KEY of each output pair, and that key
 * must implement DBWritable — so this reducer swaps the pair around,
 * emitting (GoodsBean, NullWritable) for every incoming bean.
 *
 * <p>Fix over the published listing: added the generic type parameters the
 * raw Reducer/Iterable declarations lacked.
 */
public class WriteDBReducer
        extends Reducer<NullWritable, GoodsBean, GoodsBean, NullWritable> {

    @Override
    protected void reduce(NullWritable key, Iterable<GoodsBean> values, Context context)
            throws IOException, InterruptedException {
        for (GoodsBean value : values) {
            // Bean becomes the key so DBOutputFormat writes it to the table.
            context.write(value, key);
        }
    }
}

4. 驱动类
package com.uuicon.sentiment_upload.db; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.lib.db.DBConfiguration; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; public class WriteDBDriver { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); //数据库信息 DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://localhost:3306/itcast_goods?useUnicode=true&characterEncoding=utf8", "root", "root" ); // 创建作业类 Job job = Job.getInstance(conf, WriteDBDriver.class.getSimpleName()); //设置mr 驱动类 job.setJarByClass(WriteDBDriver.class); //设置mapper 类 job.setMapperClass(WriteDBMapper.class); job.setMapOutputKeyClass(NullWritable.class); job.setMapOutputValueClass(GoodsBean.class); //设置 Reduce 相关 job.setReducerClass(WriteDBReducer.class); job.setOutputKeyClass(GoodsBean.class); job.setOutputValueClass(NullWritable.class); // 设置当前作业的文件路径 FileInputFormat.setInputPaths(job, new Path("E:\\ml\\hadoop\\mysqlout")); // todo 设置程序输出类 job.setOutputFormatClass(DBOutputFormat.class); // 配置当前作业,写入数据库表 itheima_goods_mr_write DBOutputFormat.setOutput( job, "itheima_goods_mr_write", "goodsId","goodsSn","goodsName","marketPrice","shopPrice","saleNum" ); boolean b = job.waitForCompletion(true); System.exit(b ? 0 : 1); } }

5. 运行结果 Hadoop 入门笔记 二十一 : MapReduce DB操作
文章图片

    推荐阅读