Ver código fonte

First commit

till 4 anos atrás
commit
cd7cc5f7b7

+ 3 - 0
.gitignore

@@ -0,0 +1,3 @@
+.idea
+*.iml
+target

+ 30 - 0
kafka_server/docker-stack.yml

@@ -0,0 +1,30 @@
+version: '3.2'
+services:
+  zookeeper:
+    image: wurstmeister/zookeeper:latest
+    deploy:
+      replicas: 1
+      restart_policy:
+        condition: on-failure
+    ports:
+      - "2181:2181"
+    networks:
+      - kafka-tier
+
+  kafka:
+    image: wurstmeister/kafka:2.12-2.3.0
+    deploy:
+      replicas: 1
+      restart_policy:
+        condition: on-failure
+    ports:
+      - "9092:9092"
+    networks:
+      - kafka-tier
+    environment:
+      - KAFKA_ADVERTISED_HOST_NAME=[YOUR_HOST_NAME]
+      - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
+      - ZOOKEEPER_IP=zookeeper
+
+networks:
+  kafka-tier:

+ 50 - 0
pom.xml

@@ -0,0 +1,50 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.3.3.RELEASE</version>
+        <relativePath/> <!-- lookup parent from repository -->
+    </parent>
+
+    <groupId>org.example</groupId>
+    <artifactId>kafka-demo</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <dependencies>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.junit.vintage</groupId>
+                    <artifactId>junit-vintage-engine</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>

+ 11 - 0
src/main/java/kafkademo/MainApplication.java

@@ -0,0 +1,11 @@
+package kafkademo;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class MainApplication {
+    public static void main(String[] args) {
+        SpringApplication.run(MainApplication.class, args);
+    }
+}

+ 15 - 0
src/main/java/kafkademo/controller/HelloWorldController.java

@@ -0,0 +1,15 @@
+package kafkademo.controller;
+
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+public class HelloWorldController {
+
+    @ResponseBody
+    @RequestMapping("/hello")
+    public String hello() {
+        return "Hello World";
+    }
+}

+ 24 - 0
src/main/java/kafkademo/controller/KafkaProducerController.java

@@ -0,0 +1,24 @@
+package kafkademo.controller;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Controller;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.ResponseBody;
+
+@Controller
+public class KafkaProducerController {
+    @Autowired
+    private KafkaTemplate<String, String> kafkaTemplate;
+
+    @ResponseBody
+    @RequestMapping("/produce")
+    public String produce() {
+        try {
+            kafkaTemplate.send("one-exchange", "Hello");
+        } catch (Exception e) {
+            return "Produce failed";
+        }
+        return "Produce succeed";
+    }
+}

+ 35 - 0
src/main/java/kafkademo/kafka/consumer/KafkaConsumerConfig.java

@@ -0,0 +1,35 @@
+package kafkademo.kafka.consumer;
+
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.ConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@EnableKafka
+@Configuration
+public class KafkaConsumerConfig {
+    @Bean
+    public ConsumerFactory<String, String> consumerFactory() {
+        Map<String, Object> props = new HashMap<>();
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.9.128:9092");
+        props.put(ConsumerConfig.GROUP_ID_CONFIG, "group-id");
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return new DefaultKafkaConsumerFactory<>(props);
+    }
+
+    @Bean
+    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String>
+                factory = new ConcurrentKafkaListenerContainerFactory<>();
+        factory.setConsumerFactory(consumerFactory());
+        return factory;
+    }
+}

+ 13 - 0
src/main/java/kafkademo/kafka/listener/SingleListener.java

@@ -0,0 +1,13 @@
+package kafkademo.kafka.listener;
+
+import org.springframework.kafka.annotation.KafkaListener;
+
+public class SingleListener {
+
+    @KafkaListener(topics = "one-exchange", groupId = "group-id")
+    public String listen(String msg) {
+        System.out.println("Received Messasge in group - group-id: " + msg);
+        return "Received Messasge in group - group-id: " + msg;
+    }
+
+}

+ 30 - 0
src/main/java/kafkademo/kafka/producer/KafkaProducerConfig.java

@@ -0,0 +1,30 @@
+package kafkademo.kafka.producer;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Configuration
+public class KafkaProducerConfig {
+    @Bean
+    public ProducerFactory<String, String> producerFactory() {
+        Map<String, Object> configProps = new HashMap<>();
+        configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.9.128:9092");
+        configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        return new DefaultKafkaProducerFactory<>(configProps);
+    }
+
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+}