高吞吐量:Kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。
持久性:Kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。
分布式:Kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。
支持多种协议:Kafka支持多种协议,如TCP、HTTP、UDP等,可以与不同的系统进行集成。
灵活的消费模式:Kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。
可配置性强:Kafka的配置参数非常丰富,可以根据需要进行灵活配置。
社区支持:Kafka作为Apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。
添加依赖:在pom.xml文件中添加Kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。
创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。
配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。
发送消息:在需要发送消息的地方,注入Kafka生产者,并使用其发送消息到指定的Kafka主题。
创建消费者:创建一个Kafka消费者类,实现Consumer接口,并使用KafkaTemplate订阅指定的Kafka主题。
配置消费者:在Spring Boot的配置文件中配置Kafka消费者的相关参数,例如group id、auto offset reset等。
接收消息:在需要接收消息的地方,注入Kafka消费者,并使用其接收消息。
处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。
pom中填了依赖
org.springframework.kafka spring-kafka 2.8.1 org.apache.kafka kafka-clients 2.8.1
创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。
import org.apache.kafka.clients.producer.*; import org.springframework.beans.factory.annotation.Value; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class KafkaProducer { @Value("${kafka.bootstrap}") private String bootstrapServers; @Value("${kafka.topic}") private String topic; private KafkaTemplate kafkaTemplate; public KafkaProducer(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void sendMessage(String message) { Producer producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer()); try { producer.send(new ProducerRecord<>(topic, message)); } catch (Exception e) { e.printStackTrace(); } finally { producer.close(); } } }
配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。
import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.kafka.core.DefaultKafkaProducerFactory; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.ProducerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.ConsumerConfig; import org.springframework.kafka.listener.ConcurrentMessageListenerContainer; import org.springframework.kafka.listener.MessageListener; import org.springframework.context.annotation.PropertySource; import java.util.*; import org.springframework.beans.factory.*; import org.springframework.*; import org.springframework.*;expression.*;value; @Value("${kafka}") Properties kafkaProps = new Properties(); @Bean public KafkaTemplate kafkaTemplate(ProducerFactory pf){ KafkaTemplate template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory producerFactory(){ DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory consumerFactory(){ DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer container(ConsumerFactory consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener
消费者
import org.apache.kafka.clients.consumer.*; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @Component public class KafkaConsumer { @Value("${kafka.bootstrap}") private String bootstrapServers; @Value("${kafka.group}") private String groupId; @Value("${kafka.topic}") private String topic; private KafkaTemplate kafkaTemplate; public KafkaConsumer(KafkaTemplate kafkaTemplate) { this.kafkaTemplate = kafkaTemplate; } public void consume() { Consumer consumer = new KafkaConsumer<>(consumerConfigs()); consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); for (ConsumerRecord record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); } } } private Properties consumerConfigs() { Properties props = new Properties(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); return props; } }
Kafka和RocketMQ都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:
下一篇:RustDesk 搭建