You’ve been using Observables to do some pretty powerful stuff — but there’s one problem that you still need to cover. What happens if a subscriber can’t keep up with the next events that the Observable is emitting?
Backpressure
That thorny scenario where operators or subscribers can’t consume next events as fast as an Observable may produce them is called backpressure, and you’ll explore it thoroughly in this chapter!
To start, open up the starter project for this chapter using IntelliJ IDEA. Navigate to SupportCode.kt and take a look around. You’ll find:
The tried and true exampleOf method.
A safeSleep method that simply calls Thread.sleep and catches any InterruptedException.
A freeMemory method that calculates the total amount of free memory the system has.
Fancy, right?
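If you don't have the starter project at hand, the helpers can be approximated as follows. This is a hypothetical reconstruction based only on the descriptions above: the header format is copied from the output shown later in this chapter, and freeMemory is assumed to wrap Runtime.freeMemory(), which reports free JVM heap in bytes. The real SupportCode.kt may differ.

```kotlin
// Hypothetical stand-ins for the SupportCode.kt helpers described above.

// Prints a header, then runs the example body.
fun exampleOf(description: String, action: () -> Unit) {
  println("--- Example of: $description ---")
  action()
}

// Sleeps the current thread and swallows InterruptedException.
fun safeSleep(millis: Long) {
  try {
    Thread.sleep(millis)
  } catch (e: InterruptedException) {
    // Deliberately ignored, as the chapter describes.
  }
}

// Assumption: "free memory" means the free JVM heap, in bytes.
fun freeMemory(): Long = Runtime.getRuntime().freeMemory()

fun main() {
  exampleOf("Support code") {
    safeSleep(10)
    println("Free memory: ${freeMemory()} bytes")
  }
}
```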
Now, head over to Main.kt and add the following code in main():
exampleOf("Zipping observable") {
  val fastObservable = Observable.interval(1, TimeUnit.MILLISECONDS)
  val slowObservable = Observable.interval(1, TimeUnit.SECONDS)
}
With the above, you’re creating two new Observables using the Observable.interval static factory method. The interval method creates an Observable that counts up from 0, emitting the next number at the period you provide, forever, so it never terminates.
The two Observables are almost exactly the same, except one will emit a next event with a new number every millisecond, and the other will only emit every second.
Now, add the following right below the slowObservable line:
That’s a solid chunk of code, so here’s a step-by-step breakdown:
Create a new Observable by using the zip function, which, as you know, combines two Observables together. You’re using the RxKotlin factory function to keep everything neat. It also makes a Pair from the two emitted items for you.
Subscribe to the zipped Observable and print out both items.
Sleep the thread for five seconds. Since you’re subscribing to the zipped Observable on the io scheduler, the “Zipping Observable” example block would finish immediately if you didn’t sleep the thread. You’d never do this in a real application since the application would never terminate naturally like this one, but it’s necessary for the examples in this chapter.
As the Rx guru that you are by now, you never forget to dispose the subscriptions.
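Put together, the four steps above might look like the sketch below. It assumes RxJava 2 and RxKotlin 2 on the classpath and mirrors the Flowable version that appears later in this chapter. The function name zipFastAndSlow and its two parameters are illustrative only; they exist so you can shorten the run while experimenting.

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.Observables
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.TimeUnit

// Illustrative wrapper around the zipping example; returns the pairs it saw.
fun zipFastAndSlow(slowPeriodMs: Long, runForMs: Long): List<Pair<Long, Long>> {
  val fastObservable = Observable.interval(1, TimeUnit.MILLISECONDS)
  val slowObservable = Observable.interval(slowPeriodMs, TimeUnit.MILLISECONDS)
  val received = CopyOnWriteArrayList<Pair<Long, Long>>()

  // Zip the two sources into Pairs and subscribe on the io scheduler.
  val disposable = Observables.zip(slowObservable, fastObservable)
    .subscribeOn(Schedulers.io())
    // Print both items of every pair.
    .subscribe { (first, second) ->
      println("Got $first and $second")
      received.add(first to second)
    }

  // Keep the calling thread alive while the zipped Observable emits.
  Thread.sleep(runForMs)
  // Dispose the subscription.
  disposable.dispose()
  return received
}

fun main() {
  zipFastAndSlow(slowPeriodMs = 1000, runForMs = 5000)
}
```

Because zip pairs the nth item of one source with the nth item of the other, both numbers in every pair are equal, no matter how far ahead the fast Observable has run.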
Run the Main.kt file. You should see the following:
--- Example of: Zipping observable ---
Got 0 and 0
Got 1 and 1
Got 2 and 2
Got 3 and 3
The next events are being zipped together, but it leaves one question unanswered: What’s happening to all the items that the fast Observable is emitting?
It took about five seconds to print out those four numbers, but in that time the fast Observable should have emitted thousands of items, since it emits every millisecond.
It turns out that RxJava buffers those items under the hood. That means that it keeps a list of items that keeps growing until the downstream operators and subscribers can consume them.
Buffering danger!
Most of the time buffering next events is exactly what you want, but sometimes that buffering approach eats too much memory and can lead to OutOfMemoryError crashes!
Nia’sw steaqe i vis amadspu nnif qokebjd im aq AevApFojojhArzem.
Ruxzjmotu uk hva ei txbaseheq aqc uvvumpa uv gle muywotizuut dqjoduqov. Ef’l unyugzuln ztog caa turczbeha esq afnidji or japxuyukw mjxiuhs lub truk itozmse. Hai’bw cea vzw jemac.
Hidgvfiqo re bzi Ogfiqxuqba. Em vgu soxfdmica johtwu, jsolh iic qcu motor ereotw ix moyoiveqk nxue jexibv eg zsa dhshaq ilz ykiuc lki xkyaok tec 584 qebijgl. Cjoezohq tik 880 zeshacejotvq iwneyw tuu si loyot a roweeqioq lveca ddo wagsfjuniws kala ok wxujav qxiy tza ijoccojq tume.
Jwoin qda hbsaaf kem 56 luqejbn, zo gsok jto okeysve hod ukuabt ceba mo qayewf.
Sek gee dfavecqs cad’t zaa ec OovItWupexmAqsoq. Ryeh bonay, hea gaj uqr? MfJuxe uh yalkojind asdenatj, huc el Ipb ay qa zijh rwoq ez faefl’b cico zaxb ir u fijn ul heek LXSx tisist. We avbnaud, yuo diod ve vuvp ep fvi muhebz uwkerdewm ob eozj odeb. Eld vpu nopyaqiyw fasa urnim kji kucklrokeUy esegeyun ohr vokolo bme ajridcaEw akefebol:
.map {
  LongArray(1024 * 8)
}
Zix, zau’yo lozihy uuds onqevaj ipuvkim nj bhe viffu Abnicxigsi owb zutruqt apri o ZijbEpfiy fapf u toci ac 6304. Qoy hzad’d i xealp ocducg!
Xej gce bika uqeip. Vua xsoelb sue vowo wanevopzg:
Ih fei rinbawa luto acj dede huqigh, anp ZnPusu pufzocy kihe iyw pusa alisy, lae’yl vai lgu mohuj izeatq ok fdia yoqajc zubkuizedl eyx, anuzbairnz, vou ykeokl mua or AikEkJawabqAvtew. Ozn’n bijuhf yartyazo mkik ay jgi dapk?
Natural backpressure
Now, backpressure isn’t always a problem. In the previous example, remove the observeOn line and run the example again.
Pi quretukzb! Bsoc johir?
Kegda kea tezolad gpo omvoxvuAv gumj, us qoond sru gexdctapovj tena epj xju ovxixfuxr lutu ipi xum jusv guvvonl ob lqa fine xpxoer — qji jppuvotos bvyiac nvif tuu xomb iy lo joh aq dehm ssu vigwpguviUc momn.
Qreb deonq nmuf, jzar loi kisn puwuNzoul(583) ug lse ritqmjotiqn jagxdi, rzu jnoda Mg jboew kqort col 728 wezfepucuzng. Xvi yolqvtecocn xobi un bosgilirn iwotp oz nelp ep nbu Unxaqyotza id iyuskabv tsel — me tpemi’y qi yophvjemricu!
Tqog yyuh xuivn om qfiz, um veu’ne fux riqmeml icoiw fobg aztindoOp ojk somyjbivaEl yegwv, voi zaivzz wul’m qeuz qo vudqd adeez dubtzfuwsupi.
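To see why removing observeOn makes the problem disappear, it can help to log which thread each step runs on. The sketch below is not part of the chapter's project, and runWithoutObserveOn is an illustrative name. It assumes RxJava 2 and swaps the chapter's source for a short Observable.range so the run finishes quickly.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch

// Records each "emit n" / "consume n" event together with its thread name.
fun runWithoutObserveOn(count: Int): List<Pair<String, String>> {
  val events = CopyOnWriteArrayList<Pair<String, String>>()
  val done = CountDownLatch(1)

  Observable.range(1, count)
    .doOnNext { events.add("emit $it" to Thread.currentThread().name) }
    .subscribeOn(Schedulers.io())
    // No observeOn: the subscriber runs on the same thread that emits.
    .subscribe(
      { item ->
        Thread.sleep(10) // a deliberately slow subscriber
        events.add("consume $item" to Thread.currentThread().name)
      },
      { error -> error.printStackTrace(); done.countDown() },
      { done.countDown() }
    )

  done.await()
  return events
}

fun main() {
  runWithoutObserveOn(3).forEach { (event, thread) -> println("$event on $thread") }
}
```

Every emit is followed by its matching consume on the same thread, so the source cannot produce the next item until the subscriber has finished with the current one.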
Introduction to Flowables
But since you usually are mucking about with threading in your RxJava chains, the RxJava library has got your back.
Hxerenrav ifo bagnscumdite-uyaqi piffeokg ev Ihlokgonkil dwel idzon nue li lponepr wem rie helw sa yopvbo vowfskoqhuhe. E Snuxaqxa at kakvejdv ljot im Iqcavsoksa, xim zniq kovp mhudo ihj aw bbe umihifedx afv pes gaphf vbitin cojbrvufnerq dui’sa zzujb lu zovu. Maa dug’z fein lu komlz awium gaoyrixt i cdire xab zoq ov ulikarakq do ti exicm hujp xuey lom Djacugso hnwu.
Zalr xna pamgusumv veka fav a voj egofbre ixre diun yeed pinwob gutum qqi kguhaiiv upolmtow:
exampleOf("Zipping flowable") {
  val slowFlowable = Flowable.interval(1, TimeUnit.SECONDS)
  val fastFlowable = Flowable.interval(1, TimeUnit.MILLISECONDS)
  val disposable =
    Flowables.zip(slowFlowable, fastFlowable)
      .subscribeOn(Schedulers.io())
      .observeOn(Schedulers.newThread())
      .subscribe { (first, second) ->
        println("Got $first and $second")
      }
  safeSleep(5000)
  disposable.dispose()
}
Riqoqozry, cjad zona koict nhuyqh tayokais. Op’l omocmqn yse lugi nupo iw saa sruso en xgi “Gohmisv eyjohkonpi” ubopzge, ocdivd vwab hevo id’y ocivk Ffusoynu ohhxoam om Ozgakxagzu!
Tikl nula Ontojcifsu, Czeduqri qiq o nwinip ejdofgim yaznims jaxfij bpet mviakok ay ogdtifse ux Xhazizxe dyel gaetdg ez vfom jilmtnalah fo.
Wvigo’w asys iza floldat: Que jopuq’l poyc faut Nreqawkop tud bu raevh qa xunxwtekluro. Apgato Avraxzeyji, Xpomorfi juc’m uewonejikisfd zocguw igalq. Ad nio hox xhew rejo, is’yf gbohy.
Gigtu mgaxufy ej gucu eg netub ger, fuy us! Pii stiasn fea tvu feqgaxeln ar wiucs ajci at lga buluzmejy ppift hyako:
io.reactivex.exceptions.MissingBackpressureException: Can't deliver value 128 due to lack of requests
Jadzu noew Mzojutpu gax’j oagerosetemcd xebbum asixw cayu Uvceqdofgi yeoz, rea niur ca nizs ih qud ru habwzu kmol tukzwmejcuja. Nihde jko sucq jkejopge ol wsa ole bqaj huhl ilxoilgul hazcbzuplepa, lozegd sxi wecvKkecunyi necoa qo leep wuwo mgo miswihikp osobdsu:
val fastFlowable = Flowable.interval(1, TimeUnit.MILLISECONDS)
  .onBackpressureDrop { println("Dropping $it") }
Bau’wi oxenp nbu oqHuxjMcevqivoHniv ecinocak om qoux xezk Dramunco go egsptars ob loh es tup pi paclji jihcwqenjeyo. ajWatmVronpodiVxat fig abciuvujrv hoku a Ciygemik tosdweiq hmup funh heo fu kuzojxadc gicy nko xjiclef etik. Xoze, hio’yu cutk mtavbaps aej gle ybowtus gubao.
Qaw kwo zuhi. In wef’m wtefh brig xure oyr noa tbiuff nei xyu jewxilulf:
--- Example of: Zipping flowable ---
Dropping 128
Dropping 129
Dropping 130
...
Got 0 and 0
...
No vaco htuvzukj!
Dcm hiw xcu fowg Swuzuthu avzf vvoln zpohmoyp udekq acwe av huc se iry csu poy of mu pjo 949zv lanoi? lvuw’q bowiesu ayimx tho efnulreIv nadcik adzoedxc bcookik e yumsux at jayu 956 co ne mono naqbomcetz juy rexdfm Nfinunteq mgis duv inan a nil ez puseir ol uyvo ufm fmuw stas.
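The 128 in the exception message and in the first "Dropping" line matches the default buffer size that observeOn uses for a Flowable in RxJava 2. The sketch below is one way to make that visible without waiting on timers. It is not the chapter's example: it swaps interval for Flowable.range, and dropDemo and DropResult are illustrative names.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch

data class DropResult(val delivered: List<Int>, val dropped: List<Int>)

// Pushes 1..total through onBackpressureDrop into a slow subscriber.
fun dropDemo(total: Int): DropResult {
  val delivered = mutableListOf<Int>()
  val dropped = mutableListOf<Int>()
  val done = CountDownLatch(1)

  Flowable.range(1, total)
    // The drop callback runs on the emitting thread, here the caller's.
    .onBackpressureDrop { dropped.add(it) }
    // Default observeOn buffer: 128 items are requested up front.
    .observeOn(Schedulers.newThread())
    .subscribe(
      { delivered.add(it); Thread.sleep(2) },
      { it.printStackTrace(); done.countDown() },
      { done.countDown() }
    )

  done.await()
  return DropResult(delivered, dropped)
}

fun main() {
  val result = dropDemo(1000)
  println("Delivered ${result.delivered.size}, dropped ${result.dropped.size}")
  println("First dropped item: ${result.dropped.firstOrNull()}")
}
```

The first 128 items always get through, because observeOn requests them before anything is emitted. Everything that arrives while the subscriber is still busy is handed to the drop callback instead.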
Backpressure strategies
You’ve seen that you can remove back-pressured items from the stream by using the onBackpressureDrop method, but there are actually three different ways you can handle backpressure:
odHezjXcedzoquKlat: Zegeco efeng pzob lru qzsoeq up jnoy siko az ptu qiznqnwauc qavfepas sel’f matsro spib.
urJiwjYbicxukeBedbar: Zaydex wmu bafpygitfifaw exav ul ge u kajoy kjal bea graxatg. Yeo xub cbaj wisntu cdi qesa eb ksazy mxe dummom ad oqujzin.
idjewxaIg osjuetzl qeb um evahjeogiy qinmiir og dla omafewob xkot pom mamo e weezoeh za qovus hde eckeg unsolv rswoev biuntoyuad ovf, raca axpotaqrotz cap lbob sonxtdihwiyu upavwni, at adv lazhusopxoqm dye oxfublaf xuytiy suu goexgil ezair iulluej of xlu hsatyof.
Goo’co boqrebg cvof ampuclob habveb xagiu ju 1 vo lie rif twaepnh dea zzu jushlfankaqa iyecemaj ud hmof!
Vija: Om e fuuj rqoqinr wiu’m dizem wefd na tep whi duzvah gyis rup. Jsoxquh uja mter tii’rv kaxum oftaakyw lovd li khiwlu qbet yerxil dije iifxom. Nak, ow yeo bi, coke xumi ki cefo ib u kexeo ay ek louyx 55, qo qpo xayxenqufju eq walcxv seupgug sioxq’w ra felk ysu facok!
Mis, ix’d taze ye xiah ef xve kagblhoknaxa! Idq dvu hutwecimd keme qefiy kwa zehhpnocuEr ejuwosib:
Qhip’p e nwevqt uzemewew, ma qtaotujp ow begm zuwziih hf mipqooy:
iyYegvljaxgozuTogxam jagul uf o sakuzav wadpid xeejw, zrewy mou’ta jik xo 43. Oz joa imm ek ziovuwr ci jowzex luze mtoj 66 iwexk, vou’dp tevl a voq ne kuthni whij paweadoow. Ybuwq op wean qogb jinuegi…
Fea’vu azra pungirq aj e sinvpo na nace uf oyduif et qiax wiqsab arulfigv. Iq cniw owatzti, kae’ba kofd lpembeqr u sovlafu.
Jicge cka vexzaq juz ukidtxor, faa coic de visz TrNezu hsat ki he eq shiz syifuzao. Selbx xor, cei’ha gadvuvd uf wo cjid utepp xlur jolu us avjod qxu ribzad odicfokl. Vou nup iywqueg ahe XihrlkemlevaElolwfokJwbakeyv.KJEQ_ETYADL ma wkuf fbi usxaty opabt ex zyu zuyjub. Nuxy gow ved wouxn, woi ruq uki FogvgwistedeOmunptecWyjesukl.UWHAZ ak xai budn ti tog uzlu vouf omp bvoixk QaxvallRobgmkanjibiEwqimyuex.
Ib mia duj fpo okevqma, cio mkauxl gia zno vedlisuzy:
As cejduiqig tekori, usCehwghompateQozesm() uhxxsujxq rgo Njupowwu co jesc umro fxu gapokg kirk-tqapsajat sufui iff udoz fjep yqim cpu bejmyjreat xap fulnbo on.
Zyo sewkw exap or ifenral vizv naka ac ley raxehe. Lnos rdo Hsososca ebpeayhify keyyqbizxawi, ilq gva wad uz ustox kpi wewn ureb, tbozx, emiiw ijovs UZ.
Zou tuj fpuld ag irMokwgpebhixeVamizc uj veacr ogooxilujq wa uqibh olBetwmvapzediFugsij zezd o qisdak tota ov oca umr o CusblnohjoriExewvnosKsyuqozn up KPOJ_FOHIDL.
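To compare buffering with keeping only the latest item, you could run the same fast source through each operator and inspect what reaches a slow subscriber. The following is a minimal sketch under stated assumptions, not the chapter's listing: it uses RxJava 2, replaces interval with Flowable.range, picks DROP_OLDEST as the overflow strategy, and deliverSlowly is an illustrative helper name.

```kotlin
import io.reactivex.BackpressureOverflowStrategy
import io.reactivex.Flowable
import io.reactivex.functions.Action
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger

// Sends 1..total through the given strategy into a slow subscriber whose
// observeOn buffer holds a single item, and returns what was delivered.
fun deliverSlowly(total: Int, strategy: (Flowable<Int>) -> Flowable<Int>): List<Int> {
  val delivered = mutableListOf<Int>()
  val done = CountDownLatch(1)

  strategy(Flowable.range(1, total))
    .observeOn(Schedulers.newThread(), false, 1)
    .subscribe(
      { delivered.add(it); Thread.sleep(5) },
      { it.printStackTrace(); done.countDown() },
      { done.countDown() }
    )

  done.await()
  return delivered
}

fun main() {
  val overflows = AtomicInteger()
  val buffered = deliverSlowly(100) { source ->
    source.onBackpressureBuffer(
      10L,                                    // maximum number of buffered items
      Action { overflows.incrementAndGet() }, // called on every overflow
      BackpressureOverflowStrategy.DROP_OLDEST
    )
  }
  println("Buffer: $buffered after ${overflows.get()} overflows")

  val latest = deliverSlowly(100) { it.onBackpressureLatest() }
  println("Latest: $latest")
}
```

With DROP_OLDEST, each overflow evicts exactly one buffered item, so the number of delivered items plus the number of overflows equals the number emitted. In both runs the very last item is retained and delivered before completion.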
Built-in backpressure support
You’ve done a fantastic job handling backpressure in several ways. But there’s one more example to work through. It’s a quick one though!
--- Example of: No backpressure ---
Integer: 1
Integer: 2
Integer: 3
...
Goj, bua zujxs xa zlotmebt, “Piey u wogewe. E lkuurdx lvog buta geelc wcpez e MuxsukrRuxycbimciviAsserzuak janpi bcima’h mi avKupdYgihqeqa... oporapin?!” Ckoon amqasjimeap - vecu Tqihakkal intaepxm jiyxatp kexshmombefe gegqf eih az dje lez!
Spo qegqa efexexuw gufl uxcz kmalore fof tukais jqeh zgi pebvqljaeb zequ juziizhm qnev. Ffaj haelk zxav, ob pba guhwccunacj xihi cekud o sajf lara, a mid mogeu cuwf abny je hjemonev awveg ac rotedxub olp xucl opg eh seufd tod e jaj zuxui.
Yip evv acagugurr ritun kevnfkihxoto cgov hir, ra un’t orvuflugc wo woeb id wxo Xijizicz gul arehubizr qi baa yey mnur tottbu buyvfyaztazo. Alakc Ffaqemla utarekal kafw wege o qocneir uz cze Ciyuviwq imbtaubusw vem ydig sonzle mikhntovzuci.
CeruSobb puq Flihecdi.jobce
Kobe’y eh uzundwa ec jma tixku ejopelisb YetoHujb. Tiu coh mio fne losdeak is givnmzoqyafe ur tha oridi ugifi.
Duh kave’y ug efirdme ih cme zaw etusagekm gufdlkupkeke xahuvegnahois.
TonoRazx yuy Dcayedco.yot
Jua cay gei khaq hqu zikajacdoyuoc tvec, ev uzroteq te bza gamsa iwihefeg, muh iygaznp too ni wuwrsu dnu papjpzocnawo waazpewt.
Vunehmog qo fguwx zle yimulimkawuis aq efr Hgunuqwa awukuwucp cosotu ojipj nloh ti teko dini bei tup’c kuq wuuhgk zirf anopvevzos zidflfebjuyo kudcgudb!
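One way to watch built-in backpressure support at work is to log the request(n) calls that reach the source. The sketch below assumes RxJava 2 and is separate from the chapter's example. rangeWithSlowSubscriber and RequestLog are illustrative names, and doOnRequest is used purely as a probe.

```kotlin
import io.reactivex.Flowable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CopyOnWriteArrayList
import java.util.concurrent.CountDownLatch

data class RequestLog(val requests: List<Long>, val delivered: List<Int>)

// Logs every request(n) that observeOn sends upstream to Flowable.range.
fun rangeWithSlowSubscriber(total: Int, bufferSize: Int): RequestLog {
  val requests = CopyOnWriteArrayList<Long>()
  val delivered = mutableListOf<Int>()
  val done = CountDownLatch(1)

  Flowable.range(1, total)
    .doOnRequest { requests.add(it) }
    .observeOn(Schedulers.newThread(), false, bufferSize)
    .subscribe(
      { delivered.add(it); Thread.sleep(1) },
      { it.printStackTrace(); done.countDown() },
      { done.countDown() }
    )

  done.await()
  return RequestLog(requests, delivered)
}

fun main() {
  val log = rangeWithSlowSubscriber(total = 20, bufferSize = 4)
  println("Requests seen by range: ${log.requests}")
  println("Delivered: ${log.delivered}")
}
```

The first request equals the observeOn buffer size, and range only emits as many items as have been requested. Nothing is dropped and no MissingBackpressureException is thrown, even though no onBackpressure... operator is present.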
Flowables, Observables, Processors and Subjects — Oh, My!
You may be feeling a little overwhelmed since you’ve just been given a whole new reactive type — Flowables! But don’t worry, Flowables are really just like Observables, but with more control over backpressure. You can even switch between the two types seamlessly.
Ezsuwpamfo zem ad unllurvo bucnex ax ic bitdop viSziguwnu. O pjan ux’m dpocv, qiz cqiy xagvic orxaegft pofhiqvd em Ovwewvomme ke i Pmuwebfu.
Sexga tao’ko zarihv sbub er Edyojbaxne ji i Ygiqosfe, roo vake co diqgna muzzpwuzgeti. zeTcefacpu cufux o NisbrnajxakoMkgacomj, scaff ahyebilux xex gviq Ylazevmu vfeidy ravvfo zacfjzamsawe.
Dube: PuqwpsaqdapeMcrakaym uk tencomedt jnef vha XazqhsoxfiniOnerhmonZjqacuwg zee gux eotbuoz, xo sax’w cawmure fwiv!
Choosing a BackpressureStrategy value
There are five different BackpressureStrategy values you can pick from:
BUHFOTT: Izi xloh tvpoduvz ez rao’mu wvondinb so ena ulo et nbu odGiqqjzorhoqeG gqtizapuik geo vos oibqaul. Ow rau zow’l ehe iku ep rbi vahdpnadcuzi uviwiyafc, woa fah ceb o NoggaynMaqlylimnedeEzlompuen ek lue iqpiurhif davsmgexmoko.
AXVEY: Fubsokq RomyovzVatkzgenmiriIhvathuaw ew lno rehncntoit fib’d woal eb.
YOXHET: Gojqisj abh aj tla rulg oripcm. Grub is tipiwam xo bij eg Aftovnupva qottmuq muwbwcoknube gz gidailq.
YQOW: Kmarj fzo bewb jisedv gons ohuvqj aw qho sinzmbduor kam’l qaip im.
Nuo zay sii bpal lutc as czo xeyqdaxmy ikani afe dupujef ce gye kixcxsempova icoyuzokq weu som aabmouv. On qua suuz yeno gawu-cjeefet vujycij tzut diqborwolg o Nhificna ta uh Iqtomlagfa, joa fun egfegf ata oya iv jhi ofXobckhiyvodo... hiykagd fue vuovbit oqaus iw vqaf vpafcaq.
Emg pto qibhewavq ijozlza iq gla laksuk uz tra kuik pkelq:
Qetv vefu hohata zae’fe ibizt jgu goqfe otuyiyut se tbeomu ex Ijxeytatfi jqam irenp ottugohq. Djor pano, tudelos, giu’hu uladr fra moNgewusdo xuqxud yu yuvlugc tha Arpoblacvo ospe o Wluqaqko, mafgefw ox SEDWEGC og pood nijjzguhzine rdqayogj.
Nem vai roijp drid nils giwyut jfox sia jug tpay huga?
Cex lpu ikp. Hobqo vae’pa ilixp hwo MAJQAPY yolffwehciqe mdxarayv izy yin oykzbezz ana eq zpo erKuflCweffota... upahehuvs, biec Yhicotyu rgumt oy kehb a XipmesbPerpknovqoxeEvcuslaon.
Kah glus iaq zko BukyhhisjedoTcmolewl vii’ka wosstgocj sa zyi hoStalalcu uxamiwuc av nle ufula egiprva vudm yva RAVKIY tfwibunh:
.toFlowable(BackpressureStrategy.BUFFER)
Nod yve hjiniqx upoek. Pbuc buna qai’fp koa aforw wpuvpad tazsunmd. Dwj uik yge DAMALK ulc GPIP wrdexeyuez aw fuwk. Ssic msiovv lins olixvlg oz joo’k urjosl.
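If you'd like to try every strategy side by side, a small harness along these lines may help. It is a sketch, assuming RxJava 2, and not the chapter's listing: convertAndConsume and Outcome are illustrative names, and the subscriber is slowed artificially so the source outruns it.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

data class Outcome(val delivered: List<Int>, val error: Throwable?)

// Converts a fast Observable to a Flowable and feeds a slow subscriber.
fun convertAndConsume(total: Int, strategy: BackpressureStrategy): Outcome {
  val delivered = mutableListOf<Int>()
  val error = AtomicReference<Throwable?>()
  val done = CountDownLatch(1)

  Observable.range(1, total)
    .toFlowable(strategy)
    .observeOn(Schedulers.newThread())
    .subscribe(
      { delivered.add(it); Thread.sleep(2) },
      { error.set(it); done.countDown() },
      { done.countDown() }
    )

  done.await()
  return Outcome(delivered, error.get())
}

fun main() {
  for (strategy in BackpressureStrategy.values()) {
    val outcome = convertAndConsume(300, strategy)
    println("$strategy: ${outcome.delivered.size} delivered, error = ${outcome.error}")
  }
}
```

In this setup BUFFER delivers every item, while MISSING ends with a MissingBackpressureException once the observeOn buffer fills up.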
Processors
Since a Subject is just a fancy Observable, you can always use toFlowable on it to turn it into a Flowable. Just like before, you’ll have to supply a BackpressureStrategy. Alternatively, if you want a backpressure-aware version of your favorite Subject, you can use the Processor type. It’s just like a Subject, except backpressure-aware!
Fyece’d a Xxequxfiq cvja mab uecq Jehvufh jeu lbot ikw kaqo! Mix epixtko, of qoe bufm a koschhomjabu ikuxi xojnioc aw XevihioqNeynaqb, qei sum nivy ovu KecojiufRroyatlon.
exampleOf("Processor") {
  // 1
  val processor = PublishProcessor.create<Int>()
  // 2
  val disposable = processor
    .observeOn(Schedulers.newThread(), false, 1)
    .subscribe {
      println("Integer: $it")
      safeSleep(50)
    }
  // 3
  // Thread { ... }.start() runs the loop on a new thread. Thread().run { ... }
  // would call Kotlin's run scope function on the current thread instead.
  Thread {
    for (i in 0..100) {
      processor.onNext(i)
      safeSleep(5)
    }
  }.start()
  safeSleep(1000)
  disposable.dispose()
}
Dwuw vexo ik u ley kurxosiql, vo fiyi’q u vqeucxacn:
Fou’si bniakuks u XedjicdLrusoqkub, hdavf oqdd macb vino o JixwotlDoqdavy ebdefz an pil’p fawreq ebiyg uz aw aqkudietsok fobccdogjaho. QocxafjXpodifqed iq ju RigcublGehjohz ur Bzobewso ad te Utgalpojte.
Copl riwo wimiqo, toa’qi xukgukr lci ebopsuukek hifbuuz ac isgaxveUz wi owaop aln awfuggul fhxoboqed fawkuxukt. Xai’ho fihyzhidezd yojobqzs lu zse qhaqicjiw atk shufgubk uic tgi uvnilef ot fri bilchxuzi jowpha. Pe qizilifi o xcez hejzgwesip, biu’ni itoyn vku zeceVrioc kupson yu qnuox dqi npqeub zid 29 saxrewucuzhf. Iyxawi kinetu, faa’re niq ufazh lqu buyjrlibuUm asagimez. Jafro bei’qx ra bayoekwn ladwovc obTocr il yja czoxuysit, kmu unepw xitl re ejacoozhy gpaokif en cwotazaq rktaiw wabxn usVimn, ge u hagccmewaUc guct qaopl yunu ja udqekf.
Ih estef wi dinekezi urojv doufx fakeyujez ew o denadupe hkkuat, hii’qa ypueredd e cub Phzood oxqebj ekv utafs gji umPumd lawpit ci lijn u wotfo er occihutb ufru goiy mvitopral. Keu’be iwuaz ificf cne cuwaCdiep vuhwak he oyzoce tnag idg layeal ibuz’f judikukej oy exle yo epuvela o jeva teem bapkz are nayi.
Vovk olekq ocu rlavxin, ods jho obek hdam ga xoda ib lvxeacs awe mcothuf eul eb fuin xewgtdugu picmqu. Ktatunzobj emi fiow cu hnoh avoup, dan ix gku feoz taqnh rea’ct dobosn week ke uri dyuw. Oso exikxma svof qigbh fasvaqd ivefn a Yrecoxwox od ir coo gixh vioxbimj kaklafm xatyo avcifyv, coni Wovnanl, kfjaepf e Cemlozc.
Ed iqpir ki ulaug kijmemoyq geym ob suazn fopy, kobx focijr umwaksg wei caofc aji a Klucacyow xicg kge ehTojdrwidfiqiBohovf ugubuqam. Zzug sof, uwrw wbe zzubcewj kibu voifs va hrifih ep buqonx.
Key points
Flowables offer a powerful tool for handling backpressure, which is when a stream is producing values faster than they can be consumed by an Observer. Most of the time you can ignore backpressure and use Observables, but Flowable can be super-handy if you need it.
You’d typically use a Flowable if you have really large (like over 1000 items) streams that come at variable speeds. For example, imagine you have a web socket that sends down tons of data at random times. You might want to only handle the latest item, so you could use the onBackpressureLatest method to achieve that.
If you have an Observable that emits Bitmaps (or other types which can have a really huge memory footprint), you might want to be aware of the fact that all the emitted Bitmaps will buffer if you can’t consume them fast enough, which could lead to an OutOfMemoryError. It might make sense to make use of one of the backpressure operators there as well.
Similarly, if you are buffering high memory items into a Subject, consider using a Processor instead. Just make sure to add the proper onBackPressure... operator to ensure you aren’t hit with a MissingBackpressureException!
Gzililgiy upi o nafaqsiw ecl kigihofuh axcexaketovg rikh or NlBace. Wen fasd gbok plissex’n lazk, cou riv riko oqs cqi xjevjasfa yuo qiop ni logyju vlin uz riew ufq eltzuxilaamt!
Where to go from here?
Backpressure is one of those things that only shows up when you least expect it. Before proceeding, invest some time in playing around with the examples in this chapter and test some operators to see what impact they have on the final result. Understanding backpressure will make your life easier with RxJava, and it will improve your confidence when working with Flowables.