Exchange-Direct
Gönderdiğimiz mesajın belli bir queueya gitmesini istiyorsak direct modeli kullanırız. Modeli kullanabilmek için exchange tanımını yaparken, exhange tipini direct olarak belirtir, queueya bağlanırken de routing_key bilgisini veririz. (Exhange kavramları için Exchange-Fanout yazımıza bakabilirsiniz.)
Bu bilgiler ile, exchange mesajları ilgili queue’lara gönderir, consumer’lar da sadece ilgilendikleri mesajları alırlar.
Önceki makalelerimizde olduğu gibi bir örnek yapalım. Producer ve Consumer uygulamaları yapacağız. Routing_key değerlerimiz yüksek ve düşük olsun. Bunlar queue’ya konan mesajların öncelik sırasını belirlesin.
Producer:
1* Eclipse ile bir java projesi oluşturalım.
2* Projemizi maven uyumlu hale getirelim.
3* pom.xml dosyamız aşağıdaki gibi olacak.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ProducerDirect</groupId> <artifactId>ProducerDirect</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project> |
4* SenderStarter.java sınıfını geliştirelim.
SenderStarter.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class SenderStarter { private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; private final static String EXHANGE_NAME = "DIRECTTYPE"; private final static String EXHANGE_TYPE = "direct"; static String[] MESSAGES = {"X","Y","Z"}; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXHANGE_NAME, EXHANGE_TYPE); for(int i =0; i< MESSAGES.length;i++) { if(i % 2 == 0) { channel.basicPublish(EXHANGE_NAME, "HIGH", null, MESSAGES[i].getBytes()); System.out.println("high message sent"); } else { channel.basicPublish(EXHANGE_NAME, "LOW", null, MESSAGES[i].getBytes()); System.out.println("low message sent"); } } channel.close(); connection.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } } |
X,Y ve Z mesajlarımız mevcut. X ve Z mesajlarımızı HIGH, Y mesajımızı ise LOW routing_key ile göndereceğiz.
Consumer:
1* Eclipse ile bir java projesi oluşturalım.
2* Projemizi maven uyumlu hale getirelim.
3* pom.xml dosyamız aşağıdaki gibi olacak.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ConsumerDirect</groupId> <artifactId>ConsumerDirect</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project> |
4* ReceiverStarter.java sınıfını geliştirelim.
ReceiverStarter.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.text.MessageFormat; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; public class ReceiverStarter { private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; private final static String EXCHANGE_NAME = "DIRECTTYPE"; private final static String EXCHANGE_TYPE = "direct"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); String queueName = channel.queueDeclare().getQueue(); System.out.println("SEVERITY : "+args[0]); channel.queueBind(queueName, EXCHANGE_NAME, args[0]); System.out.println("Waiting for Message"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println(MessageFormat.format("{0} Received", message)); }; }; boolean autoAck = true; channel.basicConsume(queueName, autoAck, consumer); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } } |
Consumer uygulamamıza çalıştırırken binding_key değerini parametre olarak geçeceğiz. Şimdi uygulamalarımızın jarlarını oluşturalım ve önceden de yaptığımız gibi ilk önce iki Consumer uygulamalarını bir Producer uygulamasını çalıştıralım.
1 2 3 4 5 |
java -cp amqp-client-4.0.1.jar:slf4j-api-1.7.21.jar:ConsumerDirect-0.0.1-SNAPSHOT.jar io.bilisim.rabbitmq.main.ReceiverStarter "HIGH" |
1 2 3 4 5 |
java -cp amqp-client-4.0.1.jar:slf4j-api-1.7.21.jar:ConsumerDirect-0.0.1-SNAPSHOT.jar io.bilisim.rabbitmq.main.ReceiverStarter "LOW" |
1 2 3 4 5 |
java -cp amqp-client-4.0.1.jar:slf4j-api-1.7.21.jar:ProducerDirect-0.0.1-SNAPSHOT.jar io.bilisim.rabbitmq.main.SenderStarter |
Program çıktıları sırası ile aşağıdaki gibi olacaktır.
1 2 3 4 5 6 7 8 |
SEVERITY : HIGH Waiting for Message X Received Z Received |
1 2 3 4 5 6 7 |
SEVERITY : LOW Waiting for Message Y Received |
1 2 3 4 5 6 7 |
high message sent low message sent high message sent |
İnşallah faydalı bir yazı olmuştur.
İyi Çalışmalar