DBInputFormat za prenos podatkov iz SQL v bazo podatkov NoSQL



Cilj tega spletnega dnevnika je izvedeti, kako prenesti podatke iz baz podatkov SQL na HDFS, kako prenesti podatke iz baz podatkov SQL v baze podatkov NoSQL.

V tem blogu bomo raziskali zmogljivosti in možnosti ene najpomembnejših komponent tehnologije Hadoop, tj. MapReduce.

Danes podjetja sprejemajo ogrodje Hadoop kot prvo izbiro za shranjevanje podatkov zaradi njegovih zmožnosti učinkovitega ravnanja z velikimi podatki. Vemo pa tudi, da so podatki vsestranski in obstajajo v različnih strukturah in oblikah. Za nadzor nad tako raznovrstnimi podatki in njihovimi različnimi formati bi moral obstajati mehanizem, ki bi ustrezal vsem sortam, a hkrati ustvaril učinkovit in dosleden rezultat.





Najmočnejša komponenta v okolju Hadoop je MapReduce, ki lahko zagotavlja nadzor nad podatki in njihovo strukturo boljši od ostalih. Čeprav zahteva dodatne krivulje učenja in zapletenost programiranja, če lahko obvladate te zapletenosti, lahko s Hadoop-om zagotovo obdelujete kakršne koli podatke.

MapReduce ogrodje razdeli vse svoje naloge obdelave v bistvu v dve fazi: Map in Reduce.



Priprava surovih podatkov za te faze zahteva razumevanje nekaterih osnovnih razredov in vmesnikov. Super razred za to predelavo je InputFormat.

The InputFormat class je eden temeljnih razredov v API-ju Hadoop MapReduce. Ta razred je odgovoren za opredelitev dveh glavnih stvari:

  • Podatki se delijo
  • Bralnik zapisov

Razdelitev podatkov je temeljni koncept ogrodja Hadoop MapReduce, ki opredeljuje velikost posameznih opravil zemljevida in njegov potencialni strežnik za izvajanje. The Bralnik zapisov je odgovoren za dejansko branje zapisov iz vhodne datoteke in njihovo oddajo (kot pari ključ / vrednost) preslikavcu.



Število razporejevalnikov se določi glede na število razcepov. Naloga InputFormat je ustvariti delitve. Večina velikosti razdeljenega časa je enaka velikosti bloka, vendar ni vedno mogoče, da se razdelijo na podlagi velikosti bloka HDFS. Popolnoma je odvisno od tega, kako je bila metoda getSplits () vašega InputFormat razveljavljena.

Obstaja temeljna razlika med MR split in HDFS blokom. Blok je fizični del podatkov, delitev pa le logični del, ki ga mapper prebere. Delitev ne vsebuje vhodnih podatkov, vsebuje le sklic ali naslov podatkov. Delitev ima v bistvu dve stvari: dolžino v bajtih in niz shranjevalnih mest, ki so samo nizi.

Da bi to bolje razumeli, vzemimo en primer: obdelava podatkov, shranjenih v MySQL, z MR. Ker v tem primeru ni pojma blokov, teorija: 'delitve se vedno ustvarijo na podlagi bloka HDFS',ne uspe. Ena od možnosti je ustvariti delitve na podlagi obsegov vrstic v tabeli MySQL (in to počne DBInputFormat, vhodna oblika za branje podatkov iz relacijskih baz podatkov). Morda imamo k število razdelilcev, sestavljenih iz n vrstic.

operater ločljivosti obsega c ++

Delitve se ustvarijo le za vhodne formate, ki temeljijo na FileInputFormat (InputFormat za obdelavo podatkov, shranjenih v datotekah), glede na skupno velikost vhodnih datotek v bajtih. Vendar se velikost bloka vhodnih datotek FileSystem obravnava kot zgornja meja za vhodne delitve. Če imate datoteko, manjšo od velikosti bloka HDFS, boste za to datoteko dobili samo 1 preslikav. Če želite imeti drugačno vedenje, lahko uporabite mapred.min.split.size. Toda to je spet odvisno izključno od getSplits () vašega InputFormat.

V paketu org.apache.hadoop.mapreduce.lib.input je na voljo toliko obstoječih vhodnih formatov.

