Apache Kafka’nın Java ile kullanımı
Merhaba arkadaşlar,
Apache Kafka ile ilgili bilgiyi kurulum ve kullanımı hakkında bir önceki yazımda detaylı deyinmiştik.
Link: http://www.turkishh.com/programlama/kafka-nedir-kurulumu-ve-kullanimi/
Bu yazımda kafka ile javada procuder ve consumer yapan küçük bir uygulama yapacağız. Uygulamamız maven olacak springboot ile yapacağız uygulamamızı.Kafka’yı kullanabilmemiz için Java version 1.8 olması lazım.
Senaryomuz basit prodecure yapacak bir rest servis yazacağız. Servise istek attığımızda messajımızı parametre ile geçeceğiz ve kafka kütüphanelerini kullanarak producer yapacağız ve aynı şekilde producer ettiğimiz mesajımızı consumer edip konsola yazacağız. Ayrıyeten komut satırından da consumer edeceğiz.
İlk önce bağımlılıklarımızı tanımlayalım pom dosyamız şöyle olacak.
pom.xml
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>io.bilisim</groupId> <artifactId>ApacheKafka</artifactId> <packaging>jar</packaging> <version>0.0.1-SNAPSHOT</version> <name>ApacheKafka</name> <url>http://www.turkishh.com</url> <properties> <java.version>1.8</java.version> </properties> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.3.5.RELEASE</version> </parent> <dependencies> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.2</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> <exclusion> <groupId>log4j</groupId> <artifactId>log4j</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>javax.ws.rs</groupId> <artifactId>jsr311-api</artifactId> <version>1.1.1</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-kafka</artifactId> <version>1.1.1.RELEASE</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.integration</groupId> <artifactId>spring-integration-java-dsl</artifactId> <version>1.1.0.RELEASE</version> </dependency> </dependencies> <build> <finalName>ApacheKafka</finalName> </build> </project> |
SpringBoot kullandığımız için uygulamamızı start edecek sınıfı yazıyoruz
Application.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 |
package io.bilisim; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.integration.config.EnableIntegration; @SpringBootApplication @EnableIntegration public class Application { private static final Logger LOGGER = LoggerFactory.getLogger(Application.class); public static void main(String[] args) { LOGGER.info("Starting..."); SpringApplication.run(Application.class, args); } } |
Sonrasında properties dosyamızı resources dizininde altına atıyoruz.
service.properties
1 2 3 4 5 6 7 8 9 |
kafka.producer.host=localhost kafka.producer.port=9092 kafka.producer.topic=turkishh.com kafka.consumer.zookeeperAddress=localhost:2181 kafka.consumer.topic=io.bilisim |
Daha sonra KafkaConfig sınıfımızı yazıyoruz .
KafkaConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 |
package io.bilisim.kafka; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; @Component public class KafkaConfig { @Value("${kafka.topic:io-bilisim}") private String topic; @Value("${kafka.consumer.zookeeperAddress}") private String zookeeperAddress; public KafkaConfig() { } public KafkaConfig(String topic, String brokerAddress, String zookeeperAddress) { this.topic = topic; this.zookeeperAddress = zookeeperAddress; } public String getTopic() { return topic; } public String getZookeeperAddress() { return zookeeperAddress; } } |
Prodecure yapabilmemiz için config sınıfını yazıyoruz.
KafkaProducerConfig.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 |
package io.bilisim.kafka; import java.util.Properties; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.PropertySource; import org.springframework.context.support.PropertySourcesPlaceholderConfigurer; import kafka.javaapi.producer.Producer; import kafka.producer.ProducerConfig; @Configuration @PropertySource("classpath:/service.properties") public class KafkaProducerConfig { @Value("${kafka.producer.host}") private String kafkaHost; @Value("${kafka.producer.port}") private int kafkaPort; @Bean public static PropertySourcesPlaceholderConfigurer propertySourcesPlaceholderConfigurer() { return new PropertySourcesPlaceholderConfigurer(); } @Bean public Producer<String,String> kafkaProducer() { Properties properties = new Properties(); properties.put("metadata.broker.list",kafkaHost + ":" + kafkaPort); properties.put("serializer.class","kafka.serializer.StringEncoder");; ProducerConfig producerConfig = new ProducerConfig(properties); Producer<String,String> producer = new Producer<String, String>(producerConfig); System.out.println("kafka is ready on " + kafkaHost + ":" + kafkaPort); return producer; } } |
Consumer yapabilmemiz için bir başka config sınıfımızı yazıyoruz.
ConsumerConfiguration.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 |
package io.bilisim.kafka; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.integration.dsl.IntegrationFlow; import org.springframework.integration.dsl.IntegrationFlows; import org.springframework.integration.dsl.SourcePollingChannelAdapterSpec; import org.springframework.integration.dsl.kafka.Kafka; import org.springframework.integration.dsl.kafka.KafkaHighLevelConsumerMessageSourceSpec; import org.springframework.integration.dsl.support.Consumer; import org.springframework.integration.kafka.support.ZookeeperConnect; @Configuration public class ConsumerConfiguration { @Autowired private KafkaConfig kafkaConfig; private Log log = LogFactory.getLog(getClass()); @Bean IntegrationFlow consumer() { log.info("starting consumer.. Access Point -> " + "topic : " + kafkaConfig.getTopic() + "zookpeeperaddress : " + kafkaConfig.getZookeeperAddress()); KafkaHighLevelConsumerMessageSourceSpec messageSourceSpec = Kafka .inboundChannelAdapter(new ZookeeperConnect(this.kafkaConfig.getZookeeperAddress())) .consumerProperties( props -> props.put("auto.offset.reset", "smallest").put("auto.commit.interval.ms", "1")) .addConsumer("iot-events-group", metadata -> metadata.consumerTimeout(100) .topicStreamMap(m -> m.put(this.kafkaConfig.getTopic(), 1)).maxMessages(10) .valueDecoder(String::new)); Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer = e -> e.poller(p -> p.fixedDelay(100)); return IntegrationFlows.from(messageSourceSpec, endpointConfigurer) .<Map<String, List<String>>> handle((payload, headers) -> { payload.entrySet().forEach(e -> log.info(readProducer(payload))); return null; }).get(); } public String readProducer(Map<String, List<String>> payload) { try { List<List<String>> values = new ArrayList<List<String>>(payload.values()); @SuppressWarnings("unchecked") ConcurrentHashMap<String, List<String>> value = (ConcurrentHashMap<String, List<String>>) values.get(0); for (int i = 0; i < value.get(0).size(); i++) { log.info(" producer data : " + value.get(0).get(i)); } } catch (Exception e) { log.info("Ended in failure method ConsumerConfiguration@createEvents " + e.getMessage()); } return "success"; } } |
Kafka producer yapabilmemiz için producer servisimizi yazıyoruz.
KafkaProducerService.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
package io.bilisim.kafka; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.PropertySource; import org.springframework.stereotype.Service; import kafka.javaapi.producer.Producer; import kafka.producer.KeyedMessage; @Service @PropertySource("classpath:/service.properties") public class KafkaProducerService { private Log log = LogFactory.getLog(getClass()); @Value("${kafka.producer.topic}") String kafkaTopic; @Autowired Producer<String,String> producer; /** * Kafka writes value * * @param data - The type parameter String * @return the method return type is void * @author mkilic */ public void setMessage(String data){ KeyedMessage<String, String> message =new KeyedMessage<String, String>(kafkaTopic,data); producer.send(message); } } |
Contoller sınıfımızı yazıyoruz.
KafkaProducerController.java
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 |
package io.bilisim.controller; import javax.ws.rs.Consumes; import javax.ws.rs.Produces; import javax.ws.rs.core.MediaType; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.PropertySource; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import com.turkishh.kafka.KafkaProducerService; @RestController @RequestMapping("/kafka") @PropertySource("classpath:/service.properties") public class KafkaProducerController { @Autowired KafkaProducerService kafkaProducerService; @RequestMapping(value = "/producer/{message}", method = RequestMethod.GET) @Consumes({ MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN, MediaType.APPLICATION_XML }) @Produces({ MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN, MediaType.APPLICATION_XML }) public String producer(@PathVariable("message") String message) { kafkaProducerService.setMessage(message); return "true"; } } |
Uygulamamızı çalıştırdığımız zaman servisimize istek atmamız lazım producer işlemi için
http://localhost:8080/kafka/producer/<mesajımız>
Producer ettiğimizde çıktısını konsola bakabilirz yada bir önceki yazımdaki consumer ekranından görebiliriz.
çıktısı:
konsol çıktısı: