WorkQueue
Mesaj kuyruklarını, task işleyecek şekilde de kullanabiliriz. WorkQueue, işlenmesi uzun süre alan işlerin bir mesaj gibi queue içerisine konması yaklaşımıdır. Genellikle, sistem kaynaklarını fazla tüketen işlemler ve özellikler web uygulamalarında kullanılır. Örneğin, yeniden ölçeklemesi yapılacak resim dosyaları veya pdf dosya üretimlerinde, Consumer uygulamalar başka bir sunucuda çalıştırılarak, web sunucusunun bu işlemlerden etkilenmesinin önüne geçilir.
Queue’ya eklenen işlerin çoklu consumerlara dağıtımı rabbitmq tarafından yapılır. Yük dağılımı rabbitmq sorumluluğundadır. Biz sadece producer ve consumer uygulamaları yazarız.
RabbitMQ’ya yabancı iseniz, Merhaba RabbitMQ makalemizi incelemenizi öneririm.
Bu makalemizde yapacaklarımız sırası ile;
1* Producer ve Consumer uygulamaları geliştireceğiz.
2* Producer uygulamamız farklı sayıda * işaretinden oluşan işler gönderecek(*,***,**).
3* Consumer uygulamamız mesajdaki her bir * için 1 saniye bekleyecek. Bu sayede, farklı süreler alan işleri simule etmiş olacağız.
Producer uygulamamız:
ProducerWQ isimli bir java projesi oluşturalım. Projemizi maven uyumlu hale getirelim.
pom.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ProducerWQ</groupId> <artifactId>ProducerWQ</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project> |
SenderStater.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class SenderStarter { private final static String QUEUE_NAME = "MERHABA"; private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; static String[] TASKS = {"*","*****","**","***"}; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); for(String task : TASKS) { channel.basicPublish("", QUEUE_NAME, null, task.getBytes()); System.out.println("task sent"); } channel.close(); connection.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } } |
Consumer uygulamamız:
ConsumerWQ isimli bir java projesi oluşturalım. Projemizi maven uyumlu hale getirelim.
pom.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ConsumerWQ</groupId> <artifactId>ConsumerWQ</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project> |
ReceiverStarter.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; public class ReceiverStarter { private final static String QUEUE_NAME = "MERHABA"; private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Waiting for Task"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); proceedWork(message); }; }; boolean autoAck = true; channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } private static void proceedWork(String message) { try { System.out.println("ReceivedTask: "+message); long waitMiliseconds = message.length() * 1000L; Thread.sleep(waitMiliseconds); } catch (Exception e) { e.printStackTrace(); } } } |
Uygulamalarımız geliştirdikten sonra her iki uygulamızdan birer jar dosyası oluşturuyoruz. Sonra istediğimiz bir klasöre bu jarlarımı ve ayrıca amqp-client-4.0.1.jar ile slf4j-api-1.7.21.jar dosyalarını koyuyoruz. Bu iki jar dosyasını $USER-HOME/.m2/repository altında bulabilirsiniz. Klasörün içeriği aşağıdaki gibi olmalı.
Artık uygulamamalarımızı çalıştırabiliriz. Komut satırından çalıştıracağız. Test için çalışan iki adet Consumer bir adet Producer uygulaması olması lazım. Üç tane terminal ekranı açalım. Jar klasörüne gidelim. Klasörün içerisinden çalıştırıcağız.
İki terminale aşağıdaki komutu kopyalayalım.
1 2 3 4 5 |
java -cp amqp-client-4.0.1.jar:slf4j-api-1.7.21.jar:ConsumerWQ-0.0.1-SNAPSHOT.jar io.bilisim.rabbitmq.main.ReceiverStarter |
Üçüncü terminale de takip eden komutu kopyalayalım.
1 2 3 4 5 |
java -cp amqp-client-4.0.1.jar:slf4j-api-1.7.21.jar:ProducerWQ-0.0.1-SNAPSHOT.jar io.bilisim.rabbitmq.main.SenderStarter |
Önce consumer komutları sonrasında producer komutun çalıştıralım. Terminalllerde aşağıdaki çıktılar yazmalı.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
//Producer task sent task sent task sent task sent //Consumer Waiting for Task ReceivedTask: * ReceivedTask: ** //Consumer Waiting for Task ReceivedTask: ***** ReceivedTask: *** |
Çıktılardan da görebileceğimiz gibi, queue’dan tasklar sıra ile Consumer’lara gönderildi.
1nci task: *
2nci task: *****
3ncü task: **
4ncü task: ***
1nci ve 3ncü tasklar bir consumera, diğer iki task ise diğer consumera gönderildi. İşin bitip bitmemesi kontrol edilmiyor. Bu mesaj dağıtım modeline round-robin denir. Bizim istediğimiz ise, Consumer’ların idle kalmaması. Bunun için;
channed.basicQos(int i) metotunu autoAck değerini false yaparak kullanıyoruz. Bu cümle burada dursun, biz Message Acknowledgement ne demek ona bakalım.
Message Acknowledgement:
Acknowledgement’ın alındı bildirimi, onay gibi anlamları vardır. Acknowledge, rabbitmq ya özgü bir durum değildir. İki farklı uygulamanın konuşturulduğu mimarilerde, ack mesajı gerekli durumlarda kullanılır. Biz uygulamamızı autoAck = true olarak geliştirdik. Queue mesaj’ı gönderdi ve peşinden sildi. Mesajların işlenip işlenmemesi önemli olmadığı durumlar da bu yapı kullanılabilir. Ama workqueue gibi task gönderdiğimiz durumda dahil olmak üzere, bir çok uygulamada mesajların işlenmesi çok önemlidir. Consumer’ın herhangi bir anda çalışmıyor olma durumunda (Channel kapanması veya TCP bağlantısının kopma durumu) Consumer’ın aldığı mesajın başka bir Consumer’a gönderilebilmesi için autoAck = false olarak çalışmalıyız.
AutoAck false olduğunda, eğer Consumer’dan ack mesajı gelmez ise, mesaj tekrardan queue’ya eklenir ve başka bir Consumer bağlı ise, ona gönderilir.
basicConsume() metotunun ikinci parametresi autoAck parametresini alır. Sanırım şimdi yukarıdaki cümlemiz daha anlamlı oldu sanırım.
ReceiverStarter.java sınıfımızı aşağıdaki gibi düzenleyelim.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; public class ReceiverStarter { private final static String QUEUE_NAME = "MERHABA"; private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.basicQos(1);//NEW System.out.println("Waiting for Task"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); proceedWork(message); channel.basicAck(envelope.getDeliveryTag(), false);//NEW }; }; boolean autoAck = false;//NEW channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } private static void proceedWork(String message) { try { System.out.println("ReceivedTask: "+message); long waitMiliseconds = message.length() * 1000L; Thread.sleep(waitMiliseconds); } catch (Exception e) { e.printStackTrace(); } } } |
Consumer uygulamasından yeni bir jar oluşturup uygulamalarımızı daha önceki sıra ile çalıştırdığımızda aşağıdaki sonucu alırız.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//Consumer Waiting for Task ReceivedTask: * ReceivedTask: ** ReceivedTask: *** //Consumer Waiting for Task ReceivedTask: ***** |
Gördüğümüz gibi, taskları verimli bir şekilde Consumer’lara gönderdik.
Ack mesajı kullandığımız yapıda her zaman mesaj kaybolmasının önüne geçemez. RabbitMQ eğer bir şekilde kapanır ise, queue içerisindeki mesajlar kaybolur. Bunu engellemek için hem queue’yu hem de mesajımızı kalıcı(durable) olarak etiketlemeliyiz.
queueDeclare metotunun ikinci parametresi kalıcılık parametresidir. basicPublish metotunun ikinci parametreside mesaj için kalıcılık parametresidir. Bunlar kalıcı olarak set edildiğinde, mesaj queue’ya gönderildikten sonra hem queue’ya eklenir hem diske yazılır. Bu şekilde rabbitMQ kapansa bile diskte yazılan mesajlar kaybolmaz.
NOT: RabbitMQ da tanımladığımız bir queue’nun özellikleri daha sonra güncellenemez. Biz önceki örneklerimizde kullanıdığımız MERHABA isimli queue’yu durable false olarak tanımlamıştık, şuan aynı queue’yu kullanamayız.
SenderStarter.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.MessageProperties; public class SenderStarter { private final static String QUEUE_NAME = "DURABLE"; private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; static String[] TASKS = {"*","*****","**","***"}; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); for(String task : TASKS) { channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, task.getBytes()); System.out.println("task sent"); } channel.close(); connection.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } } |
ReceiverStarter.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; public class ReceiverStarter { private final static String QUEUE_NAME = "DURABLE"; private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null); channel.basicQos(1);//NEW System.out.println("Waiting for Task"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); proceedWork(message); channel.basicAck(envelope.getDeliveryTag(), false);//NEW }; }; boolean autoAck = false;//NEW channel.basicConsume(QUEUE_NAME, autoAck, consumer); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } private static void proceedWork(String message) { try { System.out.println("ReceivedTask: "+message); long waitMiliseconds = message.length() * 1000L; Thread.sleep(waitMiliseconds); } catch (Exception e) { e.printStackTrace(); } } } |
Her iki jar dosyamızı güncelledik. Producer uygulamamızı çalıştırdık. Ardından queue’ları listeledik.
1 2 3 4 5 6 7 8 |
./rabbitmqctl list_queues Listing queues ... MERHABA 0 DURABLE 4 |
RabbitMQ’yu stop start edip tekrardan queue listeleme komutunu çalıştırdık.
1 2 3 4 5 6 7 |
./rabbitmqctl list_queues Listing queues ... DURABLE 4 |
DURABLE isimli queue’nun ve içindeki mesajların kalıcı olduğunu gördük.
Şimdilik bu kadar. İnşallah faydalı olmuştur.
İyi Çalışmalar