RDD z uporabo Spark: gradnik Apache Spark



Ta spletni dnevnik o RDD, ki uporablja Spark, vam bo zagotovil podrobno in celovito znanje o RDD, ki je temeljna enota Sparka in kako uporaben je.

, Že sama beseda je dovolj, da v mislih vsakega Hadoopovega inženirja ustvari iskro. TO n v pomnilniku orodje za obdelavo kar je bliskovito hitro pri računalniškem grozdu. V primerjavi z MapReduce izmenjava podatkov v pomnilniku naredi RDD-je 10-100x hitreje kot skupna raba omrežja in diska, vse to pa je mogoče zaradi RDD-jev (odpornih naborov porazdeljenih podatkov). Ključne točke, na katere se danes osredotočamo v tem RDD s člankom Spark, so:

Potrebujete RDD-je?

Zakaj potrebujemo RDD? -RDD z uporabo Spark





Svet se razvija z in Podatkovna znanost zaradi napredovanja v . Algoritmi temelji na Regresija , , in ki teče naprej Porazdeljeno Iterative Comput acija moda, ki vključuje ponovno uporabo in skupno rabo podatkov med več računalniškimi enotami.

Tradicionalno tehnike, ki potrebujejo stabilno vmesno in porazdeljeno shrambo HDFS ki vključuje ponavljajoče se izračune s podvajanjem podatkov in serializacijo podatkov, zaradi česar je bil postopek veliko počasnejši. Iskanje rešitve ni bilo nikoli enostavno.



Tu je RDD (Resilient Distributed Datasets) prihaja do velike slike.

RDD enostavne za uporabo in enostavno ustvarjanje, saj se podatki uvažajo iz podatkovnih virov in spustijo v RDD-je. Nadalje se uporabljajo operacije za njihovo obdelavo. So porazdeljena zbirka spomina z dovoljenji kot Le za branje in kar je najpomembneje, so Odporen na napake .



Če kateri podatkovna particija od RDD je izgubljeno , se lahko regenerira z uporabo istega preobrazba operacijo na tej izgubljeni particiji v rod , namesto da bi obdelali vse podatke iz nič. Ta pristop v realnem času lahko povzroči čudeže v primerih izgube podatkov ali kadar sistem ne deluje.

Kaj so RDD?

RDD ali ( Nabor elastičnih porazdeljenih podatkov ) je temeljna podatkovna struktura v Sparku. Izraz Odporna definira zmožnost samodejnega ustvarjanja podatkov ali podatkov kotaljenje nazaj do prvotno stanje kadar pride do nepričakovane nesreče z verjetnostjo izgube podatkov.

Podatki, zapisani v RDD, so razdeljen in shranjene v več izvedljivih vozlišč . Če je izvršilno vozlišče ne uspe v času izvajanja, nato takoj dobi varnostno kopijo iz naslednje izvedljivo vozlišče . Zato RDD-ji veljajo za napredne vrste podatkovnih struktur v primerjavi z drugimi tradicionalnimi podatkovnimi strukturami. RDD lahko shranjujejo strukturirane, nestrukturirane in polstrukturirane podatke.

metoda preobremenitev in razveljavitev v javi

Nadaljujmo z našim RDD z uporabo spletnega dnevnika Spark in spoznajmo edinstvene značilnosti RDD-jev, kar mu daje prednost pred drugimi vrstami podatkovnih struktur.

