spring boot 使用 Kafka
创始人
2024-11-05 22:09:09
0

一、Kafka作为消息队列的好处

  1. 高吞吐量:Kafka能够处理大规模的数据流,并支持高吞吐量的消息传输。

  2. 持久性:Kafka将消息持久化到磁盘上,保证了消息不会因为系统故障而丢失。

  3. 分布式:Kafka是一个分布式系统,可以在多个节点上运行,具有良好的可扩展性和容错性。

  4. 支持多种协议:Kafka支持多种协议,如TCP、HTTP、UDP等,可以与不同的系统进行集成。

  5. 灵活的消费模式:Kafka支持多种消费模式,如拉取和推送,可以根据需要选择合适的消费模式。

  6. 可配置性强:Kafka的配置参数非常丰富,可以根据需要进行灵活配置。

  7. 社区支持:Kafka作为Apache旗下的开源项目,拥有庞大的用户基础和活跃的社区支持,方便用户得到及时的技术支持。

二、springboot中使用Kafka

  1. 添加依赖:在pom.xml文件中添加Kafka的依赖,包括spring-kafka和kafka-clients。确保版本与你的项目兼容。

  2. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

  3. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

  4. 发送消息:在需要发送消息的地方,注入Kafka生产者,并使用其发送消息到指定的Kafka主题。

  5. 创建消费者:创建一个Kafka消费者类,实现Consumer接口,并使用KafkaTemplate订阅指定的Kafka主题。

  6. 配置消费者:在Spring Boot的配置文件中配置Kafka消费者的相关参数,例如group id、auto offset reset等。

  7. 接收消息:在需要接收消息的地方,注入Kafka消费者,并使用其接收消息。

  8. 处理消息:对接收到的消息进行处理,例如保存到数据库或进行其他业务逻辑处理。

三、使用Kafka

pom中填了依赖

       org.springframework.kafka       spring-kafka       2.8.1             org.apache.kafka       kafka-clients       2.8.1   
  1. 创建生产者:创建一个Kafka生产者类,实现Producer接口,并使用KafkaTemplate发送消息。

import org.apache.kafka.clients.producer.*;   import org.springframework.beans.factory.annotation.Value;   import org.springframework.kafka.core.KafkaTemplate;   import org.springframework.stereotype.Component;      @Component   public class KafkaProducer {       @Value("${kafka.bootstrap}")       private String bootstrapServers;          @Value("${kafka.topic}")       private String topic;          private KafkaTemplate kafkaTemplate;          public KafkaProducer(KafkaTemplate kafkaTemplate) {           this.kafkaTemplate = kafkaTemplate;       }          public void sendMessage(String message) {           Producer producer = new KafkaProducer<>(bootstrapServers, new StringSerializer(), new StringSerializer());           try {               producer.send(new ProducerRecord<>(topic, message));           } catch (Exception e) {               e.printStackTrace();           } finally {               producer.close();           }       }   }
  1. 配置生产者:在Spring Boot的配置文件中配置Kafka生产者的相关参数,例如bootstrap服务器地址、Kafka主题等。

import org.springframework.context.annotation.Bean;   import org.springframework.context.annotation.Configuration;   import org.springframework.kafka.core.DefaultKafkaProducerFactory;   import org.springframework.kafka.core.KafkaTemplate;   import org.springframework.kafka.core.ProducerFactory;   import org.springframework.kafka.core.DefaultKafkaConsumerFactory;   import org.springframework.kafka.core.ConsumerFactory;   import org.springframework.kafka.core.ConsumerConfig;   import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;   import org.springframework.kafka.listener.MessageListener;   import org.springframework.context.annotation.PropertySource;   import java.util.*;   import org.springframework.beans.factory.*;   import org.springframework.*;   import org.springframework.*;expression.*;value; 																																		 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	 	  @Value("${kafka}")   Properties kafkaProps = new Properties(); @Bean public KafkaTemplate kafkaTemplate(ProducerFactory pf){ KafkaTemplate template = new KafkaTemplate<>(pf); template .setMessageConverter(new StringJsonMessageConverter()); template .setSendTimeout(Duration .ofSeconds(30)); return template ; } @Bean public ProducerFactory producerFactory(){ DefaultKafkaProducerFactory factory = new DefaultKafkaProducerFactory<>(kafkaProps); factory .setBootstrapServers(bootstrapServers); factory .setKeySerializer(new StringSerializer()); factory .setValueSerializer(new StringSerializer()); return factory ; } @Bean public ConsumerFactory consumerFactory(){ DefaultKafkaConsumerFactory factory = new DefaultKafkaConsumerFactory<>(consumerConfigProps); factory .setBootstrapServers(bootstrapServers); factory .setKeyDeserializer(new StringDeserializer()); factory .setValueDeserializer(new StringDeserializer()); return factory ; } @Bean public ConcurrentMessageListenerContainer container(ConsumerFactory consumerFactory, MessageListener listener){ ConcurrentMessageListenerContainer container = new ConcurrentMessageListenerContainer<>(consumerFactory); container .setMessageListener(listener); container .setConcurrency(3); return container ; } @Bean public MessageListener

