RxJS’e Merhaba Deyin
Son dönemlerde RxJS kelimesini sıkça duyduğunuzu ve “Nedir bu RxJS?” dediğinizi tahmin edebiliyorum. Bu yazı ile RxJS konsepti hakkında bilmeniz gereken temel kavramlar hakkında bilgi vermeye çalışacağım.
Son dönemlerde RxJS
kelimesini sıkça duyduğunuzu ve “Nedir bu RxJS?” dediğinizi tahmin edebiliyorum. Bu yazı ile RxJS konsepti hakkında bilmeniz gereken temel kavramlar hakkında bilgi vermeye çalışacağım.
Stream(Akış)
Verilerin bütün olarak değilde küçük parçalar halinde transfer etmeyi sağlayan bir yapıdır.
Observable Nedir?
Observable: zaman içerisinde gelebilecek bir veri akışını veya kaynağını temsil eder. Neredeyse her şeyden Observable yaratabilirsiniz. RxJs’te Observable
lar genellikle olaylardan (butona veya sayfaya tıklama, arama kutusuna değer yazma veya uygulama sayfasının değişmesi vb.) oluşturulur. Observable
yaratmanın en kolay yolu; RxJS’te bulunan built-in observable
yaratma fonksiyonları kullanmaktır. Örneğin; sayfada mousenin tıklama olaylarından observabe yaratmak için yardımcı fonksiyon olan fromEvent
kullanılabilir.
Observable
için bir kaç karakteristik özelliklere sahip fonksiyon diyebiliriz. İçinde next
,error
, complate
metotlarına sahip bir nesne olan bir observer
alır ve iptal mantığını döndürür.
Bir observable
; observer’ı ayarlar ve değerlerini almak istediğimiz şeye
bağlar. Bu şey
üretici(producer
) olarak adlandırılır. Bu şey
bazen; DOM’deki bir tıklama veya textbox’a yazma işlemi, bir değer kaynağıdır. Hatta HTTP üzerinden iletişim gibi daha karmaşık bir şey olabilir.
Observable için aşağıdaki örneği bakalım:
1 2 3 4 5 6 7 8 9 |
// fromEvent operatörünü import ediyoruz import { fromEvent } from 'rxjs'; // Sayfa tıklaması için Observable yaratıyoruz const tiklama$ = fromEvent(document, 'click'); |
fromEvent
operatörü ile sayfa üzerindeki tıklama olayından observable
oluşturuyoruz. Sayfa üzerinde her hangi bir yere tıkladığınızda bir işlem gerçekleşmiyor. Bu aslında stream
den gelecek değerler için bir tanımlama veya bir taslak. Bir observable
; abonelik yapıldıktan sonra değer yaymaya başlar. Peki nedir bu abonelik?
Subscription(Abonelik)
Subscription
; her şeyi harekete geçiren işlemdir. Bir dokunuşla (subscribe
) akmaya başlayan içinde bira bulunduran, musluk gibi düşünebilirsiniz. Birinin sadece musluğu çevirmesi gerekiyor. Musluktan bira akışının gözlenebilir(observable
) yapılması rolü aboneye(subscriber
) aittir.
Bir subscription
yaratmak için subscribe()
metodunu çağırmak gerekir. observer
olarak da bilinen, bir fonksiyon (veya nesne) sağlayan subscribe()
metodunun çağrılması gerekir. Reactive Programlamada; her olaya nasıl tepki vereceğine karar verebileceğiniz yer burasıdır. Yukarıda yazdığımız kodda bir adım daha ilerleyelim. tiklama$
observableni subscribe yaparak; sayfa üzerinde yapılan tüm akışları görebiliyoruz artık.
1 2 3 4 5 6 7 8 9 10 11 12 |
// fromEvent operatörünü import ediyoruz import { fromEvent } from 'rxjs'; // Sayfa tıklaması için Observable yaratıyoruz const tiklama$ = fromEvent(document, 'click'); // observable abone olup. tüm tıklamaları konsola logluyoruz tiklama$.subscribe(event => console.log(event)); |
Yukarıdaki örnekte tiklama$.subscribe()
:
- Her bir mouse tıklaması için bir
event listener
yaratacak - Her mouse tıklama işleminden sonra
subscribe
metoduna (observer) tıklama olayını geçirecek. - Temizleme mantığı içeren bir abonelikten çıkma(
unsubscribe
) ile bir subscription nesnesi döndürür.
Subscriptionda next
, error
, complate
kullanımı ile ilgili örnek:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// fromEvent operatörünü import ediyoruz import { fromEvent } from 'rxjs'; // Sayfa tıklaması için Observable yaratıyoruz const tiklama$ = fromEvent(document, 'click'); // observable abone olup. tüm tıklamaları konsola logluyoruz tiklama$.subscribe({ // başarılı yayılma işleminde next: event => console.log(event), // hata durumunda error: error => console.log(error), // tamamlanma durmunda complete: () => console.log('Tamamlandı!') }); // tiklama$.unsubscribe(); |
Önemli not: Her bir subscription işlemi yeni bir tane execution context
yaratır. Bunun anlamı; subscribe
metodunu cağrılmasından çok kısa bir zaman sonra yeni bir event listener
yaratılır. Bizim örneğimizde; sayfaya yapılan her tıklamada yeni bir execution context
yaratılır.
subscription
varsayılan olarak, observable
ile observer
arasında bire bir, tek taraflı bir konuşma oluşturur. Bu durumu: Takım liderinizin (observable
), yazdığınız kodda hata olması durumunda size (observer
) bağırması (emitting
) gibi düşünebiliriz. Bu aynı zamanda tek yönlü yayın olarak da bilinir(Televizyon veya Radyo yayını). Bir konferans konuşma senaryosunu tercih ederseniz(bir observable
, birçok observer
arasında olan). Subjects
ile çok noktaya yayın(açık veya açık olmayan) içeren farklı bir yaklaşım tercih edebilirsiniz. Başka bir yazıda Subjects hakkında detaylı bilgi vereceğim.
Operatörler
Operatörler, bir kaynaktan gelen değerleri manipüle etmemize yardımcı olurlar, dönüştürülen değerlerin observable
değerini döndürür. JavaScript Array
metotlarına aşina iseniz, RxJS operatörlerinin çoğu size tanıdık gelecektir.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
// iihtiyacımız olan operatörleri import ediyoruz import { of } from 'rxjs'; import { map } from 'rxjs/operators'; /* * 'of' operatörü değerleri sıralı olarak dağıtmanıza yardımcı olur * Bu örnekte, "RxJS", "operatörler", "harika", "Dostum", "...." değerlerini sırasıyla yayar */ const dataSource = of("RxJS", "operatörler", "harika", "Dostum", "...."); // kaynak observable a subscription işlemi yapıyoruz const subscription = dataSource .pipe( // yayılan tüm değerlere 3 tane "!" ekliyor map(value => value + "!!!") ) .subscribe(value => console.log(value)); |
Bazı değerleri filtrelemek istiyorsanız, filter
operatörünü kullanın:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
// iihtiyacımız olan operatörleri import ediyoruz import { of } from 'rxjs'; import { filter } from 'rxjs/operators'; /* * 'of' operatörü değerleri sıralı olarak dağıtmanıza yardımcı olur * Bu örnekte, "RxJS", "operatörler", "harika", "Dostum", "...." değerlerini sırasıyla yayar */ const dataSource = of("RxJS", "operatörler", "harika", "Dostum", "...."); // kaynak observable a subscription işlemi yapıyoruz const subscription = dataSource .pipe( // yayılan "Dostum" değerini filtreliyoruz. filter(value => value !== "Dostum") ) // log: 2, 3, 4, 5, 6 .subscribe(value => console.log(value)); |
Uygulama geliştirirken çözmeniz gereken bir çok sorun için, RxJS’te sorunuzun çözümü için mutlaka bir operatör vardır. İlk başlarda operatörlerle çalışırken zorlanabilirsiniz ama kullandıkça sevecek ve yavaş yavaş bu konuda uzmanlaşacaksınız. RxJS’te bulunan operatörler konusunda ilerde detaylı bir yazı yazmayı planlıyorum. Beni takip ederseniz bu güzellikleri kaçırmazsınız.
Pipe
Pipe
fonksiyonu, observable
veri kaynağınızdaki operatörler ile montaj hattıdır. Tıpkı bir arabanın bitmiş halinin fabrikadan çıkmadan önce uğradığı bir dizi üretim bandı gibi düşünebilirsiniz. Kaynaktan gelen veriler, ihtiyaca göre filtreleme, dönüştürülme gibi süreçlerden geçebilir. Pipe
fonksiyonunda, observable
zinciri içinde 3 (veya daha fazla) fonksiyon kullanmak olağan dışı bir durum değildir. Örneğin observable
ile oluşturulmuş bir arama
işleminde yapılan işlemleri optimize etmek için birden çok operatör kullanılabilir. Aşağıdaki örneğe bakalım:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
console.clear(); import { fromEvent, of } from 'rxjs'; import { debounceTime, distinctUntilChanged, map, switchMap, tap } from 'rxjs/operators'; const aIleBaslayanSehirler = keys => [ 'ankara', 'adana', 'ardahan', 'adıyaman', 'afyon karahisar', 'artvin', 'aydın' ].filter(e => e.indexOf(keys.toLowerCase()) > -1); const observable$ = keys => of(aIleBaslayanSehirler(keys)) .pipe( tap(_ => console.log(`Arama işlemi: ${new Date()} de yapıldı`)) ); fromEvent(document.getElementById('arama'), 'keyup') .pipe( // 200ms bekleme debounceTime(200), map((e: any) => e.target.value), // aynı değer girilirse göz ardı edecektir. "a" harfine basılı tutunca peşi sıra "a" harfi yazılmayacak distinctUntilChanged(), // bir istek sonuçlanmadan yeni bir arama yapılırsa bir önceki istek iptal edilecek switchMap(observable$), // gelen değer client tarafında işlenip, ekrana yazılıyor tap(c => document.getElementById('cikti').innerText = c.join('\n')) ).subscribe(); |
Peki hangi operatörün kullanım senaryonuza uygun olduğunu nereden bileceksiniz?
Operatörler Ortak Kategoride gruplanabilir
Hangi operatörü kullanacağınızı belirlemek için ilk yapmanız gereken iş, ilgili kategoriyi belirlemektir. Bir kaynaktan gelen veriyi filtrelemek mi istiyorsunuz? O zaman filtreleme
operatörlerini kullanın.
Filtreleme Operatörleri:
audit
,auditTime
,debounce
,debounceTime
,distinct
,distinctKey
,
distinctUntilChanged
,distinctUntilKeyChanged
,elementAt
,filter
,
first
,ignoreElements
,last
,sample
,sampleTime
,single
,skip
,
Bir hatayı izlemeye veya observable streamdeki veri akışında bulunan hatayı ayıklamaya mı çalışıyorsunuz? Bu iş için utility
operatörlerini kullanın.
Utility Operatörleri:
tap
,delay
,delayWhen
,dematerialize
,
Observable yaratma operatörleri:
Bu operatörler, herhangi bir şeyden observable
yaratmanıza imkan verir. Jenerikten, özel kullanım durumlara kadar her şeyi bir streame dönüştürmekte özgürsünüz. Örneğin, kullanıcının bir sayfada bulunan yazı da kaydırma çubuğu ile aşağıya doğru ilerlediğini düşünün. Bu aşağı kayma işlemini fromEvent
operatörü ile bir streame dönüştürebilirsiniz.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import { fromEvent } from 'rxjs'; import { throttleTime, tap } from 'rxjs/operators'; const mesaj = document.getElementById('mesaj'); const mesajiYaz = _ => mesaj.innerText += ' Kayiyorum :)'; // sayfanın aşağı yukarı kaymasından bir observable oluşturuluyor. fromEvent(document, 'scroll') .pipe( throttleTime(300), tap(mesajiYaz) ) .subscribe(); |
Birleştirme Operatörleri:
Birleştirme operatörleri, bir çok observable
’dan gelen bilgilerin birleştirilmesine izin verir. Yayılan değerlerin sırası, zamanı ve yapısı, bu operatörler arasındaki birincil varyasyondur. En çok kullanılan birleştirme operatörleri: combineLatest
, concat
, merge
, startWith
ve withLatestFrom
.
Örneğin; hesaplama işlemi yaparken, birden çok kaynak gelen güncel verileri birleştirebiliriz.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
import { combineLatest, of } from 'rxjs'; import { map } from 'rxjs/operators'; const kaynakBir$ = of(10, 20, 30, 40, 50); const kaynakIki$ = of(1, 2, 3); const sonuc$ = combineLatest(kaynakBir$, kaynakIki$).pipe( map(([a, b]) => a*b) ); sonuc$.subscribe(x => console.log('Sonuc:' + x)); |
Hata İşleme Operatörleri:
Hata işleme operatörleri, hataları incelikle ele almaya ve yeniden denemeye olanak sağlar sağlarlar. Aşağıdaki örnekte bölme işleminde, sıfıra bölüm durumunda çıkan hatayı işleme gösterilmiştir. En yaygın kullanılan hata işleme operatörü catchError
’dür.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 |
import { of } from 'rxjs'; import { map, catchError } from 'rxjs/operators'; of(8, 6, 4, 2, 0).pipe( map(n => { if (n === 2) throw 'hata' return n / (n - 2); }), catchError(err => of('Sıfıra', 'Bölünme', 'Hatası')), ) .subscribe(x => console.log(x)); |
Filtreleme Operatörleri:
Filtreleme operatörleri, observable
bir kaynaktan gelen değerleri olduğu gibi alma, azaltma ya da bir akış içinde değerlerin birikmesine olanak sağlar. En yaygın kullanılan filtreleme operatörleri şunlardır: debounceTime
, distinctUntilChanged
, filter
,take
ve takeUntil
.
Aşağıdaki örnekte take()
operatörü kullanılarak kaynaktan gelen ilk 3 değer alınıyor.
1 2 3 4 5 6 7 8 9 10 11 |
import { interval } from 'rxjs'; import { take } from 'rxjs/operators'; const sayac$ = interval(1000); // take ile kaynak'tan gelen ilk 3 değeri alıyoruz. ilk 3 değer alındıktan sonra abonelik sona eriyor const ilkUcSayi = sayac$.pipe(take(3)); ilkUcSayi.subscribe(x => console.log(x)); |
Birden çok noktaya yayın(MultiCasting) operatörleri:
RxJS’te observable
lar varsayılan olarak, tek noktaya yayın mantığı ile çalışır. Yani abone başına bir kaynak şeklinde. Multicasting operatörleri, ortaya çıkabilecek yan etkileri birden çok abone arasında paylaştırmanıza olanak sağlar. Yaygın olarak kullanılan Multicasting operatörleri: shareReplay
ve share
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import { timer } from 'rxjs'; import { tap, mapTo, share } from 'rxjs/operators'; const sayac$ = timer(2000); const sonuc = sayac$.pipe( tap(() => console.log('***Yan Etkiler***')), mapTo('SONUÇ') ); const subscribe = sonuc.subscribe(console.log); const sonucuPaylas = sonuc.pipe(share()); sonucuPaylas.subscribe(console.log); sonucuPaylas.subscribe(console.log); sonucuPaylas.subscribe(console.log); sonucuPaylas.subscribe(console.log); sonucuPaylas.subscribe(console.log); |
Dönüştürme Operatörleri:
RxJS operatör zincirinden geçerken, değerleri dönüştürmek sıklıkla yapılan bir iştir. Bu operatörler, ihtiyaç duyduğunuz her kullanım durumu için dönüştürme konusunda size yardımcı olurlar. En yaygın kullanılan dönüştürme operatörleri: concatMap
, map
, mergeMap
, scan
ve switchMap
.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
import { of } from 'rxjs'; import { concatMap,delay } from 'rxjs/operators'; const kaynak$ = of('Merhaba', 'Dünya'); const concatMapOrnek$ = kaynak$ .pipe ( delay(2000), //concatMap(val => of(`${val}!`)), concatMap(val => `${val}!`) ); const subscribe = concatMapOrnek$ .subscribe(console.log); |
KAPANIŞ
RxJS ilk kod geliştirmek ilk başlarda gözünü korkutabilir o yüzden hemen pes etmeyin. Uğraştıkça seveceksiniz.
Bir sonraki RxJS yazısında görüşmek üzere.