Značilnosti RDD

  • V spomin (OVEN) Izračuni : Koncept izračunavanja v pomnilniku vodi obdelavo podatkov do hitrejše in učinkovitejše faze, ko gre za celotno izvedba sistema je nadgrajena.
  • L njegovo vrednotenje : Izraz Leno vrednotenje pravi preobrazbe se uporabljajo za podatke v RDD, vendar izhod ni ustvarjen. Namesto tega so uporabljene transformacije prijavljeni.
  • Vztrajnost : Nastali RDD so vedno za večkratno uporabo.
  • Grobozrnate operacije : Uporabnik lahko prek vseh sprememb v naborih podatkov uporabi transformacije zemljevid, filter ali razvrsti po operacij.
  • Prenašanje napak : Če pride do izgube podatkov, lahko sistem nazaj nazaj do svojega prvotno stanje z uporabo prijavljenega preobrazbe .
  • Nespremenljivost : Podatkov ni mogoče definirati, pridobiti ali ustvariti spremenila ko je prijavljen v sistem. Če morate dostopati do obstoječega RDD in ga spremeniti, morate ustvariti nov RDD z uporabo nabora Preobrazba deluje na trenutni ali predhodni RDD.
  • Pregrada : Je ključna enota vzporednosti v Iskri RDD. Privzeto število ustvarjenih particij temelji na vašem viru podatkov. Lahko celo določite število particij, ki jih želite uporabiti particija po meri funkcije.

Ustvarjanje RDD z uporabo Spark

RDD-je je mogoče ustvariti v trije načini:

  1. Branje podatkov iz vzporedne zbirke
val PCRDD = spark.sparkContext.parallelize (Array ('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat'), 2) val resultRDD = PCRDD.collect () resultRDD.collect ( ) .foreach (println)
  1. Prijava preobrazba na prejšnjih RDD
val besede = spark.sparkContext.parallelize (Seq ('Spark', 'je', 'a', 'zelo', 'močan', 'jezik')) val wordpair = words.map (w = (w.charAt ( 0), w)) wordpair.collect (). Foreach (println)
  1. Branje podatkov iz zunanji pomnilnik ali poti datotek, kot so HDFS ali HBase
val Sparkfile = spark.read.textFile ('/ user / edureka_566977 / spark / spark.txt.') Sparkfile.collect ()

Operacije, izvedene na RDD:

Na RDD se izvajata predvsem dve vrsti operacij, in sicer:

  • Preobrazbe
  • Dejanja

Preobrazbe : The operacij uporabljamo za RDD za filter, dostop in spremeniti podatke v nadrejenem RDD za generiranje a zaporedni RDD je poklican preobrazba . Novo RDD vrne kazalec na prejšnji RDD, ki zagotavlja medsebojno odvisnost.

Preobrazbe so Lene ocene, z drugimi besedami, operacije, uporabljene na RDD, ki jih delate, bodo zabeležene, vendar ne izvršena. Sistem po sprožitvi datoteke vrže rezultat ali izjemo Ukrepanje .

Transformacije lahko razdelimo na dve vrsti, kot spodaj:

  • Ozke preobrazbe
  • Široke preobrazbe

Ozke preobrazbe Za a uporabimo ozke transformacije ena particija nadrejenega RDD za ustvarjanje novega RDD, saj so podatki, potrebni za obdelavo RDD, na voljo na eni particiji nadrejeni ASD . Primeri za ozke transformacije so:

  • zemljevid()
  • filter ()
  • flatMap ()
  • particija ()
  • mapPartitions ()

Široke transformacije: Široko preobrazbo uporabljamo na več particij za ustvarjanje novega RDD. Podatki, potrebni za obdelavo RDD, so na voljo na več particijah nadrejeni ASD . Primeri za široke transformacije so:

  • zmanjšanjeBy ()
  • union ()

Dejanja : Dejanja naročajo Apache Spark, da se prijavi računanje in rezultat ali izjemo pošljite nazaj na RDD gonilnika. Nekaj ​​ukrepov vključuje:

  • zbrati ()
  • štetje ()
  • vzemi ()
  • najprej ()

Praktično uporabimo operacije na RDD:

IPL (indijska premier liga) je kriket turnir, ki je na vrhuncu. Torej, omogočimo si danes, da se lotimo nabora podatkov IPL in izvedemo svoj RDD s pomočjo Spark.

  • Prvič, prenesi CSV podatke o ujemanju IPL. Po prenosu začne videti kot datoteka EXCEL z vrsticami in stolpci.

V naslednjem koraku prižgemo iskrico in naložimo datoteko match.csv z mesta, v mojem primeru mojegacsvlokacija datoteke je “/User/edureka_566977/test/matches.csv”

Zdaj pa začnimo z Preobrazba prvi del:

  • zemljevid():

Uporabljamo Preoblikovanje zemljevida za uporabo posebne operacije pretvorbe za vsak element RDD. Tu ustvarimo RDD z imenom CKfile, kjer shranimo našocsvmapa. Ustvarili bomo še eno RDD, imenovano države shranite podrobnosti o mestu .

iskra2-lupina val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println) val state = CKfile.map (_. split (',') (2)) state.collect (). foreach (println)

  • filter ():

Preoblikovanje filtra, že samo ime opisuje njegovo uporabo. To operacijo preoblikovanja uporabljamo za filtriranje selektivnih podatkov iz zbirke podanih podatkov. Prijavite se delovanje filtra tukaj, da dobite zapise IPL tekem leta 2017 in ga shranite v fil RDD.

val fil = CKfile.filter (vrstica => line.contains ('2017')) fil.collect (). foreach (println)

  • flatMap ():

Za vsak element RDD uporabljamo funkcijo pretvorbe, da ustvarimo novo RDD. Podobno je preoblikovanju zemljevida. tukaj se prijavimoFlatmapdo izpljunite tekme mesta Hyderabad in shranite podatke vfilRDDRDD.

val filRDD = fil.flatMap (vrstica => line.split ('Hyderabad')). collect ()

  • particija ():

Vsi podatki, ki jih zapišemo v RDD, so razdeljeni na določeno število particij. To preobrazbo uporabljamo za iskanje število particij podatki so dejansko razdeljeni na.

val fil = CKfile.filter (vrstica => line.contains ('2017')) fil.partitions.size

  • mapPartitions ():

MapPatitions obravnavamo kot alternativo Map () inza vsakogar() skupaj. Tukaj uporabljamo mapPartitions za iskanje število vrstic imamo v našem RDD fil.

val fil = CKfile.filter (line => line.contains ('2016')) fil.mapPartitions (idx => Array (idx.size) .iterator) .collect

  • reduceBy ():

UporabljamoZmanjšajBy() naprej Pari ključ-vrednost . To preobrazbo smo uporabili na našemcsvdatoteko, da poiščete predvajalnik z najvišji mož tekem .

val ManOfTheMatch = CKfile.map (_. split (',') (13)) val MOTMcount = ManOfTheMatch.map (WINcount => (WINcount, 1)) val ManOTH = MOTMcount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) ManOTH.take (10) .foreach (println)

kako narediti slovar v javi -
  • union ():

Ime vse pojasnjuje, Uporabljamo sindikalno preobrazbo združite dva RDD skupaj . Tu ustvarjamo dva RDD, in sicer fil in fil2. fil RDD vsebuje zapise o tekmah IPL za leto 2017 in fil2 RDD vsebuje zapis o tekmah IPL za leto 2016.

val fil = CKfile.filter (vrstica => line.contains ('2017')) val fil2 = CKfile.filter (line => line.contains ('2016')) val uninRDD = fil.union (fil2)

Začnimo z Ukrepanje del, kjer prikazujemo dejanski izhod:

  • collect ():

Zbiranje je dejanje, ki ga uporabljamo prikaz vsebine v RDD.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.collect.foreach (println)

  • štetje ():

Štetjeje dejanje, s katerim štejemo število zapisov prisotna v RDD.Tukajto operacijo uporabljamo za štetje skupnega števila zapisov v naši datoteki matches.csv.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') CKfile.count ()

  • take ():

Take je operacija akcije, podobna zbiranju, vendar je edina razlika v tem, da lahko natisne katero koli selektivno število vrstic na zahtevo uporabnika. Tukaj za tiskanje uporabimo naslednjo kodo deset vodilnih poročil.

val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.collect (). foreach (println) statecountm. take (10) .foreach (println)

  • najprej ():

First () je akcijska operacija, podobna zbiranju () in take ()touporablja se za tiskanje najvišjega poročila s izhodom Tu uporabimo prvo () operacijo za iskanje največje število odigranih tekem v določenem mestu in kot rezultat dobimo Mumbai.

val CKfile = sc.textFile ('/ user / edureka_566977 / test / matches.csv') val stanja = CKfile.map (_. split (',') (2)) val Scount = state.map (Scount => ( Scount, 1)) scala & gt val statecount = Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) Scount.reduceByKey ((x, y) => x + y) .collect.foreach (println) val statecountm = Scount.reduceByKey ((x, y) => x + y) .map (tup => (tup._2, tup._1)) sortByKey (false) statecountm.first ()

Da bi bil naš proces učenja RDD s pomočjo Sparka še bolj zanimiv, sem pripravil zanimiv primer uporabe.

RDD z uporabo Spark: Pokemon Use Case

  • Prvič, Prenesite datoteko Pokemon.csv in jo naložite v lupino iskra, kot smo to storili z datoteko Matches.csv.
val PokemonDataRDD1 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') PokemonDataRDD1.collect (). foreach (println)

Pokemoni so dejansko na voljo v najrazličnejših različicah. Najdimo nekaj sort.

  • Odstranjevanje sheme iz datoteke Pokemon.csv

Mogoče ga ne bomo potrebovali Shema datoteke Pokemon.csv. Zato ga odstranimo.

val Head = PokemonDataRDD1.first () val NoHeader = PokemonDataRDD1.filter (line =>! line.equals (Head))

  • Iskanje števila predelne stene naš pokemon.csv je razdeljen v.
println ('No.ofpartitions =' + NoHeader.partitions.size)

  • Vodni Pokemon

Iskanje število vodnih pokemonov

val WaterRDD = PokemonDataRDD1.filter (line => line.contains ('Water')) WaterRDD.collect (). foreach (println)

  • Fire Pokemon

Iskanje število požarnih pokemonov

val FireRDD = PokemonDataRDD1.filter (line => line.contains ('Fire')) FireRDD.collect (). foreach (println)

  • Prav tako lahko zaznamo prebivalstva druge vrste pokemonov z uporabo funkcije count
WaterRDD.count () FireRDD.count ()

  • Ker mi je všeč igra obrambna strategija najdemo pokemon s največja obramba.
val defenceList = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble)} println ('Highest_Defence:' + defenceList.max ())

  • Vemo največ vrednost obrambne moči vendar ne vemo, kateri pokemon je. torej, poiščimo, kaj je to pokemon.
val defWithPokemonName = NoHeader.map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered (1) (Naročanje [Double] .reverse.on (_._ 1)) MaxDefencePokemon.foreach (println)

  • Zdaj pa rešimo pokemon s najmanj Obramba
val minDefencePokemon = defenceList.distinct.sortBy (x => x.toDouble, true, 1) minDefencePokemon.take (5) .foreach (println)

  • Zdaj pa si oglejmo Pokemone z a manj obrambna strategija.
val PokemonDataRDD2 = sc.textFile ('/ user / edureka_566977 / PokemonFile / PokemonData.csv') val Head2 = PokemonDataRDD2.first () val NoHeader2 = PokemonDataRDD2. .map {x => x.split (',')}. map {x => (x (6) .toDouble, x (1))} val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered (1) (Ordering [Double ] .on (_._ 1)) MinDefencePokemon2.foreach (println)

S tem smo prišli do konca tega RDD z uporabo članka Spark. Upam, da smo malo osvetlili vaše znanje o RDD-jih, njihovih značilnostih in različnih vrstah operacij, ki jih je mogoče izvajati na njih.

Ta članek temelji na je zasnovan tako, da vas pripravi na izpit za certificiranje razvijalcev Cloudera Hadoop in Spark (CCA175). Dobili boste poglobljeno znanje o Apache Spark in Spark Ekosistemu, ki vključuje Spark RDD, Spark SQL, Spark MLlib in Spark Streaming. Pridobili boste celovito znanje o programskem jeziku Scala, HDFS, Sqoop, Flume, Spark GraphX ​​in sistemu za sporočanje, kot je Kafka.