ExecutorService
ExecutorService, asenkron işler çalıştırmamızı kolaylaştıran JDK tarafından sunulan bir interface’dir. Bu interface aracılığı ile, thread seviyesinde ele almamız gereken işler ile (Thread oluşturma, hata sonucu sonlanan threadi tekrar çalıştırma vb) ilgilenmeden Thread havuzu oluştururuz.
ExecutorService örneği oluşturmanın iki farklı yöntemi vardır. Bunlardan ilki Executors sınıfında bulunan static factory metotlardır. İkincisi ise direk ThreadPoolExecutor sınıfının yapılandırıcısını kullanmaktır. Executors sınıfının metotlarına bakarsak, ThreadPoolExecutor yapılandırıcısının kullanıldığını görürürüz. ThreadPoolExecutor yapılandırıcısını kullanmak bize daha fazla esneklik sağlar. Her iki yöntemi incelemeye başlayalım.
Executors Sınıfının Static Metotları İle Tanımlama
1*Executors.newFixedThreadPool(int nThreads)
1 2 3 4 5 |
ExecutorService executor = Executors.newFixedThreadPool(10); |
Dahilinde azami 10 thread barındıran bir thread havuzu ve thread havuzunu besleyen, yani çalıştırılması için gönderilen işleri tutan üst limiti olmayan bir queue oluşturulur. Static metotun koduna bakar isek, ThreadPoolExecutor yapılandırıcısını görürüz.
1 2 3 4 5 6 7 |
return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); |
Bu metot aracılığı ile bir ExecutorService örneği oluşturuduğumuzda, threadler hemen yaratılmaz. İlk iş geldiğinde bir tane yaratılır. İkinci iş geldiğinde, eğer maksimum sayıya ulaşılmadıysa, boşta thread olsa bile yeni thread oluşturulur.
2*Executors.newCachedThreadPool();
1 2 3 4 5 |
ExecutorService executor2 = Executors.newCachedThreadPool(); |
Belirsiz sayılı bir thread havuzu oluşturur. Bir task geldiğinde yeni bir thread oluşturur ve bu thread 60 saniye boyunca havuzda kalır. Yeni bir iş geldiğinde, eğer boşta bir thread var ise, o kullanılır. Boşta thread yok ise yaratılır.
Bu metotun implementasyonuna bakar isek;
1 2 3 4 5 6 7 |
return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); |
newFixedThreadPool da olduğu gibi ThreadPoolExecutor’ı görürüz.
3*Executors.newSingleThreadExecutor()
1 2 3 4 5 |
ExecutorService executor3 = Executors.newSingleThreadExecutor(); |
Tek bir thread oluşturur ve birim zamanda tek bir iş yürütür. İş esnasında hata oluştuğunda, yeni gelen iş için yeni bir thread oluşturur. Bu metot ile oluşturulan ExecutorService’in newFixedThreadExecutor(1) metotu sonucu oluşan örnekte farkı, konfigure edilememesidir. FinalizableDelegatedExecutorService’e dikkat.
Bu metotun implementasyonu;
1 2 3 4 5 6 7 8 |
return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); |
ThreadPoolExecutor Sınıfının Yapılandırıcısı İle Tanımlama
Overload edilmiş bir kaç yapılandırıcı mevcuttur. İki tanesi üzerinden, ThreadPoolExecutor yapılandırıcısında kullanılan parametreleri inceleyelim.
1 2 3 4 5 6 7 |
new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue) new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler) |
corePoolSize: Havuzda tutulacak asgari thread sayısıdır. (allowCoreThreadTimeOut değeri set edilmediği sürece)
maximumPoolSize: Havuzdaki azami thread sayısını belirleyen parametre.
keepAliveTime: corePoolSize değerinin üzerindeki threadlerin ne kadar süre daha havuz içerisinde kalacağını belirleyen parametre.
unit: keepAliveTime değerinin birimini belirleyen parametre
workQueue: Taskların çalıştırılmadan önce tutuldukları queue’yu belirleyen parametre.
handler: Eğer üst sınırı olan bir workQueue ile çalışıyor ve bu queue kapasitesini doldurmuş ise, yeni gelen tasklar geri çevrilir . Handler parametresi ile RejectedExecutionHandler tipinde bir sınıf örneği veririz.
1 2 3 4 5 |
ExecutorService executor5 = new ThreadPoolExecutor(1, 5, 30L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1024)); |
Örneğimizde, üst sınırı 1024 eleman olan bir queue kullanan threadPoolExecutor oluşturduk. Asgari 1 azami 5 thread olacak ve asgari sayı üzerindeki thread’ler 30 saniye sonunda bitirilecek.
ExecutorService’e İş Atama
ExecutorService execute, submit, invokeAny ve invokeAll isimli metotlar ile Runnable ve Callable örneklerini iş olarak alır ve çalıştırır.
execute: Execute metotu, Runnable tipinde bir iş alan ve geri dönüş değeri olmayan bir metottur. Atanan işin yapılıp yapılmadığı hakkında bir fikir vermez. Aşağıdaki gibi basit bir örnek ile execute metotunu kullanalım.
1 2 3 4 5 6 7 8 9 10 11 |
Runnable runnable1 = new Runnable() { @Override public void run() { System.out.println("Runnable1"); } }; executor.execute(runnable1); |
Bu basit örneğimizden sonra, ThreadPooExecutor yapılandırıcısı ile iki service oluşturalım. Ve bahsettiğimiz parametrelerin nasıl davrandığını inceleyelim.
MyRunnable isimli bir sınıf oluşturacağız. Bu sınıfımızdan oluşturacağımız işleri executor’a göndereceğiz. Ayrıca işlerin reject durumlarını işleyen RejectedExecutionHandler interface’ini implemente eden RejectedExecutionHandlerImp sınıfını oluşturacağız.
MyRunnable.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 |
public class MyRunnable implements Runnable { private int number; private Exception e; public MyRunnable(int n) { number = n; } @Override public void run() { System.out.println(number+" nolu task isleniyor ->"+" ThreadName: "+Thread.currentThread().getName()); try { Thread.sleep(2000); } catch (Exception e) { this.e = e; } } public int getNumber() { return number; } public Exception getE() { return e; } } |
RejectedExecutionHandlerImp.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
public class RejectedExecutionHandlerImp implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { MyRunnable m = (MyRunnable)r; System.out.println("REJECTED TASK: "+m.getNumber()); Thread.sleep(3000); executor.execute(r); } catch (Exception e) { e.printStackTrace(); } } } |
İlk ThreadPoolExecuter örneğimizi oluşturalım.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class MainClass { public static void main(String[] args) throws Exception{ ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(10) , new RejectedExecutionHandlerImp()); for(int i = 1; i<20;i++) { System.out.println(i+" nolu task gitti"); pool.execute(new MyRunnable(i)); } } } |
Servisimizin core thread sayısı 2, azami thread sayısı ise 4’dür. Kapasitesi 10 olan bir queue servisimiz tarafından kullanılacak. Kodumuzun çıktısı:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
1 nolu task gitti 2 nolu task gitti 3 nolu task gitti 4 nolu task gitti 1 nolu task isleniyor -> ThreadName: pool-1-thread-1 5 nolu task gitti 2 nolu task isleniyor -> ThreadName: pool-1-thread-2 6 nolu task gitti 7 nolu task gitti 8 nolu task gitti 9 nolu task gitti 10 nolu task gitti 11 nolu task gitti 12 nolu task gitti 13 nolu task gitti 14 nolu task gitti 13 nolu task isleniyor -> ThreadName: pool-1-thread-3 15 nolu task gitti 14 nolu task isleniyor -> ThreadName: pool-1-thread-4 REJECTED TASK: 15 3 nolu task isleniyor -> ThreadName: pool-1-thread-1 5 nolu task isleniyor -> ThreadName: pool-1-thread-4 4 nolu task isleniyor -> ThreadName: pool-1-thread-3 6 nolu task isleniyor -> ThreadName: pool-1-thread-2 16 nolu task gitti 17 nolu task gitti 18 nolu task gitti 19 nolu task gitti REJECTED TASK: 19 7 nolu task isleniyor -> ThreadName: pool-1-thread-4 8 nolu task isleniyor -> ThreadName: pool-1-thread-1 10 nolu task isleniyor -> ThreadName: pool-1-thread-3 9 nolu task isleniyor -> ThreadName: pool-1-thread-2 11 nolu task isleniyor -> ThreadName: pool-1-thread-3 16 nolu task isleniyor -> ThreadName: pool-1-thread-1 15 nolu task isleniyor -> ThreadName: pool-1-thread-2 12 nolu task isleniyor -> ThreadName: pool-1-thread-4 17 nolu task isleniyor -> ThreadName: pool-1-thread-4 19 nolu task isleniyor -> ThreadName: pool-1-thread-2 18 nolu task isleniyor -> ThreadName: pool-1-thread-1 |
2 adet işimiz ilk etapta servis tarafından geri çevrildi. Çünkü queue’nun kapasitesi doldu. Ayrıca azami thread sayısına ulaştık.
Şimdi ikinci örneğimize bakalım. Bu örneğimizde, üst sınırı olmayan bir queue örneği kullandık.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 |
public class MainClass { public static void main(String[] args) throws Exception{ ThreadPoolExecutor pool = new ThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>() , new RejectedExecutionHandlerImp()); for(int i = 1; i<20;i++) { System.out.println(i+" nolu task gitti"); pool.execute(new MyRunnable(i)); } } } |
Kodun çıktısı:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 |
1 nolu task gitti 2 nolu task gitti 3 nolu task gitti 4 nolu task gitti 1 nolu task isleniyor -> ThreadName: pool-1-thread-1 2 nolu task isleniyor -> ThreadName: pool-1-thread-2 5 nolu task gitti 6 nolu task gitti 7 nolu task gitti 8 nolu task gitti 9 nolu task gitti 10 nolu task gitti 11 nolu task gitti 12 nolu task gitti 13 nolu task gitti 14 nolu task gitti 15 nolu task gitti 16 nolu task gitti 17 nolu task gitti 18 nolu task gitti 19 nolu task gitti 3 nolu task isleniyor -> ThreadName: pool-1-thread-1 4 nolu task isleniyor -> ThreadName: pool-1-thread-2 5 nolu task isleniyor -> ThreadName: pool-1-thread-1 6 nolu task isleniyor -> ThreadName: pool-1-thread-2 7 nolu task isleniyor -> ThreadName: pool-1-thread-1 8 nolu task isleniyor -> ThreadName: pool-1-thread-2 10 nolu task isleniyor -> ThreadName: pool-1-thread-1 9 nolu task isleniyor -> ThreadName: pool-1-thread-2 11 nolu task isleniyor -> ThreadName: pool-1-thread-1 12 nolu task isleniyor -> ThreadName: pool-1-thread-2 13 nolu task isleniyor -> ThreadName: pool-1-thread-1 14 nolu task isleniyor -> ThreadName: pool-1-thread-2 15 nolu task isleniyor -> ThreadName: pool-1-thread-2 16 nolu task isleniyor -> ThreadName: pool-1-thread-1 18 nolu task isleniyor -> ThreadName: pool-1-thread-2 17 nolu task isleniyor -> ThreadName: pool-1-thread-1 19 nolu task isleniyor -> ThreadName: pool-1-thread-2 |
Üst limiti olmayan bir queue kullandığımızda, sadece coreThread sayısı kadar thread kullanıldı. Azami sayı kadar thread oluşturulmadı. Sanırım bu örnek ile parametreler daha anlaşılır olmuştur.
submit: Runnable ve Callable tiplerindeki işleri alır ve Future tipinde bir sonuç döner.
Runnable ile çalışan örneklerimizi yapalım. İlk örneğimiz;
submit(Runnable r, T result) metotu olacak. Future nesnesini döner. Metota geçilen ilk parametre çalıştırılacak kod parçasını içeren Runnable nesnesidir. İkincisi ise, geri dönüş değeridir. Atanan iş başarılı bir şekilde biterse, geri dönüş nesnesi olan Future’ın get metotu, bize parametre olarak geçtiğimiz değeri verir.
1 2 3 4 5 6 7 8 |
Runnable runnable2 = () -> System.out.println("Runnable2"); String result = "Success"; Future<String> resultEntity = executor2.submit(runnable2, result); System.out.println(resultEntity.get()); |
Kodun çıktısı aşağıdaki gibi olur. Result parametresi olarak Success gönderdik, iş başarı ile bittiğinden Future’ın get metotu Success değerini döndürdü.
1 2 3 4 5 6 |
Runnable2 Success |
İkinci örneğimiz ise;
submit(Runnable r) metotudur. Bu metotta geriye Future döner. Task başarılı ise, get metotu null döner.
1 2 3 4 5 6 7 |
Runnable runnable2 = () -> System.out.println("Runnable2"); Future<?> result = executor2.submit(runnable2); System.out.println(result.get()); |
Bir diğer overload submit metotu Callable tipinde iş alan metottur.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
Callable<String> callable = new Callable<String>() { @Override public String call() throws Exception { System.out.println("Callable"); return "Basarili"; } }; Future<String> taskResult = executor3.submit(callable); System.out.println(taskResult.get()); |
Kodun çıktısı:
1 2 3 4 5 6 |
Callable Basarili |
invokeAll ve invokeAny, her iki metotta bir callable collection alır. InvokeAny, başarılı olan bir iş için result döner, invokeAll ise Future koleksiyonu ile tüm işler için sonuç döner.
Shutting Down
ExecutorService kendiliğinden bitirilmez. Aldığı işler bitse bile, yeni işleri bekler. ExecutorService’i kapatmak için shutdown ve shutdownNow metotlarını kullanırız.
Shutdown metotu, servisi hemen kapatmaz. İlk olarak yeni işleri almayı durdurur yürütülen işler bittiğinde de kapatır.
ShutdownNow metotu ise, hemen kapatır. O sırada çalışan işleri ise, bir liste halinde döner.
ExecutorService kapatma konusunda, önerilen yöntem ise, her iki metotu awaitTermination metotunuda ekleyerek beraber kullanmaktır.
1 2 3 4 5 6 7 8 9 10 11 12 |
executorService.shutdown(); try { if (!executorService.awaitTermination(5000, TimeUnit.MILLISECONDS)) { executorService.shutdownNow(); } } catch (InterruptedException e) { executorService.shutdownNow(); } |
İlk olarak yeni iş alımını engelledik. Sonrasında 5000 milisaniye servisin kapatılmasını bekliyoruz. Eğer kapatılmadı ise, shuwdownNow ile hemen kapatıyoruz.
İnşallah faydalı bir yazı olmuştur.
İyi Çalışmalar
Teşekkürler, emeğinize sağlık
thnx dostumm, yardimci oldun