CombineFileInputFormat.html

CombineFileRecordReader.html

CombineFileRecordReaderWrapper.html

CombineFileSplit.html

CombineSequenceFileInputFormat.html

CombineTextInputFormat.html

FileInputFormat.html

FileInputFormatCounter.html

FileSplit.html

FixedLengthInputFormat.html

InvalidInputException.html

KeyValueLineRecordReader.html

KeyValueTextInputFormat.html

MultipleInputs.html

NLineInputFormat.html

SequenceFileAsBinaryInputFormat.html

SequenceFileAsTextInputFormat.html

SequenceFileAsTextRecordReader.html

SequenceFileInputFilter.html

SequenceFileInputFormat.html

SequenceFileRecordReader.html

TextInputFormat.html

Privzeto je TextInputFormat.

Podobno imamo toliko izhodnih formatov, ki bere podatke z reduktorjev in jih shranjuje v HDFS:

FileOutputCommitter.html

FileOutputFormat.html

FileOutputFormatCounter.html

FilterOutputFormat.html

LazyOutputFormat.html

MapFileOutputFormat.html

MultipleOutputs.html

NullOutputFormat.html

PartialFileOutputCommitter.html

PartialOutputCommitter.html

SequenceFileAsBinaryOutputFormat.html

SequenceFileOutputFormat.html

TextOutputFormat.html

Privzeto je TextOutputFormat.

Ko bi prebrali ta spletni dnevnik, bi se naučili:

  • Kako napisati program za zmanjšanje zemljevidov
  • O različnih vrstah vhodnih formatov, ki so na voljo v Mapreduce
  • Kaj potrebujejo InputFormats
  • Kako napisati InputFormats po meri
  • Kako prenesti podatke iz zbirk podatkov SQL v HDFS
  • Kako prenesti podatke iz zbirk podatkov SQL (tukaj MySQL) v zbirke podatkov NoSQL (tukaj Hbase)
  • Kako prenesti podatke iz ene zbirke podatkov SQL v drugo tabelo v zbirkah podatkov SQL (morda to ni tako pomembno, če to počnemo v isti bazi podatkov SQL. Vendar ni nič narobe, če poznamo isto. Nikoli ne veš kako lahko začne uporabljati)

Predpogoj:

  • Hadoop vnaprej nameščen
  • Prednameščen SQL
  • Prednameščen Hbase
  • Osnovno razumevanje Java
  • MapZmanjšajte znanje
  • Osnovno znanje Hadoop framework

Razumejmo izjavo o težavi, ki jo bomo rešili tukaj:

V relacijski bazi podatkov Edureka imamo tabelo zaposlenih v MySQL DB. Zdaj moramo v skladu s poslovnimi zahtevami vse podatke, ki so na voljo v relacijskih DB, preusmeriti v datotečni sistem Hadoop, tj. HDFS, NoSQL DB, znan kot Hbase.

Za to nalogo imamo veliko možnosti:

  • Sqoop
  • Flume
  • MapReduce

Zdaj ne želite namestiti in konfigurirati nobenega drugega orodja za to operacijo. Na voljo imate samo eno možnost, in sicer Hadoopov okvir obdelave MapReduce. Okvir MapReduce vam daje popoln nadzor nad podatki med prenosom. S stolpci lahko manipulirate in jih postavite neposredno na katero koli od dveh ciljnih lokacij.

Opomba:

  • Za nalaganje tabel iz tabele MySQL moramo prenesti in postaviti konektor MySQL v razred učilnice Hadoop. Če želite to narediti, prenesite konektor com.mysql.jdbc_5.1.5.jar in ga shranite v imenik Hadoop_home / share / Hadoop / MaPreduce / lib.
cp Downloads / com.mysql.jdbc_5.1.5.jar $ HADOOP_HOME / share / hadoop / mapreduce / lib /
  • Prav tako postavite vse kozarce Hbase pod Hadoop classpath, da bo vaš MR program dostopal do Hbase. Če želite to narediti, izvedite naslednji ukaz :
cp $ HBASE_HOME / lib / * $ HADOOP_HOME / share / hadoop / mapreduce / lib /

