hadoop中如何实现DBInputFormat

这篇文章主要介绍了hadoop中如何实现DBInputFormat,具有一定借鉴价值,感兴趣的朋友可以参考下,希望大家阅读完这篇文章之后大有收获,下面让小编带着大家一起了解一下。

代码未做测试,先做记录

package com.test;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.net.URI;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
import org.apache.hadoop.mapreduce.lib.db.DBWritable;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
 * 要运行本示例
 * 1、把mysql的jdbc驱动放到taskTracker的lib目录下,重启集群
 *
 */
public class WordCountDB extends Configured implements Tool {
 
 private String OUT_PATH = "hdfs://grid131:9000/output";
 
 public static class Map extends Mapper<LongWritable, MyUser, LongWritable, Text> {
  public void map(LongWritable key, MyUser value, Context context) throws IOException, InterruptedException {
   context.write(key, new Text(value.toString()));
  }
 }
 
 public int run(String[] args) throws Exception {
  Configuration conf = this.getConf();
  DBConfiguration.configureDB(conf, "com.mysql.jdbc.Driver", "jdbc:mysql://grid131:3306/test", "root", "admin");
  
  //输出路径如果存在,则删除
  FileSystem fs = FileSystem.get(new URI(OUT_PATH), conf);
  fs.delete(new Path(OUT_PATH), true);
  
  Job job = new Job(conf, WordCountDB.class.getSimpleName());
  job.setJarByClass(WordCountDB.class);
  
  FileOutputFormat.setOutputPath(job, new Path(args[1]));
  
  //指定不需要reduce,直接把map输出写入到hdfs中
  job.setNumReduceTasks(0);
  job.setInputFormatClass(DBInputFormat.class);
  
  //指定表、字段
  //DBInputFormat.setInput(job, inputClass, tableName, conditions, orderBy, fieldNames)
  DBInputFormat.setInput(job, MyUser.class, "myuser", null, null, "id", "name");
  job.setMapperClass(Map.class);
  
  //当reduce输出类型与map输出类型一致时,map的输出类型可以不设置
  job.setMapOutputKeyClass(LongWritable.class);
  job.setMapOutputValueClass(Text.class);
  
  job.waitForCompletion(true);
  
  return job.isSuccessful()?0:1;
 }
 
 public static void main(String[] args) throws Exception {
  int exit = ToolRunner.run(new WordCount(), args);
  System.exit(exit);
 }
}
class MyUser implements Writable, DBWritable {
 private Long id;
 private String name;
 
 public Long getId() {
  return id;
 }
 public void setId(Long id) {
  this.id = id;
 }
 public String getName() {
  return name;
 }
 public void setName(String name) {
  this.name = name;
 }
 
 @Override
 public void write(DataOutput out) throws IOException {
  out.writeLong(this.id);
  Text.writeString(out, this.name);
 }
 
 @Override
 public void readFields(DataInput in) throws IOException {
  this.id = in.readLong();
  this.name = Text.readString(in);
 }
 
 @Override
 public void write(PreparedStatement statement) throws SQLException {
  statement.setLong(1, this.id);
  statement.setString(2, this.name);
 }
 
 @Override
 public void readFields(ResultSet resultSet) throws SQLException {
  this.id = resultSet.getLong(1);
  this.name = resultSet.getString(2);
 }
 
 @Override
 public String toString() {
  return this.id + "\\t" + this.name;
 }
}

感谢你能够认真阅读完这篇文章,希望小编分享的“hadoop中如何实现DBInputFormat”这篇文章对大家有帮助,同时也希望大家多多支持云搜网,关注云搜网行业资讯频道,更多相关知识等着你来学习!


【AD】美国洛杉矶/香港/日本VPS推荐,回程电信CN2 GIA线路,延迟低、稳定性高、免费备份_搬瓦工

【AD】炭云:36元/年/1GB内存/20GB SSD空间/500GB流量/5Gbps端口/KVM/香港/国际线路LUMEN