消费者

import org.apache.kafka.clients.consumer.*;   import org.springframework.kafka.core.KafkaTemplate;   import org.springframework.stereotype.Component;      @Component   public class KafkaConsumer {       @Value("${kafka.bootstrap}")       private String bootstrapServers;          @Value("${kafka.group}")       private String groupId;          @Value("${kafka.topic}")       private String topic;          private KafkaTemplate kafkaTemplate;          public KafkaConsumer(KafkaTemplate kafkaTemplate) {           this.kafkaTemplate = kafkaTemplate;       }          public void consume() {           Consumer consumer = new KafkaConsumer<>(consumerConfigs());           consumer.subscribe(Collections.singletonList(topic));           while (true) {               ConsumerRecords records = consumer.poll(Duration.ofMillis(100));               for (ConsumerRecord record : records) {                   System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());               }           }       }          private Properties consumerConfigs() {           Properties props = new Properties();           props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);           props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);           props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");           props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");           return props;       }   }

四、kafka与rocketMQ比较

Kafka和RocketMQ都是开源的消息队列系统,它们具有许多相似之处,但在一些关键方面也存在差异。以下是它们在数据可靠性、性能、消息传递方式等方面的比较:

  1. 数据可靠性:
  • Kafka使用异步刷盘方式,而RocketMQ支持异步实时刷盘、同步刷盘、同步复制和异步复制。这使得RocketMQ在单机可靠性上比Kafka更高,因为它不会因为操作系统崩溃而导致数据丢失。此外,RocketMQ新增的同步刷盘机制也进一步保证了数据的可靠性。
  1. 性能:
  • Kafka和RocketMQ在性能方面各有千秋。由于Kafka的数据以partition为单位,一个Kafka实例上可能有多达上百个partition,而一个RocketMQ实例上只有一个partition。这使得RocketMQ可以充分利用IO组的commit机制,批量传输数据,从而在replication时具有更好的性能。然而,Kafka的异步replication性能理论上低于RocketMQ的replication,因为同步replication与异步replication相比,性能上会有约20%-30%的损耗。
  1. 消息传递方式:
  • Kafka和RocketMQ在消息传递方式上也有所不同。Kafka采用Producer发送消息后,broker马上把消息投递给consumer,这种方式实时性较高,但会增加broker的负载。而RocketMQ基于Pull模式和Push模式的长轮询机制,来平衡Push和Pull模式各自的优缺点。RocketMQ的消息及时性较好,严格的消息顺序得到了保证。
  1. 其他特性:
  • Kafka在单机支持的队列数超过64个队列,而RocketMQ最高支持5万个队列。队列越多,可以支持的业务就越多。

