Exchange-Fanout
Evvelki iki makalemiz (Merhaba RabbitMQ, WorkQueue) genelde mesaj queueya özelde ise rabbitmq’ya giriş mahiyetinde idi. Bu ve takip eden makalelerimiz ile birlikte rabbitmq’nun temeli olan exchange konusunu anlamaya çalışacağız.
RabbitMQ’nun mesaj gönderme modelinin temel mantığı; producer’ın mesajı göndereceği queue’yu bilmemesi ve mesajı direk queue’ya göndermemesidir. Exchange’de bu mantığın temelidir. Exchange producerdan mesajı alır ve mesajı queue veya gueuelara gönderir. Exchange aldığı mesajı ne yapacağını ise aşağıdaki bilgilere göre belirler.
1* Exchange tipi
2* Routing_Key(Mesaj ile bildirilir)
3* Binding_Key(Consumer tarafından bildilir)
RabbitMQ mimarisi aşağıdaki gibidir.
Exchange de queue gibi kalıcı(durable) yapılabilir. RabbitMQ restart edilse bile mesajların kaybolması engellenir.
rabbitmqctl list-exchanges komutu ile var olan exchangeler listelenir. Biz bu komutu çalıştıralım. Sonuç aşağıdaki gibi olucaktır.
1 2 3 4 5 6 7 8 9 10 11 12 13 |
Listing exchanges ... amq.match headers direct amq.rabbitmq.log topic amq.fanout fanout amq.headers headers amq.direct direct amq.topic topic amq.rabbitmq.trace topic |
Sonuçtaki ilk kısım exchange isimlerini gösterir. Sonuçlar içinde bir tane isimsiz exchange mevcut. Biz önceki iki makalemizde bu isimsiz exchange’i kullandık aslında.
1 2 3 4 5 |
channel.basicPublish("", "hello", null, message.getBytes()); |
ilk parametre exchange ismini alır. Biz bu değeri boş string olarak göndermiştik. İkinci kısım ise exchange tiplerini gösterir. RabbitMQ’da dört adet exchange tipi vardır.
*fanout
*direct
*topic
*headers
Biz bu makalemizde fanout exchange tipine yoğunlaşacağız. Fanout’a geçmeden önce iki önemli başlıktan bahsedelim. Sonrasında ise fanout özelinde alışık olduğumuz kodlama işine girişelim.
Binding:
Exhange ile Queue arasında kurulan ilişkiye binding denir. Exchange gelen mesajları hangi queue’ya koyacağını binding’e göre karar verir.
rabbitmqctl list-bindings ile listelemesini yaparız.
Geçici Kuyruklar(Temporary Queues):
Queue tanımlama işini rabbitmq ya bıraktığımız durumdur. RabbitMQ’ya her bağlandığımızda yeni bir queue oluşmasını ve bağlantıyı kapattığımızda otomatik silinmesini istiyorsak, geçici kuruk oluştururuz.
Fanout:
Bu exchange tipinde, exchange aldığı tüm mesajları, kendisine bağlı olan tüm queue’lara gönderir.
Mesaj gönderirken routing_key değeri vermemize gerek yoktur. Vermiş olsak bile, bu deger dikkate alınmaz. Producer ve Consumer uygulamalarımızı geliştirelim.
Producer:
1* Eclipse ile bir java projesi oluşturalım.
2* Projemizi maven uyumlu hale getirelim.
3* pom.xml dosyamız aşağıdaki gibi olacak.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ProducerFanout</groupId> <artifactId>ProducerFanout</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project> |
4* SenderStarter.java sınıfını geliştirelim.
SenderStarter.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class SenderStarter { private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; private final static String EXHANGE_NAME = "FANOUTTYPE"; private final static String EXHANGE_TYPE = "fanout"; static String[] MESSAGES = {"X","Y","Z"}; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXHANGE_NAME, EXHANGE_TYPE); for(String message : MESSAGES) { channel.basicPublish(EXHANGE_NAME, "", null, message.getBytes()); System.out.println("message sent"); } channel.close(); connection.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } } |
1 2 3 4 5 |
channel.exchangeDeclare(EXHANGE_NAME, EXHANGE_TYPE); |
Kodu ile exhange tanımı yaptık. İsmini ve exchange tipini belirttik.
Producer:
1* Eclipse ile bir java projesi oluşturalım.
2* Projemizi maven uyumlu hale getirelim.
3* pom.xml dosyamız aşağıdaki gibi olacak.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>ConsumerFanout</groupId> <artifactId>ConsumerFanout</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project> |
4* ReceiverStarter.java sınıfını geliştirelim.
ReceiverStarter.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.text.MessageFormat; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; public class ReceiverStarter { private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; private final static String EXCHANGE_NAME = "FANOUTTYPE"; private final static String EXCHANGE_TYPE = "fanout"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("Waiting for Message"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println(MessageFormat.format("{0} Received", message)); }; }; boolean autoAck = true; channel.basicConsume(queueName, autoAck, consumer); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } } |
1 2 3 4 5 |
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE); |
Producer ve consumer’in aynı exchange’i kullanması lazım.
1 2 3 4 5 |
String queueName = channel.queueDeclare().getQueue(); |
channel.queueDeclare() metodu ile geçici bir queue oluşturduk. Queue ismini rabbit verecek. Bu sebeple getQueue() ile queue ismini alıyoruz.
1 2 3 4 5 |
channel.queueBind(queueName, EXCHANGE_NAME, ""); |
Oluşan queue ile exchange arasında binding oluşturuyoruz. Üçüncü parametreyi boş gönderdik. Bunu diğer exhange tiplerini anlatırken kullanacağız.
Geçmiş makalelerimizde olduğu gibi jar dosyalarını oluşturup uygulamalarımızı çalıştıracağız. İlk önce Consumer uygulamasını çalıştıracağız. Eğer ilk olarak Producer çalıştırılır ise, queue henüz olmadığından exhange e gönderilen mesajlar kaybolur.
İlk önce iki ayrı terminalde Consumer çalıştıralım. Ardından aşağıdaki komutlar ile queue, binding ve exchange’leri listeleyelim.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
./rabbitmqctl list_bindings Listing bindings ... exchange DURABLE queue DURABLE [] exchange amq.gen-HRFtMQQ1yvTeW4ILRm7O6g queue amq.gen-HRFtMQQ1yvTeW4ILRm7O6g [] exchange amq.gen-zRDee5-nubJ3q3KtBsgDOw queue amq.gen-zRDee5-nubJ3q3KtBsgDOw [] FANOUTTYPE exchange amq.gen-HRFtMQQ1yvTeW4ILRm7O6g queue [] FANOUTTYPE exchange amq.gen-zRDee5-nubJ3q3KtBsgDOw queue [] ./rabbitmqctl list_queues Listing queues ... amq.gen-HRFtMQQ1yvTeW4ILRm7O6g 0 amq.gen-zRDee5-nubJ3q3KtBsgDOw 0 ./rabbitmqctl list_exchanges Listing exchanges ... amq.match headers direct FANOUTTYPE fanout amq.rabbitmq.log topic amq.fanout fanout amq.headers headers amq.direct direct amq.topic topic amq.rabbitmq.trace topic |
Yaratılan geçici queue’ları ve exchange ile bu queue’lar arası binding’leri gördük.
Şimdi Producer’i çalıştıralım. Uygulama çıktıları aşağıdaki gibi olacaktır.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
message sent message sent message sent Waiting for Message X Received Y Received Z Received Waiting for Message X Received Y Received Z Received |
Fanout exchange tipi ile ilgili makalemiz bu kadar. İnşallah faydalı olmuştur.
İyi Çalışmalar