Merhaba RabbitMQ
RabbitMQ mesaj alıp bu mesajı gönderen bir ara yazılımdır. RabbitMQ’nun sitesinde yazılım için, postahane metaforu kullanılmıştır. RabbitMQ posthane gibi, mesajı alıp bu mesajı iletmekten sorumludur. Temel rabbitMQ kavramları:
Producer: Mesaj gönderen her bir uygulama.
Producing: Mesaj gönderme eylemi
Consumer: Mesaj alan her bir uygulama
Consuming: Mesaj alma eylemi
Queue: Mesajların rabbitmq üzerinden, bir uygulamadan bir uygulamaya gönderilirken saklandığı kutudur.
Producer, queue ve consumer aynı makinada da olabilir farklı makinalarda da olabilir. RabbitMQ bir çok programlama dilleri için API’ler sunar. Biz örneklerimizi Java ile yapacağız.
Mac için Kurulum:
Bir kaç farklı yöntem mevcut. Ben homebrew ile kurulum yaptım. Kurulum için aşağıdaki komutu çalıştırmanız yeterli.
1 2 3 4 5 |
brew install rabbitmq |
Kurulum sonrası, ‘/usr/local/sbin’ altından rabbitmq uygulamamızı başlatıp durdurabiliriz.
Başlatma
1 2 3 4 5 |
./rabbitmq-server |
Durdurma
1 2 3 4 5 |
./rabbitmqctl stop |
Kurulum esnasında, admin yetkisine sahip guest isimli bir kullanıcı eklenir. Tanımlı kullanıcıları görmek için:
1 2 3 4 5 |
./rabbitmqctl list_users |
komutunu çalıştırırız.
Kurulum sonrası ‘Merhaba RabbitMQ’ uygulamamızı geliştirelim.
İki uygulamaya ihtiyacımız var. Birincisi Producer uygulamamız olacak ve queue ya mesaj koyacak. İkinci uygulamamız ise, queue daki mesajı alacak olan Consumer uygulamamız olacak.
Uygulamaları Eclipse IDE üzerinde Java 8 ile Maven 3.3 kullanarak geliştireceğiz.
Producer
Uygulamamız rabbitmq ya bağlanıp queue’ya mesaj ekleyecek.
1* Eclipse üzerinde bir java projesi oluşturuyoruz.
2* Uygulamamıza sağ tıklayıp Configure -> Convert to Maven Project yapıyoruz. Böylece uygulamamız maven uyumlu hale geliyor.
3* pom.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>Producer</groupId> <artifactId>Producer</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project> |
4* SenderStarter.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; public class SenderStarter { private final static String QUEUE_NAME = "MERHABA"; private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Merhaba RabbitMQ"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println("message sent"); channel.close(); connection.close(); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } } |
ConnectionFactory: rabbitmq’ya bağlanmak için, java api tarafından sunulan soket soyutlama katmanıdır. Kimlik doğrulama vb işlemleri bizim için yapar. Connection nesnesini ConnectionFactory üzerinden alırız.
Connection: Uygulamadan rabbitmq ya açılan TCP connection’dır.
Channel: tek bir TCP bağlantısını kullanalan sanal bağlantılar olarak adlandırılabilir. Bazı durumlarda rabbitmq’ya bir den fazla tcp bağlantısı ihtiyacımız olabilir. TCP bağlantısı açmak hem yeni kaynak tüketimine neden olur hem de yönetim zorlukları içerir. Bu sebeple Channel interface’i kullanılır. Her bir thread başına bir channel açılması önemlidir.
Channnel oluşturduktan sonra queue tanımı yapıyoruz. Queue ismi önemli burada. Consumer uygulamamızda, aynı isimli queue’ya bağlanacak. Bu tanımda, verilen isim ile daha önceden tanımlanmış bir queue var ise birşey yapılmaz, yok ise queue oluşturulur. Ardından ilk mesajımızı gönderiyoruz.
Consumer
1* Eclipse üzerinde bir java projesi oluşturuyoruz.
2* Uygulamamıza sağ tıklayıp Configure -> Convert to Maven Project yapıyoruz. Böylece uygulamamız maven uyumlu hale geliyor.
3* pom.xml:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>Consumer</groupId> <artifactId>Consumer</artifactId> <version>0.0.1-SNAPSHOT</version> <build> <sourceDirectory>src</sourceDirectory> <plugins> <plugin> <artifactId>maven-compiler-plugin</artifactId> <version>3.5.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>4.0.1</version> </dependency> </dependencies> </project> |
4* ReceiverStarter.java:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
package io.bilisim.rabbitmq.main; import java.io.IOException; import java.util.concurrent.TimeoutException; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.Consumer; import com.rabbitmq.client.DefaultConsumer; public class ReceiverStarter { private final static String QUEUE_NAME = "MERHABA"; private final static String HOST_NAME = "localhost"; private final static String USER_NAME = "guest"; private final static String PASSWORD = "guest"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost(HOST_NAME); factory.setUsername(USER_NAME); factory.setPassword(PASSWORD); Connection connection; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println("Waiting for Message"); Consumer consumer = new DefaultConsumer(channel) { public void handleDelivery(String consumerTag, com.rabbitmq.client.Envelope envelope, com.rabbitmq.client.AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body,"UTF-8"); System.out.println("ReceivedMessage: "+message); }; }; channel.basicConsume(QUEUE_NAME, true, consumer); } catch (IOException | TimeoutException e) { e.printStackTrace(); } } } |
Producer uygulamamızdan farklı olan kod satırı, Queue’dan mesajları almamızı sağlayacak olan Consumer arayüzüdür. DefaultConsumer implementasyon sınıfının handleDelivery metodunu override ettik. Böylece bir callback metot oluşturduk. Queue’ya mesaj geldiğinde, bu metot çağrılır ve mesaj receiver uygulamaya ulaşır.
Şuan uygulamalarımızı çalıştırmaya hazır hale geldik. Önce producer uygulamamızı çalıştıralım. Uygulama başarılı şekilde çalıştıktan sonra aşağıdaki komut ile, queue içerisindeki mesaj sayısını görelim.
1 2 3 4 5 6 7 |
./rabbitmqctl list_queues Listing queues ... MERHABA 1 |
Şimdi Receiver uygulamamızı çalıştıralım. Aşağıdaki çıktıyı almış olmalıyız.
1 2 3 4 5 6 |
Waiting for Message ReceivedMessage: Merhaba RabbitMQ |
Şimdilik bu kadar. İnşallah faydalı olmuştur.
İyi Çalışmalar