Java 9 Yenilikleri-3-Reactive Stream (Flow API)

2 5.806

JDK 9 ile gelen yenilikleri incelemeye devam ediyoruz. Serinin önceki makalelerine aşağıdaki bağlantılar ile erişebilirsiniz.

Java 9 Yenilikleri-1-Ne Oldu?

Java 9 Yenilikleri-2-Modüler Java

Java 9 Yenilikleri-2-Modüler Java-2

Bu makale boyunca Java 9 ile gelen Flow API’ye dair öğrendiklerimi sizler ile paylaşmaya çalışacağım.

Öncelikle Flow API ve reactive stream’in ne olduğuna ve anahtar kavramlarına bakalım. Flow API reactive stream standartının JDK içerisine eklenmesinden daha fazla birşey değildir. Ne demek şimdi bu? Biraz açalım. Hali hazırda reactive stream işleyen, RxJava, Akka, Vert.x vb araç veya kütüphaneler mevcut. Bunların altında imzası olan mühendisler tarafından asgari bir reactive stream standartı belirleniyor ve bir manifesto yayınlanıyor. JEP 266 ile java içerisine eklenmiş haline de Flow API deniyor 🙂 (Aslında reactive-stream.jar ve ilgli jarlar ile JDK 8’e de ekleyebiliriz). Reaktif stream standartının temelini de tabiki reaktif manifesto oluşturuyor. Bu konuyu Alper hali hazırda işlemeye devam etmekte. O yüzden biz oraya hiç girmeyelim sonra bizi döver möver 🙂 Şimdiye kadar bakmadıysanız bağlantıları aşağıda.

Reactive Programming – Reactive Programlama Nedir?

Reactive Programming – Reactive Manifesto

Bu standart çerçevesi içerisinde reaktif stream şöyle tanımlanıyor :

asynchronous stream processing with non-blocking back pressure.

JDK 9 içerisinde bulunan Flow API, java.util.concurrent.Flow sınıfı içerisinde bulunan 4 interface ve SubmissionPublisher isimli sınıftan oluşur. Flow API dinamik Push/Pull temelli Publish-Subscribe yaklaşımı üzerine geliştirilmiştir. Java, Publisher ve Subscriber arasındaki asenkron iletişimi Executor ve CompletableFuture ile kolaylıkla sağlıyor.

Asenkron stream işlemedeki en önemli sorunlardan biri ise back-pressure. Back-pressure’in ne olduğuna bakalım ama önce farklı bir pencereden. ( Reactive Stream konusu içerisinde kullanılan Stream, pipe, backpressure, flow vb kavramlar açıkcası bana super mario’yu çağrıştırdı :). Evet bu kavramlar akışkan sistem tasarımında kullanılan kavramlar.)

Back-pressure: Bir sistem içerisindeki basıncın, sistemi besleyen basınçtan fazla olmasını tanımlayan  kavramdır. Basınç ile Akış hızı arasında da ters orantı mevcuttur. Yani yüksek basınç düşük akış hızı anlamına geliyor. Buradan yazılım dünyasına gelirsek, bir subscriber’ın veriyi işleme hızı, publisher’ın veriyi gönderme hızından az olduğu durumu tanımlar. (basınç ve hız ters orantılıydı. Verinin yavaş işlenmesi basıncın arttığı anlamına gelir) Bu durumda, subscriber tarafında veri birikmeye başlar.

Zaten Flow API’nin dinamik push/pull temelli bir yaklaşım üzerine inşa edilmesi de bu durumun önüne geçebilmektir. Subscriber, Publisher’dan Subscription’ın request metotunu kullanarak işleyebileceği kadar veri ister(pull), Publisher da bu sayıdaki veriyi gönderir(push).

Şimdi az önce bahsettiğimi arayüzlere ve implemente edilmiş sınıfımıza bakalım.

Flow.Publisher<T> 

Veri akışını, kendisine kayıtlı olan abonelere, abonenin isteği sonrasında gönderir(pull/push). Gönderme işlemi asenkron olmalıdır. Executor ve CompatableFuture bu amaç için kullanılır. (SubmissionPublisher sınıfı Publisher’in implementasyonudur.) Kayıt işlemi için subscribe metotunu kullanır. Bu metot Subscriber tipinde bir parametre alır.

Flow.Subscription

Publisher ve Subscriber arasındaki bağlantıyı sağlar. Subscriber veri isteğini, Subscription’ın request metotu ile yapar. İstediği zaman cancel metotu ile aboneliği bitirebilir.

Flow.Subscriber<T>

Publisher’a abone olup yayımlanan verileri alır. Publisher veriyi subscriber’ın istemesini bekler.(unblocking backpressure).

onSubscribe(Flow.Subscription) → Subscribe işlemi sonrası çağrılan metottur. Bu metot Flow.Subscription tipinde bir parametre bekler. Subscription Publisher içerisinde oluşturulur.

onNext(T) → Yayımlanan verilerin geldiği metottur.

onError(Throwable) → Publisher veya Subscription da meydana gelebilecek bir hata sonrası çağrılan metottur. OnError çağrıldıktan sonra abone başka bir mesaj alamaz.

onComplete() → Başka bir mesaj gönderilmeyeceği durumlarda çağrılır.

Flow.Processor<T,R>

Hem Publisher hem de Subscriber gibi davranan bir interfacedir. Kullanım amacı, bir pipe halinde verinin asıl Publisher’dan alınıp bir veya daha fazla işleme tabi tutulup son Subscriber’a ulaştırılmasıdır.

SubmissionPublisher

Bir Publisher implementasyon sınıfıdır.

Flow API’nin çalışma mantığı

1-Subscriber bir nesne Publisher nesnesine subscribe metotu ile abone olur.

2-Publisher nesnesi kendi Subscription implementasyonundan oluşturduğu nesnesini, Subscriber’in onSubscribe metotuna gönderir.

3- Bu işlem sonrasında hem subscriber da hem de publisher da iletişimleri için kullanacakları Subscription nesnesi oluşur.

Artık örneğimize geçebiliriz. Örneğimizi Eclipse Oxygen 4.7.1a ve JDK-9.0.1 ile yapacağız.

1* Eclipse’de yeni bir java projesi oluşturalım.

2* Message isimli bir sınıf yapalım. Publisherdan Subscribera bu sınıf örneklerini göndereceğiz.

3* Subscriber implementasyon örneğimizi yapalım. Açıklamaları kod üzerinde yaptım.

4* Uygulamamızı çalıştıracağımız aynı zamanda stream ve publisher’i oluşturacağımız sınıfımızı yapalım.

Uygulamamızı çalıştırdığımızda aşağıdaki çıktıyı almış olmalıyız.

Gördüğümüz gibi commonPool içinden bir thread ile çalışarak asenkron gönderimi sağladı. MainClass içerisine bir subscriber örneği daha yapıp publishera abone edelim.

Çıktımız;

bu sefer iki thread ile çalıştığını görebiliriz.

Bu örneğimize bir processor ekleyelim. Processor sınıfımız message’in description değerini upper case yapsın.

Önce Processor interface’inden UpperProcessor isimli bir implementasyon yapalım.

SubscriberImp sınıfımıza oldukça benziyor. İki fark mevcut.

1* Bir function interface örneği içeriyor. UpperProcessor sınıfından bir nesne örneği oluşturuduğumuzda, functional interface’in yapacağı işi contructor’a geçeceğiz.

2* onNext metotunda functional interface’imizin yapacağı iş yapılıyor ve submit ile bir sonrakine gönderiliyor sonuç

MainClass’ımızda aşağıdaki gibi oldu.

Çalıştırdığımızda aşağıdaki çıktıyı almış olacağız.

Basit bir örnek ile Flow API’yi incelemeye çalıştık. Neden Publisher interface’inden bir implementasyon yapmadığımız sorusu aklınıza gelmiştir muhtemelen. Ben çalışırken gelmişti. Var olan dışından bir publisher implementasyonu pek önerilen bir yöntem değil. Çünkü sadece metotları implemente etmek ile olmuyor. Yukarıda linkini verdiğim reactive stream spesifikasyonuna uygun olması gerekmekte. Bunun için Reactive Streams Technology Compatibility Kit isimli bir test aracı var. Bu arac ile yaptığınız implementasyonun spesifikasyona uygunluğuna bakıyorsunuz. Eğer ömür vefa eder aklımızda yeterse inşallah bir publisher implementasyon makalesi yazarız. Umarım faydalı bir yazı olmuştur.

Gayret bizden, tevfik Allah’tan.

 

2 Comments
  1. Birol Berkem says

    Etap etap ve acik anlatiminiz icin cok tesekkurler ; yillar once programlama konusundan uzaklasmama ragmen orneklerle anlatiminiz ve ilgili yorumlariniz takip edebilmem için cok faydali oldu. Gelecek makalelerinizi de insaallah bekliyoruz.

    1. Volkan Özdemir says

      Hem yorumunuz hem de vakit ayırıp okuduğunuz için teşekker ederim.

Email adresiniz yayınlanmayacaktır.