Java 9 Yenilikleri-3-Reactive Stream (Flow API)
JDK 9 ile gelen yenilikleri incelemeye devam ediyoruz. Serinin önceki makalelerine aşağıdaki bağlantılar ile erişebilirsiniz.
Java 9 Yenilikleri-2-Modüler Java
Java 9 Yenilikleri-2-Modüler Java-2
Bu makale boyunca Java 9 ile gelen Flow API’ye dair öğrendiklerimi sizler ile paylaşmaya çalışacağım.
Öncelikle Flow API ve reactive stream’in ne olduğuna ve anahtar kavramlarına bakalım. Flow API reactive stream standartının JDK içerisine eklenmesinden daha fazla birşey değildir. Ne demek şimdi bu? Biraz açalım. Hali hazırda reactive stream işleyen, RxJava, Akka, Vert.x vb araç veya kütüphaneler mevcut. Bunların altında imzası olan mühendisler tarafından asgari bir reactive stream standartı belirleniyor ve bir manifesto yayınlanıyor. JEP 266 ile java içerisine eklenmiş haline de Flow API deniyor 🙂 (Aslında reactive-stream.jar ve ilgli jarlar ile JDK 8’e de ekleyebiliriz). Reaktif stream standartının temelini de tabiki reaktif manifesto oluşturuyor. Bu konuyu Alper hali hazırda işlemeye devam etmekte. O yüzden biz oraya hiç girmeyelim sonra bizi döver möver 🙂 Şimdiye kadar bakmadıysanız bağlantıları aşağıda.
Reactive Programming – Reactive Programlama Nedir?
Reactive Programming – Reactive Manifesto
Bu standart çerçevesi içerisinde reaktif stream şöyle tanımlanıyor :
asynchronous stream processing with non-blocking back pressure.
JDK 9 içerisinde bulunan Flow API, java.util.concurrent.Flow sınıfı içerisinde bulunan 4 interface ve SubmissionPublisher isimli sınıftan oluşur. Flow API dinamik Push/Pull temelli Publish-Subscribe yaklaşımı üzerine geliştirilmiştir. Java, Publisher ve Subscriber arasındaki asenkron iletişimi Executor ve CompletableFuture ile kolaylıkla sağlıyor.
Asenkron stream işlemedeki en önemli sorunlardan biri ise back-pressure. Back-pressure’in ne olduğuna bakalım ama önce farklı bir pencereden. ( Reactive Stream konusu içerisinde kullanılan Stream, pipe, backpressure, flow vb kavramlar açıkcası bana super mario’yu çağrıştırdı :). Evet bu kavramlar akışkan sistem tasarımında kullanılan kavramlar.)
Back-pressure: Bir sistem içerisindeki basıncın, sistemi besleyen basınçtan fazla olmasını tanımlayan kavramdır. Basınç ile Akış hızı arasında da ters orantı mevcuttur. Yani yüksek basınç düşük akış hızı anlamına geliyor. Buradan yazılım dünyasına gelirsek, bir subscriber’ın veriyi işleme hızı, publisher’ın veriyi gönderme hızından az olduğu durumu tanımlar. (basınç ve hız ters orantılıydı. Verinin yavaş işlenmesi basıncın arttığı anlamına gelir) Bu durumda, subscriber tarafında veri birikmeye başlar.
Zaten Flow API’nin dinamik push/pull temelli bir yaklaşım üzerine inşa edilmesi de bu durumun önüne geçebilmektir. Subscriber, Publisher’dan Subscription’ın request metotunu kullanarak işleyebileceği kadar veri ister(pull), Publisher da bu sayıdaki veriyi gönderir(push).
Şimdi az önce bahsettiğimi arayüzlere ve implemente edilmiş sınıfımıza bakalım.
Flow.Publisher<T>
1 2 3 4 5 6 7 8 |
@FunctionalInterface public static interface Publisher<T> { public void subscribe(Subscriber<? super T> subscriber); } |
Veri akışını, kendisine kayıtlı olan abonelere, abonenin isteği sonrasında gönderir(pull/push). Gönderme işlemi asenkron olmalıdır. Executor ve CompatableFuture bu amaç için kullanılır. (SubmissionPublisher sınıfı Publisher’in implementasyonudur.) Kayıt işlemi için subscribe metotunu kullanır. Bu metot Subscriber tipinde bir parametre alır.
Flow.Subscription
1 2 3 4 5 6 7 8 |
public static interface Subscription { public void request(long n); public void cancel(); } |
Publisher ve Subscriber arasındaki bağlantıyı sağlar. Subscriber veri isteğini, Subscription’ın request metotu ile yapar. İstediği zaman cancel metotu ile aboneliği bitirebilir.
Flow.Subscriber<T>
1 2 3 4 5 6 7 8 9 10 |
public static interface Subscriber<T> { public void onSubscribe(Subscription subscription); public void onNext(T item); public void onError(Throwable throwable); public void onComplete(); } |
Publisher’a abone olup yayımlanan verileri alır. Publisher veriyi subscriber’ın istemesini bekler.(unblocking backpressure).
onSubscribe(Flow.Subscription) → Subscribe işlemi sonrası çağrılan metottur. Bu metot Flow.Subscription tipinde bir parametre bekler. Subscription Publisher içerisinde oluşturulur.
onNext(T) → Yayımlanan verilerin geldiği metottur.
onError(Throwable) → Publisher veya Subscription da meydana gelebilecek bir hata sonrası çağrılan metottur. OnError çağrıldıktan sonra abone başka bir mesaj alamaz.
onComplete() → Başka bir mesaj gönderilmeyeceği durumlarda çağrılır.
Flow.Processor<T,R>
1 2 3 4 5 6 |
public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> { } |
Hem Publisher hem de Subscriber gibi davranan bir interfacedir. Kullanım amacı, bir pipe halinde verinin asıl Publisher’dan alınıp bir veya daha fazla işleme tabi tutulup son Subscriber’a ulaştırılmasıdır.
SubmissionPublisher
Bir Publisher implementasyon sınıfıdır.
Flow API’nin çalışma mantığı
1-Subscriber bir nesne Publisher nesnesine subscribe metotu ile abone olur.
2-Publisher nesnesi kendi Subscription implementasyonundan oluşturduğu nesnesini, Subscriber’in onSubscribe metotuna gönderir.
3- Bu işlem sonrasında hem subscriber da hem de publisher da iletişimleri için kullanacakları Subscription nesnesi oluşur.
Artık örneğimize geçebiliriz. Örneğimizi Eclipse Oxygen 4.7.1a ve JDK-9.0.1 ile yapacağız.
1* Eclipse’de yeni bir java projesi oluşturalım.
2* Message isimli bir sınıf yapalım. Publisherdan Subscribera bu sınıf örneklerini göndereceğiz.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
package io.bilisim.flow; public class Message { private long id; private String description; public Message(long id, String description) { super(); this.id = id; this.description = description; } public long getId() { return id; } public void setId(long id) { this.id = id; } public String getDescription() { return description; } public void setDescription(String description) { this.description = description; } @Override public String toString() { return "Id : "+this.id+" Description: "+this.description; } } |
3* Subscriber implementasyon örneğimizi yapalım. Açıklamaları kod üzerinde yaptım.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
package io.bilisim.flow; import java.text.MessageFormat; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; public class SubscriberImp implements Subscriber<Message> { /* * Publisherda yaratılıp, onSubscribe metotu ile gonderilecek olan subscription örneği. * İletişim bunun üzerinden devam ettirilecek */ private Subscription subscription; private final static long REQUEST_COUNT = 3;//publisherdan bir seferde istenecek mesaj sayısı private long proceedRequest = 0;//işlenen mesaj sayısı @Override public void onSubscribe(Subscription arg0) { System.out.println("Abonelik Başladı"); //local subscription orneğimizi set ettik. subscription = arg0; /* * REQUEST_COUNT kadar mesaj istegimizi Publisher'a gonderdik. * Eger burada istek yapmaz isek, abone olmuş olsak bile herhangi bir mesaj alamayız */ subscription.request(REQUEST_COUNT); } @Override public void onNext(Message message) { if(message != null) { proceedRequest++;//işlenen mesajı bir arttırıyoruz System.out.println("Message: "+message.toString()+" "+Thread.currentThread().getName()); //Request ettiğimiz kadar veriyi işlediysek yeni bir istek yapıyoruz. //burada yeniden istek yapmaz isek, Publisher ilk istekden sonra başka mesaj göndermez. if(proceedRequest == REQUEST_COUNT) { proceedRequest = 0; subscription.request(REQUEST_COUNT); System.out.println(MessageFormat.format("Sıradaki {0} message istendi", REQUEST_COUNT)); } } } @Override public void onComplete() { System.out.println("Abonelik Sonlandı! "+Thread.currentThread().getName()); } @Override public void onError(Throwable arg0) { arg0.printStackTrace(System.err); } } |
4* Uygulamamızı çalıştıracağımız aynı zamanda stream ve publisher’i oluşturacağımız sınıfımızı yapalım.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
package io.bilisim.flow; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; import java.util.stream.LongStream; public class MainClass { public static void main(String[] args) { //Publisher implementasyonu olan SubmissionPublisher sınıfından bir ornek olusturduk. SubmissionPublisher<Message> publisher = new SubmissionPublisher<Message>(); //SubscriberImp sınıfımızdan bir örnek oluşturduk. SubscriberImp subscriberImp = new SubscriberImp(); //Publisher'a örneğimizi abone ettik. publisher.subscribe(subscriberImp); //Stream oluşturduk ve her bir değer için bir mesaj instance'i yaptık LongStream.range(0, 5).mapToObj(i -> new Message(i,"Message -> "+i)).forEach(message -> { /* * publisher ile oluşan message örneğimizi subscribera gonderdik. * Gonderim asenkron olduğundan, subscribera tüm mesajların geldiğini görebilmek için * her gönderim sonraso 1 saniye bekledik. */ publisher.submit(message); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); publisher.close(); } } |
Uygulamamızı çalıştırdığımızda aşağıdaki çıktıyı almış olmalıyız.
1 2 3 4 5 6 7 8 9 10 11 12 |
Abonelik Başladı Message: Id : 0 Description: Message -> 0 ForkJoinPool.commonPool-worker-1 Message: Id : 1 Description: Message -> 1 ForkJoinPool.commonPool-worker-1 Message: Id : 2 Description: Message -> 2 ForkJoinPool.commonPool-worker-1 Sıradaki 3 message istendi Message: Id : 3 Description: Message -> 3 ForkJoinPool.commonPool-worker-1 Message: Id : 4 Description: Message -> 4 ForkJoinPool.commonPool-worker-1 Abonelik Sonlandı! ForkJoinPool.commonPool-worker-1 |
Gördüğümüz gibi commonPool içinden bir thread ile çalışarak asenkron gönderimi sağladı. MainClass içerisine bir subscriber örneği daha yapıp publishera abone edelim.
1 2 3 4 5 6 |
SubscriberImp subscriberImp2 = new SubscriberImp(); publisher.subscribe(subscriberImp2); |
Çıktımız;
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 |
Abonelik Başladı Abonelik Başladı Message: Id : 0 Description: Message -> 0 ForkJoinPool.commonPool-worker-3 Message: Id : 0 Description: Message -> 0 ForkJoinPool.commonPool-worker-2 Message: Id : 1 Description: Message -> 1 ForkJoinPool.commonPool-worker-2 Message: Id : 1 Description: Message -> 1 ForkJoinPool.commonPool-worker-3 Message: Id : 2 Description: Message -> 2 ForkJoinPool.commonPool-worker-2 Message: Id : 2 Description: Message -> 2 ForkJoinPool.commonPool-worker-3 Sıradaki 3 message istendi Sıradaki 3 message istendi Message: Id : 3 Description: Message -> 3 ForkJoinPool.commonPool-worker-3 Message: Id : 3 Description: Message -> 3 ForkJoinPool.commonPool-worker-2 Message: Id : 4 Description: Message -> 4 ForkJoinPool.commonPool-worker-2 Message: Id : 4 Description: Message -> 4 ForkJoinPool.commonPool-worker-3 Abonelik Sonlandı! ForkJoinPool.commonPool-worker-3 Abonelik Sonlandı! ForkJoinPool.commonPool-worker-2 |
bu sefer iki thread ile çalıştığını görebiliriz.
Bu örneğimize bir processor ekleyelim. Processor sınıfımız message’in description değerini upper case yapsın.
Önce Processor interface’inden UpperProcessor isimli bir implementasyon yapalım.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
package io.bilisim.flow; import java.util.concurrent.Flow.Subscriber; import java.util.concurrent.Flow.Subscription; import java.text.MessageFormat; import java.util.concurrent.SubmissionPublisher; import java.util.function.Function; public class UpperProcessor extends SubmissionPublisher<Message> implements Subscriber<Message> { //Istedimiz islemi yapacağımız FunctionInterface'imiz private Function<Message, Message> upperFunction; private Subscription subscription; private final static long REQUEST_COUNT = 5; private long proceedRequest = 0; public UpperProcessor(Function<Message, Message> upperFunction) { this.upperFunction = upperFunction; } @Override public void onSubscribe(Subscription arg0) { this.subscription = arg0; subscription.request(REQUEST_COUNT); } @Override public void onNext(Message message) { if(message != null) { proceedRequest++; System.out.println("Processor Message: "+message.toString()+" "+Thread.currentThread().getName()); //FunctionalInterface'imizin apply metotu ile constructorda geçilen kod çalışıyor. SubmissionPublisher'in submit metotu ile //bir sonraki subscriber'a gonderiliyor. submit(upperFunction.apply(message)); if(proceedRequest == REQUEST_COUNT) { proceedRequest = 0; subscription.request(REQUEST_COUNT); System.out.println(MessageFormat.format("Sıradaki {0} message istendi", REQUEST_COUNT)); } } } @Override public void onComplete() { System.out.println("Abonelik Sonlandı! "+Thread.currentThread().getName()); } @Override public void onError(Throwable arg0) { arg0.printStackTrace(System.err); } } |
SubscriberImp sınıfımıza oldukça benziyor. İki fark mevcut.
1* Bir function interface örneği içeriyor. UpperProcessor sınıfından bir nesne örneği oluşturuduğumuzda, functional interface’in yapacağı işi contructor’a geçeceğiz.
2* onNext metotunda functional interface’imizin yapacağı iş yapılıyor ve submit ile bir sonrakine gönderiliyor sonuç
MainClass’ımızda aşağıdaki gibi oldu.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
package io.bilisim.flow; import java.util.concurrent.SubmissionPublisher; import java.util.concurrent.TimeUnit; import java.util.stream.LongStream; public class MainClass { public static void main(String[] args) { //Publisher implementasyonu olan SubmissionPublisher sınıfından bir ornek olusturduk. SubmissionPublisher<Message> publisher = new SubmissionPublisher<Message>(); //SubscriberImp sınıfımızdan bir örnek oluşturduk. SubscriberImp subscriberImp = new SubscriberImp(); //Processor ornegimizi olusturup fonksiyonu set ettik UpperProcessor upperProcessor = new UpperProcessor(message -> { String upper = message.getDescription().toUpperCase(); message.setDescription(upper); return message; }); //publisher'a processor'u subscribe ettik. Processor Interface'i Subscriber'i implemente ediyor. publisher.subscribe(upperProcessor); //upperProcessor'1 subscriberImp'ı subscribe ettik. upperProcessor.subscribe(subscriberImp); LongStream.range(0, 5).mapToObj(i -> new Message(i,"Message -> "+i)).forEach(message -> { publisher.submit(message); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } }); publisher.close(); } } |
Çalıştırdığımızda aşağıdaki çıktıyı almış olacağız.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
Abonelik Başladı Processor Message: Id : 0 Description: Message -> 0 ForkJoinPool.commonPool-worker-3 Message: Id : 0 Description: MESSAGE -> 0 ForkJoinPool.commonPool-worker-3 Processor Message: Id : 1 Description: Message -> 1 ForkJoinPool.commonPool-worker-2 Message: Id : 1 Description: MESSAGE -> 1 ForkJoinPool.commonPool-worker-2 Processor Message: Id : 2 Description: Message -> 2 ForkJoinPool.commonPool-worker-2 Message: Id : 2 Description: MESSAGE -> 2 ForkJoinPool.commonPool-worker-2 Sıradaki 3 message istendi Processor Message: Id : 3 Description: Message -> 3 ForkJoinPool.commonPool-worker-2 Message: Id : 3 Description: MESSAGE -> 3 ForkJoinPool.commonPool-worker-2 Processor Message: Id : 4 Description: Message -> 4 ForkJoinPool.commonPool-worker-3 Message: Id : 4 Description: MESSAGE -> 4 ForkJoinPool.commonPool-worker-2 Sıradaki 5 message istendi Abonelik Sonlandı! ForkJoinPool.commonPool-worker-3 |
Basit bir örnek ile Flow API’yi incelemeye çalıştık. Neden Publisher interface’inden bir implementasyon yapmadığımız sorusu aklınıza gelmiştir muhtemelen. Ben çalışırken gelmişti. Var olan dışından bir publisher implementasyonu pek önerilen bir yöntem değil. Çünkü sadece metotları implemente etmek ile olmuyor. Yukarıda linkini verdiğim reactive stream spesifikasyonuna uygun olması gerekmekte. Bunun için Reactive Streams Technology Compatibility Kit isimli bir test aracı var. Bu arac ile yaptığınız implementasyonun spesifikasyona uygunluğuna bakıyorsunuz. Eğer ömür vefa eder aklımızda yeterse inşallah bir publisher implementasyon makalesi yazarız. Umarım faydalı bir yazı olmuştur.
Gayret bizden, tevfik Allah’tan.
Etap etap ve acik anlatiminiz icin cok tesekkurler ; yillar once programlama konusundan uzaklasmama ragmen orneklerle anlatiminiz ve ilgili yorumlariniz takip edebilmem için cok faydali oldu. Gelecek makalelerinizi de insaallah bekliyoruz.
Hem yorumunuz hem de vakit ayırıp okuduğunuz için teşekker ederim.