Različice programske opreme, ki sem jih uporabil pri izvajanju te naloge, so:

  • Hadooop-2.3.0
  • HBaza 0,98,9-Hadoop2
  • Eclipse Moon

Da bi se izognili programu pri kakršni koli težavi z združljivostjo, bralcem predlagam, naj ukaz zaženejo s podobnim okoljem.

DBInputWritable po meri:

paket com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.iodo .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable javni razred DBInputWritable implementira Writable, DBWritable {private int id private String name, dept public void readFields (DataInput in) meče IOException {} public void readFields (ResultSet) vrže SQLException // Objekt Resultset predstavlja podatke, vrnjene iz stavka SQL {id = rs.getInt (1) name = rs.getString (2) dept = rs.getString (3)} public void write (DataOutput out) vrže IOException { } public void write (PreparedStatement ps) vrže SQLException {ps.setInt (1, id) ps.setString (2, name) ps.setString (3, dept)} public int getId () {return id} public String getName () {return ime} javni niz getDept () {return dept}}

DBOutputWritable po meri:

paket com.inputFormat.copy import java.io.DataInput import java.io.DataOutput import java.io.IOException import java.sql.ResultSet import java.sql.PreparedStatement import java.sql.SQLException import org.apache.hadoop.iodo .Writable import org.apache.hadoop.mapreduce.lib.db.DBWritable javni razred DBOutputWritable implementira Writable, DBWritable {private String name private int id private String dept public DBOutputWritable (String name, int id, String dept) {this.name = ime this.id = id this.dept = dept} public void readFields (DataInput in) vrže IOException {} public void readFields (ResultSet rs) vrže SQLException {} public void write (DataOutput out) meta IOException {} public void write (PreparedStatement ps) vrže SQLException {ps.setString (1, ime) ps.setInt (2, id) ps.setString (3, dept)}}

Vhodna tabela:

ustvari bazo podatkov
ustvari tabelo emp (empid int ni null, ime varchar (30), dept varchar (20), primarni ključ (empid))
vstavi v vrednosti emp (1, 'abhay', 'developement'), (2, 'brundesh', 'test')
izberite * med emp

Primer 1: Prenos iz MySQL v HDFS

paket com.inputFormat.copy import java.net.URI import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce .Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat import org.apache.hadoop .io.Text import org.apache.hadoop.io.IntWritable javni razred MainDbtohdfs {public static void main (String [] args) vrže izjemo {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc .Driver ', // razred gonilnika' jdbc: mysql: // localhost: 3306 / edureka ', // db url' root ', // uporabniško ime' root ') // geslo Opravilo = novo opravilo (conf) .setJarByClass (MainDbtohdfs.class) job.setMapperClass (Map.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setInputFormatClass (DBInputFormat.putmat). nova pot (args [0])) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // ime vhodne tabele null, null, new String [] {'empid', 'name', 'dept'} / / stolpci tabele) Pot p = nova pot (args [0]) FileSystem fs = FileSystem.get (nov URI (args [0]), conf) fs.delete (p) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ta del kode nam omogoča, da pripravimo ali konfiguriramo vhodno obliko za dostop do naše izvorne baze podatkov SQL. Parameter vključuje razred gonilnikov, URL vsebuje naslov baze podatkov SQL, uporabniško ime in geslo.

DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // razred voznika 'jdbc: mysql: // localhost: 3306 / edureka', // db url 'root', // uporabniško ime 'root') // geslo

Ta del kode nam omogoča, da podatke o tabelah posredujemo v zbirko podatkov in jih nastavimo v opravilu. Parametri vključujejo seveda primerek opravila, zapisljiv razred po meri, ki mora implementirati vmesnik DBWritable, ime izvorne tabele, pogoj, če je še kaj null, kateri koli parameter razvrščanja else null, seznam stolpcev tabele.

DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // ime vhodne tabele null, null, new String [] {'empid', 'name', 'dept'} // stolpci tabele)

Mapper

paket com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io .IntWritable javni razred Map razširja Mapper {
zaščitena void map (LongWritable key, DBInputWritable value, Context ctx) {try {String name = value.getName () IntWritable id = new IntWritable (value.getId ()) String dept = value.getDept ()
ctx.write (novo besedilo (ime + '' + id + '' + oddelek), id)
} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Reduktor: uporabljen reduktor identitete

Ukaz za zagon:

hadoop jar dbhdfs.jar com.inputFormat.copy.MainDbtohdfs / dbtohdfs

Izhod: tabela MySQL prenesena v HDFS

hadoop dfs -ls / dbtohdfs / *

Primer 2: Prenos iz ene tabele v MySQL na drugo v MySQL

ustvarjanje izhodne tabele v MySQL

ustvari tabelo worker1 (ime varchar (20), id int, dept varchar (20))

paket com.inputFormat.copy import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib .db.DBInputFormat import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io.NullWritable javni razred Mainonetable_to_other_table {public static void main (String [] args) vrže izjemo {Configuration conf = new Configuration () DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // class driver 'jdbc: mysql: // localhost : 3306 / edureka ', // db url' root ', // uporabniško ime' root ') // geslo Opravilo opravila = novo opravilo (conf) job.setJarByClass (Mainonetable_to_other_table.class) job.setMapperClass (Map.class) opravilo .setReducerClass (Reduce.class) job.setMapOutputKeyClass (Text.class) job.setMapOutputValueClass (IntWritable.class) job.setOutputKeyClass (DBOutputWritable.class) job.setOutputValueClass (NET). lWritable.class) job.setInputFormatClass (DBInputFormat.class) job.setOutputFormatClass (DBOutputFormat.class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // ime vhodne tabele null, null, new String [] {'empid ',' name ',' dept '} // stolpci tabele) DBOutputFormat.setOutput (opravilo,' worker1 ', // ime izhodne tabele new String [] {' name ',' id ',' dept '} // tabela stolpci) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ta del kode nam omogoča, da v SQL DB konfiguriramo ime izhodne tabele. Parametra sta primerek opravila, ime izhodne tabele in ime izhodnega stolpca.

kaj je pojo programski model
DBOutputFormat.setOutput (job, 'worker1', // ime izhodne tabele new String [] {'name', 'id', 'dept'} // stolpci tabele)

Mapper: Enako kot v primeru 1

Reduktor:

paket com.inputFormat.copy import java.io.IOException import org.apache.hadoop.mapreduce.Reducer import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable import org.apache.hadoop.io .NullWritable javni razred Reduce razširi Reducer {zaščiteno void zmanjšanje (Besedilni ključ, Iterable vrednosti, Context ctx) {int sum = 0 String line [] = key.toString (). Split ('') try {ctx.write (new DBOutputWritable (vrstica [0] .toString (), Integer.parseInt (vrstica [1] .toString ()), vrstica [2] .toString ()), NullWritable.get ())} ulov (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

Ukaz za zagon:

hadoop jar dbhdfs.jar com.inputFormat.copy.Mainonetable_to_other_table

Rezultat: preneseni podatki iz EMP tabele v MySQL drugemu uslužbencu tabele1 v MySQL

Primer 3: Prenos iz tabele v MySQL v tabelo NoSQL (Hbase)

Ustvarjanje tabele Hbase za sprejem izhoda iz tabele SQL:

ustvari 'zaposleni', 'official_info'

Razred voznika:

paket Dbtohbase import org.apache.hadoop.conf.Configuration import org.apache.hadoop.mapreduce.Job import org.apache.hadoop.mapreduce.lib.db.DBConfiguration import org.apache.hadoop.mapreduce.lib.db.DBInputFormat import org.apache.hadoop.hbase.mapreduce.TableOutputFormat import org.apache.hadoop.hbase.HBaseConfiguration import org.apache.hadoop.hbase.client.HTable import org.apache.hadoop.hbase.client.HTableInterface import org.apache .hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil import org.apache.hadoop.io.Text javni razred MainDbToHbase {public static void main (String [] args) vrže izjemo {Configuration conf = config HBaseConfiguration.create () HTableInterface mytable = new HTable (conf, 'emp') DBConfiguration.configureDB (conf, 'com.mysql.jdbc.Driver', // razred voznika 'jdbc: mysql: // localhost: 3306 / edureka' , // db url 'root', // uporabniško ime 'root') // geslo Opravilo opravila = novo opravilo (conf, 'dbtohbase') job.setJarByClass (MainDbToHbase.class) job.s etMapperClass (Map.class) job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class) TableMapReduceUtil.initTableReducerJob ('uslužbenec', Reduce.class. class) DBInputFormat.setInput (job, DBInputWritable.class, 'emp', // ime vhodne tabele null, null, new String [] {'empid', 'name', 'dept'} // stolpci tabele) System.exit (job.waitForCompletion (true)? 0: 1)}}

Ta del kode vam omogoča konfiguriranje razreda izhodnega ključa, ki je v primeru hbase ImmutableBytesWritable

job.setMapOutputKeyClass (ImmutableBytesWritable.class) job.setMapOutputValueClass (Text.class)

Tu posredujemo ime tabele hbase in reduktor, ki delujeta na mizo.

TableMapReduceUtil.initTableReducerJob ('zaposleni', Reduce.class, job)

Mapper:

paket Dbtohbase import java.io.IOException import org.apache.hadoop.mapreduce.Mapper import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.util.Bytes import org.apache.hadoop.io .LongWritable import org.apache.hadoop.io.Text import org.apache.hadoop.io.IntWritable javni razred Zemljevid razširja Mapper {private IntWritable one = new IntWritable (1) zaščitena void map (LongWritable id, DBInputWritable value, Context context). {try {String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), novo besedilo (vrstica + ' '+ dept))} catch (IOException e) {e.printStackTrace ()} catch (InterruptedException e) {e.printStackTrace ()}}}

V tem delu kode jemljemo vrednosti iz geterjev razreda DBinputwritable in jih nato posredujemo
ImmutableBytesWritable, tako da pridejo do reduktorja v obliki, ki jo je mogoče zapisati z bajtom, kar Hbase razume.

String line = value.getName () String cd = value.getId () + '' String dept = value.getDept () context.write (new ImmutableBytesWritable (Bytes.toBytes (cd)), novo besedilo (vrstica + '' + dept ))

Reduktor:

paket Dbtohbase import java.io.IOException import org.apache.hadoop.hbase.client.Put import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.hbase.mapreduce.TableReducer import org.apache.hadoop .hbase.util.Bytes import org.apache.hadoop.io.Text javni razred Reduce razširi TableReducer {public void reduce (ImmutableBytesWritable key, Iterable values, Context context) meče IOException, InterruptedException {String [] vzrok = null // Loop values za (Besedilo val: vrednosti) {vzrok = val.toString (). split ('')} // Put to HBase Put put = new Put (key.get ()) put.add (Bytes.toBytes ('official_info') ), Bytes.toBytes ('ime'), Bytes.toBytes (vzrok [0])) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('oddelek'), Bytes.toBytes (vzrok [1 ])) context.write (ključ, pot)}}

Ta del kode nam omogoča, da določimo natančno vrstico in stolpec, v katerem bi shranili vrednosti iz reduktorja. Tu shranjujemo vsak empid v ločeno vrstico, kot smo naredili empid kot ključ vrstice, ki bi bil unikaten. V vsaki vrstici shranjujemo uradne podatke o zaposlenih v družino stolpcev „official_info“ pod stolpce „ime“ oziroma „oddelek“.

Put put = novo Put (key.get ()) put.add (Bytes.toBytes ('official_info'), Bytes.toBytes ('name'), Bytes.toBytes (vzrok [0])) put.add (Bytes. toBytes ('official_info'), Bytes.toBytes ('department'), Bytes.toBytes (vzrok [1])) context.write (key, put)

Preneseni podatki v Hbase:

skeniraj zaposlenega

Kot vidimo, smo uspešno opravili nalogo selitve naših poslovnih podatkov iz relacijskega DB DB v NoSQL DB.

V naslednjem blogu se bomo naučili, kako pisati in izvajati kode za druge vhodne in izhodne formate.

Objavljajte svoje komentarje, vprašanja ali kakršne koli povratne informacije. Rad bi slišal od vas.

Imate vprašanje za nas? Prosimo, omenite to v oddelku za komentarje in se vam bomo javili.

Sorodne objave: