Kumulativna transformacija stanja v pretakanju iskri Apache



Ta objava v spletnem dnevniku govori o transformacijah, ki so pomembne za pretok v iskrih. Izvedite vse o kumulativnem sledenju in spretnostih za kariero Hadoop Spark.

Prispeval Prithviraj Bose

V svojem prejšnjem blogu sem razpravljal o transformacijah s stanjem z uporabo koncepta oken Apache Spark Streaming. Lahko ga preberete tukaj .





V tem prispevku bom razpravljal o kumulativnih operacijah s stanjem v Apache Spark Streaming. Če ste novi v Spark Streamingu, vam toplo priporočam, da preberete moj prejšnji spletni dnevnik, da boste razumeli, kako deluje okno.

Vrste transformacije s stanjem v pretočnem iskanju (nadaljevanje…)

> Kumulativno sledenje

Uporabili smo reduceByKeyAndWindow (...) API za sledenje stanjem ključev, vendar okno postavlja omejitve za nekatere primere uporabe. Kaj pa, če želimo stanja ključev kopičiti ves čas, namesto da bi ga omejevali na časovno okno? V tem primeru bi morali uporabiti updateStateByKey (…) OGNJ.



Ta API je bil uveden v Spark 1.3.0 in je bil zelo priljubljen. Vendar ima ta API nekaj dodatnih stroškov, njegova zmogljivost se poslabša, ko se velikost držav sčasoma poveča. Napisal sem vzorec, ki prikazuje uporabo tega API-ja. Kodo lahko najdete tukaj .

java cast string do danes

Spark 1.6.0 je predstavil nov API mapWithState (…) ki rešuje splošne stroške uspešnosti, ki jih predstavlja updateStateByKey (…) . V tem blogu bom razpravljal o tem posebnem API-ju z vzorčnim programom, ki sem ga napisal. Kodo lahko najdete tukaj .

Preden se potopim v prehod kode, si privoščimo nekaj besed o kontrolni točki. Za vsako transformacijo s stanjem je kontrolna točka obvezna. Kontrolna točka je mehanizem za obnovitev stanja tipk v primeru, da gonilniški program ne uspe. Ko se gonilnik znova zažene, se stanje ključev obnovi iz datotek kontrolne točke. Lokacije kontrolnih točk so običajno HDFS ali Amazon S3 ali katero koli zanesljivo shrambo. Med preskušanjem kode lahko shranite tudi v lokalni datotečni sistem.



V vzorčnem programu poslušamo tok besedila vtičnice na host = localhost in port = 9999. Tokenizira dohodni tok v (besede, število pojavitev) in sledi številu besed z API-jem 1.6.0 mapWithState (…) . Poleg tega se s pomočjo odstranijo ključi brez posodobitev StateSpec. timeout API. Kontroliramo v HDFS in frekvenca kontrolnih točk je vsakih 20 sekund.

Najprej ustvarimo sejo Spark Streaming,

Spark-streaming-session

Ustvarimo a checkpointDir v HDFS in nato pokličite objektno metodo getOrCreate (…) . The getOrCreate API preverja checkpointDir če želite preveriti, ali obstajajo prejšnja stanja za obnovitev, če to obstaja, nato znova ustvari sejo Spark Streaming in posodobi stanja ključev iz podatkov, shranjenih v datotekah, preden nadaljuje z novimi podatki. V nasprotnem primeru ustvari novo sejo Spark Streaming.

The getOrCreate vzame ime imenika kontrolne točke in funkcijo (ki smo jo poimenovali createFunc ) čigar podpis naj bo () => StreamingContext .

Preučimo kodo znotraj createFunc .

Vrstica št. 2: Ustvarimo pretočni kontekst z imenom opravila na »TestMapWithStateJob« in intervalom serije = 5 sekund.

Vrstica 5: nastavite imenik kontrolne točke.

Vrstica 8: nastavite specifikacijo stanja s pomočjo razreda org.apache.streaming.StateSpec predmet. Najprej nastavimo funkcijo, ki bo spremljala stanje, nato nastavimo število particij za nastale DStreamove, ki jih je treba generirati med naslednjimi transformacijami. Na koncu nastavimo še časovno omejitev (na 30 sekund), če v 30 sekundah ne dobimo nobene posodobitve za ključ, bo stanje ključa odstranjeno.

Vrstica 12 #: nastavite tok vtičnice, poravnajte dohodne paketne podatke, ustvarite par ključ-vrednost, pokličite mapWithState , interval kontrolne točke nastavite na 20 s in na koncu natisnite rezultate.

Okvir Spark kliče th e createFunc za vsak ključ s prejšnjo vrednostjo in trenutnim stanjem. Izračunamo vsoto in stanje posodobimo s kumulativno vsoto in na koncu vrnemo vsoto za ključ.

razlika med končno in končno

Viri Github -> TestMapStateWithKey.scala , TestUpdateStateByKey.scala

Imate vprašanje za nas? Prosimo, omenite to v oddelku za komentarje in se vam bomo javili.

Sorodne objave:

Začnite z Apache Spark & ​​Scala

Pomembne transformacije z okni v pretočnem iskanju