五、kafka使用场景

  1. 实时数据流处理:Kafka可以处理大量的实时数据流,这些数据流可以来自不同的源,如用户行为、传感器数据、日志文件等。通过Kafka,可以将这些数据流进行实时的处理和分析,例如进行实时数据分析和告警。
  2. 消息队列:Kafka可以作为一个消息队列使用,用于在分布式系统中传递消息。它能够处理高吞吐量的消息,并保证消息的有序性和可靠性。
  3. 事件驱动架构:Kafka可以作为事件驱动架构的核心组件,将事件数据发布到不同的消费者,以便进行实时处理。这种架构可以简化应用程序的设计和开发,提高系统的可扩展性和灵活性。
  4. 数据管道:Kafka可以用于数据管道,将数据从一个系统传输到另一个系统。例如,可以将数据从数据库或日志文件传输到大数据平台或数据仓库。
  5. 业务事件通知:Kafka可以用于通知业务事件,例如订单状态变化、库存更新等。通过订阅Kafka主题,相关的应用程序和服务可以实时地接收到这些事件通知,并进行相应的处理。
  6. 流数据处理框架集成:Kafka可以与流处理框架集成,如Apache Flink、Apache Spark等。通过集成,可以将流数据从Kafka中实时导入到流处理框架中进行处理,实现流式计算和实时分析。

 

相关内容

热门资讯

ia攻略/牛牛房卡代理九酷大厅... 今 日消息,九酷大厅/随意玩房卡添加微信33549083 苹果今日发布了 iOS 16.1 正式版更...
我来教你/金花房卡专卖店新超圣... 新超圣房卡更多详情添加微:33549083、 2、在商城页面中选择房卡选项。 3、根据...
正规平台有哪些,牛牛房卡怎么获... 微信游戏中心:青鸟大厅房卡在哪里买打开微信,添加客服微信【88355042】,进入游戏中心或相关小程...
一分钟实测分享”热玩吧房卡怎么... 一分钟实测分享”热玩吧房卡怎么弄“牛牛房卡哪里有卖游戏中心打开微信,添加客服【113857776】,...
一分钟了解“如何购买金花房卡普... 悠悠大厅是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:160470940许多玩家在游戏中会购买房卡...
IA解析/斗牛房卡充值天道联盟... IA解析/斗牛房卡充值天道联盟/随意玩/房卡怎么买Sa9Ix苹果iPhone 17手机即将进入量产阶...
我来教你/牛牛充值房卡新竹大厅... 今 日消息,新竹大厅房卡添加微信33549083 苹果今日发布了 iOS 16.1 正式版更新,简单...
玩家攻略”王者大厅房卡“牛牛房... 玩家攻略”王者大厅房卡“牛牛房卡哪里有卖 微信牛牛房卡客服微信号微信游戏中心打开微信,添加客服【11...
推荐一款!牛牛房卡代理天蝎大厅... 微信游戏中心:天蝎大厅房卡在哪里买打开微信,添加客服微信【88355042】,进入游戏中心或相关小程...
重大通报,金花充值房卡星驰娱乐... 星驰娱乐是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:【3329006910】或QQ:332900...
科技实测!金花房卡出售新二号/... 您好!微信新二号大厅链接获取房卡可以通过以下几种方式购买: 1.微信渠道:(新二号)大厅介绍:咨询...
终于找到“微信斗牛房卡如何购买... 微信斗牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:86909166许多玩家在游戏中会购买房卡来...
重大通报,金花房卡是正规的新道... 今 日消息,新道游/新皇豪房卡添加微信33549083 苹果今日发布了 iOS 16.1 正式版更新...
一秒了解”百万牛房卡获取“哪里... 房卡获取是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:113857776许多玩家在游戏中会购买房卡...
我来教你/斗牛房间怎么创建的南... 南瓜大厅/新道游房卡更多详情添加微:33549083、 2、在商城页面中选择房卡选项。 ...
重大通报,牛牛房卡制作链接新众... 微信游戏中心:新众亿/皇豪互娱房卡在哪里买打开微信,添加客服微信【88355042】,进入游戏中心或...
ia实测“微信链接斗牛房卡多少... 斗牛是一款非常受欢迎的棋牌游戏,咨询房/卡添加微信:15984933许多玩家在游戏中会购买房卡来享受...
正版授权!金花房卡专卖店鲨鱼众... 您好!微信鲨鱼众娱大厅链接获取房卡可以通过以下几种方式购买: 1.微信渠道:(鲨鱼众娱)大厅介绍:...
实测分享”辣椒互娱房卡详细充值... 实测分享”辣椒互娱房卡详细充值“牛牛房卡批发平台游戏中心打开微信,添加客服【113857776】,进...
IA解析/牛牛房卡怎么获得天酷... 天酷大厅房卡更多详情添加微:33549083、 2、在商城页面中选择房卡选项。 3、根...