Thread Ama Ne Zaman
Bir önceki makalemizde, thread neden kullanılır ve nasıl tanımlanır konularını anlamaya çalıştık. Bu makalemizde ise, thread oluşturmayı gerektiren durumları, bir uygulama üzerinden ve uygulamayı adım adım geliştirerek göreceğiz. Her adım da bir problemi görüp, çözümünü bulacağız. Bulduğumuz çözümler sayesinde de java thread konusunun diğer başlıklarını hep beraber öğreneceğiz.
Bunun için iki java uygulaması yazacağız. İlk uygulama bir sunucu uygulaması olacak. İkinci uygulamamız da sunucuya bağlanan ve aldığı veriyi işleyen bir uygulama olacak.
Sunucu uygulamamız bize yapısal olmayan, virgül ile ayrılmış string tipinde veriler gönderecek. Yapısal olmayandan kastım; sınıflandırması yapılmamış veri.
Veri örneklerimiz aşağıdaki gibi 3,4 ve 5 uzunluklu string ifadeler olacak.
A,B,C
A,B,C,D
A,B,C,D,E
Sunucu uygulamanın örneğimizde yapacağı tek iş bu olacak.
İstemci uygulamamız bu verileri soket üzerinden alacak ve karakter uzunluğuna göre sınıflandıracak.
Sınıflandırma mantığımız ve etiketlerimiz;
3 –> X
4 –> Y
5 –> Z
Etiketlenen her veri için de bir tekil id oluşturacağız. Şimdi kodlamaya geçelim.
İlk olarak sunucu uygulamamızı yapalım. Sunucumuz TCP protokolü ile haberleşen bir uygulama olacak. Java da ağ programlama da kullandığımız sınıflar java.net paketinin altındadır.
Sunucu uygulamamızın kodu aşağıdaki gibi olacak.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 |
package io.bilisim.java.thread.main; import java.io.BufferedWriter; import java.io.IOException; import java.io.OutputStreamWriter; import java.net.ServerSocket; import java.net.Socket; import java.text.MessageFormat; import java.util.ArrayList; import java.util.List; public class SenderStarter { private static List<String> veri; private static final int PORT_NUMBER = 9999; public static void main(String[] args) throws IOException { System.out.println("Sunucu uygulama basladi"); //listemizi dolduruyoruz. veriListesiDoldur(); //sunucu soketimizi olusturduk. 9999 portundan client bekleyecek ServerSocket sunucu = new ServerSocket(PORT_NUMBER); //baglanacak istemcileri bekliyor. Baglanti saglanana kadar bekler. Socket socket = sunucu.accept(); BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream())); for(String v : veri) { System.out.println(MessageFormat.format("{0} veri gonderiliyor...", v)); writer.write(v+"n"); writer.flush(); } writer.close(); socket.close(); sunucu.close(); System.out.println("Sunucu uygulama sonlandi"); } private static void veriListesiDoldur() { veri = new ArrayList<String>(); veri.add("A,B,C"); veri.add("A,B,C,D"); veri.add("A,B,C,D,E"); veri.add("E,F,G"); veri.add("E,F,G,H"); } } |
Şimdi ikinci uygulamamızı geliştirmeye başlayalım. Uygulamamızı problemleri görüp çözümleri uygulayarak geliştireceğimizi belirtmiştik.
Sunucuya bağlanmak için bir Soket sınıfı yazacağız. Aldığımız verileri işleyecek bir DataProcessor sınıfı yazacağız. İşlediğimiz veriyi LabeledEntity örnekleri içerisine ekleyeceğiz.
Soket.java sınıfı:
Soket.java sınıfımız Runnable interface’ini implemente etti. Yani bu sınıfımız artık ayrı bir thread üzerinden çalışacak haldedir.
Sunucuya bağlanıp veriyi alıyoruz. Sonrasında DataProcessor sınıfının proceedData metoduna aldığımız veriyi gönderiyoruz. Metot bittikten sonra var ise bir sonraki veriyi alıyoruz. Bu işlem her veri alındığında tekrarlanıyor.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
package io.bilisim.java.thread.soket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; import java.text.MessageFormat; import java.util.Calendar; import io.bilisim.java.thread.processor.DataProcessor; public class Soket implements Runnable { private static final int PORT_NUMBER = 9999; private static final String HOST_NAME = "localhost"; @Override public void run() { Socket client = null; BufferedReader reader = null; try { client = new Socket(HOST_NAME, PORT_NUMBER); reader = new BufferedReader(new InputStreamReader(client.getInputStream())); String data = null; DataProcessor processor = new DataProcessor(); while((data = reader.readLine()) != null) { System.out.println(MessageFormat.format("Gelen Veri: {0}, Gelme Saniyesi: {1}", data,Calendar.getInstance().get(Calendar.SECOND))); processor.proceedData(data); } } catch (IOException e) { e.printStackTrace(); } finally { closeReader(reader); closeSocket(client); } } private void closeReader(BufferedReader reader) { try { if(reader != null) { reader.close(); } } catch (IOException e) { e.printStackTrace(); } } private void closeSocket(Socket soket) { try { if(soket != null) { soket.close(); } } catch (IOException e) { e.printStackTrace(); } } } |
DataProcessor.java sınıfı:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
package io.bilisim.java.thread.processor; import java.text.MessageFormat; import java.util.Calendar; import io.bilisim.java.thread.entity.LabeledEntity; import io.bilisim.java.thread.enums.DataType; public class DataProcessor { public void proceedData(String data) { try { Thread.sleep(2000); String[] dataParts = data.split(","); LabeledEntity entity = null; if(dataParts.length == 3) { System.out.println("X Veri Tipi"); entity = new LabeledEntity(data,DataType.X); } else if(dataParts.length == 4) { System.out.println("Y Veri Tipi"); entity = new LabeledEntity(data,DataType.Y); } else if(dataParts.length == 5) { System.out.println("Z Veri Tipi"); entity = new LabeledEntity(data,DataType.Z); } else { System.out.println("Bilinmeyen "); } System.out.println(MessageFormat.format("{0} islendi. Saniye: {1}", data,Calendar.getInstance().get(Calendar.SECOND))); } catch (InterruptedException e) { e.printStackTrace(); } } } |
proceedData metodu içerisinde veriyi “,” ile split ediyoruz ve oluşan dizinin boyutuna göre sınıflandırma yapıyoruz. Metot içerisine veri üzerinde yapılan işlemlerin tükettiği ortalama bir süre sağlamak için 2 saniye bekleten kodu ekledik.
LabeledEntity sınıfı ve DataType enum’u:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 |
package io.bilisim.java.thread.entity; import io.bilisim.java.thread.enums.DataType; public class LabeledEntity { private String data; private DataType dataType; public LabeledEntity(String data, DataType dataType) { this.data = data; this.dataType = dataType; } public String getData() { return data; } public void setData(String data) { this.data = data; } public DataType getDataType() { return dataType; } public void setDataType(DataType dataType) { this.dataType = dataType; } } |
1 2 3 4 5 6 7 8 9 |
package io.bilisim.java.thread.enums; public enum DataType { X,Y,Z; } |
ClientStarter.java sınıfı:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 |
package io.bilisim.java.thread.main; import io.bilisim.java.thread.soket.Soket; public class ClientStarter { public static void main(String[] args) { Thread thread = new Thread(new Soket(),"Client"); thread.start(); } } |
Artık uygulamalarımızı çalıştırabiliriz. İlk olarak DataSender isimli sunucu uygulamamızı ardından DataClassifier isimli asıl uygulamamızı çalıştırıyoruz. Sonuç aşağıdaki gibi olacak.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Gelen Veri: A,B,C, Gelme Saniyesi: 24 X Veri Tipi A,B,C islendi. Saniye: 26 Gelen Veri: A,B,C,D, Gelme Saniyesi: 26 Y Veri Tipi A,B,C,D islendi. Saniye: 28 Gelen Veri: A,B,C,D,E, Gelme Saniyesi: 28 Z Veri Tipi A,B,C,D,E islendi. Saniye: 30 Gelen Veri: E,F,G, Gelme Saniyesi: 30 X Veri Tipi E,F,G islendi. Saniye: 32 Gelen Veri: E,F,G,H, Gelme Saniyesi: 32 Y Veri Tipi E,F,G,H islendi. Saniye: 34 |
Problem:
Uygulamamız 2 saniyede bir yeni veri alabiliyor. Bu durum sunucu tarafında veri birikmesine sebep oluyor. Uygulamamızın veri alma süresinin iyileştirilmesi gerekmekte.
Çözüm:
1-İlk olarak sunucudan veriyi alır almaz bir java koleksiyonuna ekleyelim ve hemen bir sonraki veriyi alalım. Cevap dönme süremizi kısaltalım.
2-Koleksiyon içerisine eklediğimiz verileri okuyabilmesi için , DataProcessor sınıfını ayrı bir thread olarak çalıştıracak kod değişikliğini yapacağız.
Bu değişiklikleri ilgili sınıf isminin sonuna V2 getirerek yapacağız. Geliştirmemiz bittiğinde,
SoketV2.java
DataProcessorV2.java
ClientStarterV2.java
DataQueue.java
sınıflarını oluşturmuş olacağız.
SoketV2.java sınıfı:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
package io.bilisim.java.thread.soket; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.Socket; import java.text.MessageFormat; import java.util.Calendar; import io.bilisim.java.thread.queue.DataQueue; public class SoketV2 implements Runnable { private static final int PORT_NUMBER = 9999; private static final String HOST_NAME = "localhost"; @Override public void run() { Socket client = null; BufferedReader reader = null; try { client = new Socket(HOST_NAME, PORT_NUMBER); reader = new BufferedReader(new InputStreamReader(client.getInputStream())); String data = null; while((data = reader.readLine()) != null) { System.out.println(MessageFormat.format("Gelen Veri: {0}, Gelme Saniyesi: {1}", data,Calendar.getInstance().get(Calendar.SECOND))); DataQueue.add(data); } } catch (IOException e) { e.printStackTrace(); } finally { closeReader(reader); closeSocket(client); } } private void closeReader(BufferedReader reader) { try { if(reader != null) { reader.close(); } } catch (IOException e) { e.printStackTrace(); } } private void closeSocket(Socket soket) { try { if(soket != null) { soket.close(); } } catch (IOException e) { e.printStackTrace(); } } } |
Yaptığımız değişiklik, DataQueue isimli sınıfımızın add metodunu çağıran kodu ekledik, DataProcessor sınıfını ise kaldırdık. Bu şekilde, gelen veri hemen queue ya ekleniyor ve bir sonraki veri alınıyor.
DataProcessorV2.java sınıfı:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package io.bilisim.java.thread.processor; import java.text.MessageFormat; import java.util.Calendar; import io.bilisim.java.thread.entity.LabeledEntity; import io.bilisim.java.thread.enums.DataType; import io.bilisim.java.thread.queue.DataQueue; public class DataProcessorV2 implements Runnable{ @Override public void run() { while(true) { try { String data = DataQueue.take(); proceedData(data); } catch (Exception e) { e.printStackTrace(); } } } private void proceedData(String data) { try { Thread.sleep(2000); String[] dataParts = data.split(","); LabeledEntity entity = null; if(dataParts.length == 3) { System.out.println("X Veri Tipi"); entity = new LabeledEntity(data,DataType.X); } else if(dataParts.length == 4) { System.out.println("Y Veri Tipi"); entity = new LabeledEntity(data,DataType.Y); } else if(dataParts.length == 5) { System.out.println("Z Veri Tipi"); entity = new LabeledEntity(data,DataType.Z); } else { System.out.println("Bilinmeyen "); } System.out.println(MessageFormat.format("{0} islendi. Saniye: {1}", data,Calendar.getInstance().get(Calendar.SECOND))); } catch (InterruptedException e) { e.printStackTrace(); } } } |
DataProcessorV2 sınıfımız artık ayrı bir thread de çalışacak. Sınıfının run metodunda while ile sonsuz bir döngü oluşturduk. Çünkü uygulama çalıştığı sürece, DataProcessor thread’i sürekli queue ya bakmalı. DealQueue sınıfının açıklamasında, hangi java koleksiyon nesnesini ve kullanma nedenimizi açıklayacağız. proceedData metodumuz da public den private a çevrildi.
DataQueue.java sınıfı:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package io.bilisim.java.thread.queue; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; public class DataQueue { private static BlockingQueue<String> DATA_QUEUE = new ArrayBlockingQueue<>(1024); public static boolean add(String e) { return DATA_QUEUE.add(e); } public static String take() throws InterruptedException { return DATA_QUEUE.take(); } } |
Bu sınıfımızın içerisine BlockingQueue tipinde bir koleksiyon ekledik. BlockingQueue tipi java.util.concurrent paketinin altında bulunur. Sınıfımıza delegasyon yöntemi ile, BlockingQueue tipine ait add ve take metotlarını ekledik. add metodu, queue eleman eklerken, take metodu queue’dan eleman alır. take metodunun bizim için en önemli özelliği, eğer queue boş ise, akış burada durdurulur ve queue’ya eleman ekleninceye kadar bekler. Sonsuz bir döngü ile eğer bir koleksiyonu kontrol eden bir kodumuz var ise, bizim için en önemli konu koleksiyon boş iken kodun devam etmemesidir. Çünkü kod devam eder ise, thread hiç bir iş yapmazken bile CPU tüketir. Buda bizim için istenmeyen bir durumdur.
ClientStarterV2.java sınıfı:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
package io.bilisim.java.thread.main; import io.bilisim.java.thread.processor.DataProcessorV2; import io.bilisim.java.thread.soket.SoketV2; public class ClientStarterV2 { public static void main(String[] args) { Thread thread = new Thread(new SoketV2(),"Client"); thread.start(); Thread dataProcessor = new Thread(new DataProcessorV2(),"Data Processor"); dataProcessor.start(); } } |
DataProcessor sınıfımızı da thread olarak çalıştırdır. Artık her iki uygulamayı da çalıştırıp sonuçlara bakabiliriz.
Sonuç:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
Gelen Veri: A,B,C, Gelme Saniyesi: 52 Gelen Veri: A,B,C,D, Gelme Saniyesi: 52 Gelen Veri: A,B,C,D,E, Gelme Saniyesi: 52 Gelen Veri: E,F,G, Gelme Saniyesi: 52 Gelen Veri: E,F,G,H, Gelme Saniyesi: 52 X Veri Tipi A,B,C islendi. Saniye: 54 Y Veri Tipi A,B,C,D islendi. Saniye: 56 Z Veri Tipi A,B,C,D,E islendi. Saniye: 58 X Veri Tipi E,F,G islendi. Saniye: 0 Y Veri Tipi E,F,G,H islendi. Saniye: 2 |
Sonuçlardan da gördüğümüz üzere problemimizi çözdük. Ama bu seferde gelen veriler bizim uygulamamızda birikiyor. Bir sonraki makalemizde inşallah bu problem üzere odaklanıp çözmeye çalışacağız.
Şimdilik bu kadar. İnşallah faydalı olmuştur.
İyi Çalışmalar