RxJS’e yakından bakış -2: Subjects
Bu yazıda RxJS’in özel bir Observable
türü olan Subject
’e yakından bakacağız. Subject
, nedir? ne işe yarar? hırlı mıdır? hırsız mıdır? Hepsini bu yazının sonunda öğrenmiş olacağız.
Bir Önceki Yazılara ulaşmak için burayı tıklayınız
https://bilisim.io/2019/10/08/rxjse-merhaba-deyin/
https://bilisim.io/2019/10/20/rxjse-yakindan-bakis-1-observables/
Subject
, birbirinden bağımsız Observer’lara verileri yaymak(multicast
) için kullanılan tiptir. Observable’ler varsayılan olarak tek bir observer’a verileri yayar(unicast
).
Observable’ı şu şekilde düşünebiliriz. Yazılım takım liderinizin, yazdığınız kodda çıkan hata yüzünden size bağırmasıdır. Burada Yazılım takım lideri observable
, siz ise observer
’ınızdır.
Subject
’i ise şu şekilde düşünebiliriz. Yazılım takım liderinizin ekibinin yaptığı işi beğenmeyerek tüm takıma aynı anda bağırması gibidir. Burada yazılım takım lideri Subject
, takım üyeleri birbirinden bağımsız oberver
’dır.
Her subject bir observable’dır. Var olan bir subject’e yayılan değerleri almak için bir observer ile abone olabilirsiniz. Observer açısından baktığınızda kendisine gelen verilerin, tek noktaya yayın yapan bir observable
’dan mı yoksa subject
’ten mi gelip gelmediği bilenemez. Subject’te bulunan subscribe
fonksiyonu cağrıldığında, veri sağlayan yeni bir yürütme çağırmaz. Kendisine verilen observer’ı, Observer’ların listesine kayıt eder.
Her subject bir observer’dır. İçinde next(v)
, error(e)
ve complete()
metotları olan bir nesnedir. Subject’e yeni bir değer sağlamak için next(deger)
metodunu çağırın ve bu sağlanan yeni değer, kendisini dinlemek için kayıt olan observer’lara iletilecektir(multicasted).
Aşağıdaki örnekte Subject’e eklenen iki adet observer bulunmaktadır. Subject’i bir kaç yeni değer ile besliyoruz. https://stackblitz.com/edit/rxjs-subject-1?devtoolsheight=100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
import { Subject } from 'rxjs'; const subject = new Subject<number>(); // Subjecte kayıt olan birinci Observer subject.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); // Subjecte kayıt olan ikinci Observer subject.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); subject.next(1); // subjecte verilen 1 değeri A ve B observerlar'ına yayıldı subject.next(2); // subjecte verilen 2 değeri A ve B observerlar'ına yayıldı |
Subject aynı zamanda bir observer olduğu için, Bir observable’a yapılan her hangi bir abonelik işleminde subject’i argüman verebilirsiniz. Aşağıdaki örnekte bunu görebilirsiniz. https://stackblitz.com/edit/rxjs-subject-2?devtoolsheight=100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
import { Subject, from } from 'rxjs'; const subject = new Subject<number>(); subject.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); subject.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); const observable = from([1, 2, 3]); // diziden observable oluşturuluyor. observable.subscribe(subject); // Subjecti argüman olarak kullanıp abonelik işlemi yapılıyor |
Yukarıdaki örnekte, tek bir observer’a yayın yapan observable’ı, çok noktaya yayın yapan bir duruma getirmiş olduk. Bir nevi Observable, subject’e dönüştü diyebiliriz.
Kendi alanında uzman 3 farklı Subject türü mevcuttur. Bunlar: BehaviorSubject
, ReplaySubject
ve AsyncSubject
.
Observables Radyo : FM 103.1
Multicasted Observable
: Birden çok aboneye sahip olan Subject’e bildirimleri iletir.
Multicasted Observable
: Birden çok observer’a, aynı observable değerini göstermek için bir Subject kullanır.
Aşağıdaki örnekte multicast
operatörü ile bu işlemin nasıl yapıldığını görebilirsiniz. https://stackblitz.com/edit/t3xwp5?devtoolsheight=100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
import { from, Subject } from 'rxjs'; import { multicast } from 'rxjs/operators'; const source = from([1, 2, 3]); const subject = new Subject(); const multicasted = source.pipe(multicast(subject)); // aşağıdak aslında `subject.subscribe({...})`: işlemleri yapılmaktadır multicasted.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); multicasted.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); // `source.subscribe(subject)` işlemi yapılmaktadır. multicasted.connect(); |
Yukarıdaki örnekğin çalışma mantığı şu şekildedir: Observerlar, Subject’e abone olur, Subject’e kaynak Observable’a abone olur.
multicast
fonksiyonu; Abonelik işlemine başlandığında, Subject gibi davranan bir Observable’a döner. multicast
fonksiyonu içinde connect() metodu barındıran “ConnectableObservable” tipinde bir observable döner.
connect()
metodu,paylaşılan Observable çalışmasının, tam olarak ne zaman başlayacağını belirlemek için önemlidir. connect() metodu çağırıldığında aslında içeride source.subscribe(subject)
çağrılmaktadır. connect()
bir Subscription döndüğünden abonelikten çıkma(unsubscribe) işlemi yapılabilir.
connect()
metodunu manüel olarak çağırmak ve abonelik işlemleriyle uğraşmak genellikle zor ve uğraştırıcıdır.
Aşağıdaki örnekte connect()
manüel çağrılıp, işi biten observer’lar için unsubcribe
işlemleri açıktan yapılıyor. https://stackblitz.com/edit/hakawv?devtoolsheight=100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
import { interval, Subject } from 'rxjs'; import { multicast } from 'rxjs/operators'; const source = interval(1000); const subject = new Subject(); const multicasted = source.pipe(multicast(subject)); let subscription1, subscription2, subscriptionConnect; subscription1 = multicasted.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); // ilk abone kendine kayıt olduğu için connect() metodunu çağırmalıyız. subscriptionConnect = multicasted.connect(); // işlem başladıktan 1 saniye sonra 2. abone geliyor setTimeout(() => { subscription2 = multicasted.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); }, 1200); // iki saniye sonra Birinci abone yayından çıkıyor setTimeout(() => { subscription1.unsubscribe(); }, 2400); // 2 saniye sonra en sona abone yayından çıkıyor // tüm aboner yayından çıktığı için yayını kapatmalıyız setTimeout(() => { subscription2.unsubscribe(); subscriptionConnect.unsubscribe(); // paylaşılan observer'dan unsubscribe yapılıyor }, 4000); |
Yukarıdaki örneğimizde her bir saniyede bir değer üreten source
observable’ını ve bir adet subject
yaratıp; multicast operatörünü kullanarak multicasted observable
’ını yaratıyoruz . Daha sonra akış sırası şu şekilde oluyor:
- Birinci observer olan
subscripton1
,multicasted
Observable’ına abone olur. multicasted
Observable’ısubscriptionConnect
değişkeni ileconnect()
metodu çağrılarak yayına başlar.- Bir saniye sonra
0
değerisubscripton1
için sunulur. - İkinci observer olan
subscripton2
, iki saniye sonramulticasted
Observable’ına abone olur. 1
değerisubscripton1
vesubscripton2
observerlarına sunulur.- Yayın başladıktan dört saniye sonra
subscripton1
yayından ayrılır 2
değeri sadecesubscripton2
ye sunulur.- Yayın başladıktan dört saniye sonra
subscripton2
de yayından ayrılır. subscriptionConnect
observable’ı unscribe yapılarak yayın sonlandırılır.
Yukarıdaki işlemlerin otomatik olması için refCount
operatörü yardımımıza koşar. Aşağıdaki örneği inceleyelim: https://stackblitz.com/edit/zgg3qd?devtoolsheight=100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 |
import { interval, Subject } from 'rxjs'; import { multicast, refCount } from 'rxjs/operators'; const source = interval(1000); const subject = new Subject(); const refCounted = source.pipe(multicast(subject), refCount()); let subscription1, subscription2; // Birinci Observer abone olur olmaz connect() methodu çağrılıyor console.log('observerA subscribed'); subscription1 = refCounted.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); setTimeout(() => { console.log('observerB subscribed'); subscription2 = refCounted.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); }, 1200); setTimeout(() => { console.log('observerA unsubscribed'); subscription1.unsubscribe(); }, 2400); // son abone yayından çıkar çıkmaz. yapılan yayın otomatik olarak duracaktır. setTimeout(() => { console.log('observerB unsubscribed'); subscription2.unsubscribe(); }, 4000); |
Yukarıdaki örnekte ilk abone yayına dahil olur olmaz, yayın otomatik olarak başlıyor. En son abone yayından çıkınca, yayın otomatik olarak bitirilir.
BehaviorSubject
BehaviorSubject; şimdiki/geçerli değer
kavramına sahip olan bir subject türüdür. En son yayılan değeri hafızada tutar. Yeni bir Observer abonelik yapınca bu değer anında BehaviorSubject
aboneye iletilir. BehaviorSubject, yaratılırken bir başlangıç değeri alır.
BehaviorSubjects,
zaman içindeki değerleri
temsil etmek için kullanışlıdır. Örneğin, doğum günlerinin bir olay akışı Subject’tir, ancak bir kişinin yaş akışı bir BehaviorSubject olacaktır.
https://stackblitz.com/edit/vma4rn?devtoolsheight=100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
import { BehaviorSubject } from 'rxjs'; const subject = new BehaviorSubject(0); // başlangıç değeri subject.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); subject.next(1); subject.next(2); setTimeout(() => { subject.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); }, 1000); setTimeout(() => { subject.next(3); }, 2000); |
Yukarıdaki örnekte sıfır
başlangıç değeri ile “BehaviorSubject” yaratılıyor. Daha sonra birinci observer, subject’e abone oluyor. 1
ve 2
değerleri birinci observer’a sunuluyor. Süreç başladıktan bir saniye sonra ikinci observer, subject’e abone oluyor ve anında 2
değeri kendisine sunuluyor. Süreç başladıktan iki saniye sonra 3
değeri her iki observer’a sunuluyor.
ReplaySubject
ReplaySubject;
BehaviorSubject
’le benzer işi yapmaktadır. BehaviorSubject son değeri yeni katılan aboneye sunarken, ReplaySubject ise yaratılırken kendisine verilen hafızada tutulacak değer sayısı kadar değeri yeni aboneye iletir. https://stackblitz.com/edit/8xpxmi?devtoolsheight=100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import { ReplaySubject } from 'rxjs'; const subject = new ReplaySubject(3); // buffer 3 values for new subscribers subject.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); setTimeout(()=> subject.subscribe({ next: (v) => console.log(`observerB: ${v}`) }),2000); setTimeout(()=>subject.next(5),3000); |
Yukarıdaki örnekte sadece son üç değeri tutacak şekilde bir ReplaySubject yaratılıyor. İlk observer abonelik yaptıktan sonra 1
,2
,3
,4
değerleri kendisine sunuluyor. Süreç başladıktan iki saniye sonra ikinci observer abonelik işlemi yapıyor. kendisine 2
, 3
,4
değerleri anında sunuluyor. Süreç başladıktan üç saniye sonra 5
değeri her iki observer’a sunuluyor.
AsyncSubject
Süreç tamamlandıktan sonra en son değeri tüm observer’lara gönderen bir Subject türüdür. https://stackblitz.com/edit/95xh7d?devtoolsheight=100
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 |
import { AsyncSubject } from 'rxjs'; const subject = new AsyncSubject(); subject.subscribe({ next: (v) => console.log(`observerA: ${v}`) }); subject.next(1); subject.next(2); subject.next(3); subject.next(4); subject.subscribe({ next: (v) => console.log(`observerB: ${v}`) }); subject.next(5); subject.complete(); |
Yukarıdaki örnekte yaratılan AsyncSubject’e ilk observer abone olduktan sonra 1
, 2
,3
ve 4
değerleri üretiliyor. Daha sonra ikinci observer abone olduktan sonra, 5
değeri üretilip complete()
metodu çağrılıyor. Bu işlemin sonunda sadece 5
değeri her iki obserbver’a sunuluyor.