Background
Most Java projects at work are built on Spring/Spring Boot, whose big advantage is how easily other technologies and middleware can be integrated. This article walks through the source code to explain how message consumption works when Spring Boot integrates Kafka.
Example
pom.xml
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
application.yml
spring:
  kafka:
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      group-id: consumer-group
      enable-auto-commit: true
      auto-commit-interval: 100ms
      properties:
        session.timeout.ms: 15000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      auto-offset-reset: earliest
    producer:
      retries: 0 # if set greater than 0, the client resends records whose send failed
      batch-size: 16384 # when multiple records go to the same partition, the producer batches them into fewer requests, which improves client and broker performance; this controls the default batch size in bytes (16384 is the default)
      buffer-memory: 33554432 # total bytes the producer may use to buffer records waiting to be sent to the broker (33554432 is the default)
      key-serializer: org.apache.kafka.common.serialization.StringSerializer # key serializer class
      value-serializer: org.apache.kafka.common.serialization.StringSerializer # value serializer class
KafkaConsumerListener.java
/**
 * Custom bean
 **/
@Component
public class KafkaConsumerListener {
    // a single annotation is all it takes to consume messages
    @KafkaListener(topics = "app-test")
    public void receive(ConsumerRecord<?, ?> record) {
        // handle
    }
}
As you can see, very little code is needed to get a working KafkaConsumer.
How it works
The main method of a Spring Boot project is usually just SpringApplication.run(xx.class, args). If you have read the source, you know that the core of the startup process happens inside the ApplicationContext; the main flow looks like this:
@Override
public void refresh() throws BeansException, IllegalStateException {
    synchronized (this.startupShutdownMonitor) {
        // Prepare this context for refreshing.
        prepareRefresh();
        // Tell the subclass to refresh the internal bean factory.
        ConfigurableListableBeanFactory beanFactory = obtainFreshBeanFactory();
        // Prepare the bean factory for use in this context.
        prepareBeanFactory(beanFactory);
        try {
            // Allows post-processing of the bean factory in context subclasses.
            postProcessBeanFactory(beanFactory);
            // Invoke factory processors registered as beans in the context.
            invokeBeanFactoryPostProcessors(beanFactory);
            // Register bean processors that intercept bean creation.
            registerBeanPostProcessors(beanFactory);
            // Initialize message source for this context.
            initMessageSource();
            // Initialize event multicaster for this context.
            initApplicationEventMulticaster();
            // Initialize other special beans in specific context subclasses.
            onRefresh();
            // Check for listener beans and register them.
            registerListeners();
            // Instantiate all remaining (non-lazy-init) singletons.
            finishBeanFactoryInitialization(beanFactory);
            // Last step: publish corresponding event.
            finishRefresh();
        }
        catch (BeansException ex) {
            // ... error handling elided ...
            throw ex;
        }
    }
}
Bean creation and initialization are done in the finishBeanFactoryInitialization step. Spring runs hooks before and after each bean's initialization (similar to interceptors), as the source shows:
/**
 * Initialize the given bean instance, applying factory callbacks
 * as well as init methods and bean post processors.
 * Called from {@link #createBean} for traditionally defined beans,
 * and from {@link #initializeBean} for existing bean instances.
 * @see #applyBeanPostProcessorsBeforeInitialization
 * @see #invokeInitMethods
 * @see #applyBeanPostProcessorsAfterInitialization
 */
protected Object initializeBean(final String beanName, final Object bean, @Nullable RootBeanDefinition mbd) {
    ...
    // pre-initialization processing
    wrappedBean = applyBeanPostProcessorsBeforeInitialization(wrappedBean, beanName);
    // initialization
    invokeInitMethods(beanName, wrappedBean, mbd);
    // post-initialization processing
    wrappedBean = applyBeanPostProcessorsAfterInitialization(wrappedBean, beanName);
    return wrappedBean;
}
If any of a bean's methods are annotated with @KafkaListener, KafkaListenerAnnotationBeanPostProcessor#postProcessAfterInitialization handles them:
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
    ...
    // find the methods annotated with @KafkaListener
    Map<Method, Set<KafkaListener>> annotatedMethods = xxx;
    for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
        Method method = entry.getKey();
        for (KafkaListener listener : entry.getValue()) {
            // process each annotation
            processKafkaListener(listener, method, bean, beanName);
        }
    }
    return bean;
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
        Object bean, Object adminTarget, String beanName) {
    ...
    // register the KafkaEndpoint (records the consumer's metadata)
    this.registrar.registerEndpoint(endpoint, factory);
}
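The scanning step can be illustrated with plain reflection. Everything below is hypothetical (MyKafkaListener is a stand-in annotation, not Spring's), but the mechanics are the same: walk the bean's methods, read the annotation, and record one endpoint per topic.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for @KafkaListener, just to show the reflection step.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface MyKafkaListener {
    String[] topics();
}

class ListenerScanner {
    // Mimics what the post-processor does: find annotated methods on the
    // bean's class and record one "endpoint" (topic -> method) per topic.
    static List<String> scan(Object bean) {
        List<String> endpoints = new ArrayList<>();
        for (Method m : bean.getClass().getDeclaredMethods()) {
            MyKafkaListener ann = m.getAnnotation(MyKafkaListener.class);
            if (ann != null) {
                for (String topic : ann.topics()) {
                    endpoints.add(topic + " -> " + m.getName());
                }
            }
        }
        return endpoints;
    }
}

class DemoListener {
    @MyKafkaListener(topics = "app-test")
    public void receive(String record) { /* handle */ }
}
```

Spring's real implementation also resolves placeholders in the topic names and handles class-level annotations, but the core discovery mechanism is this reflective scan.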
At this point the metadata for every Kafka consumer has been recorded (the entry point being the parsing of @KafkaListener).
After all beans have been created and initialized, each bean's afterPropertiesSet method is invoked:
@Override
public void afterPropertiesSet() {
    registerAllEndpoints();
}
This step creates a KafkaMessageListenerContainer for each endpoint (via KafkaListenerEndpointRegistry#registerListenerContainer) and records all the listener containers:
/**
 * Create a message listener container for the given {@link KafkaListenerEndpoint}.
 * This creates the necessary infrastructure to honor that endpoint
 * with regards to its configuration.
 */
@SuppressWarnings("unchecked")
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
        boolean startImmediately) {
    ...
    // KafkaListenerContainerFactory#createListenerContainer(endpoint)
    MessageListenerContainer container = createListenerContainer(endpoint, factory);
    this.listenerContainers.put(id, container);
}
Once bean initialization (including post-processing) is complete, the finishRefresh step starts the beans that implement Lifecycle:
/**
 * Start the specified bean as part of the given set of Lifecycle beans,
 * making sure that any beans that it depends on are started first.
 */
private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
    Lifecycle bean = lifecycleBeans.remove(beanName);
    ...
    bean.start();
}
KafkaListenerEndpointRegistry implements Lifecycle, so its start method gets called:
@Override
public void start() {
    for (MessageListenerContainer listenerContainer : getListenerContainers()) {
        if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
            listenerContainer.start();
        }
    }
    this.running = true;
}
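The registry's role can be sketched in isolation. MiniEndpointRegistry and MiniContainer below are assumed names for illustration, not Spring classes; the point is simply that containers are registered by id during endpoint registration and then all started in one pass when the registry's own start() runs.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of a listener container: start/stop flips a running flag.
class MiniContainer {
    private volatile boolean running;
    void start() { running = true; }
    void stop() { running = false; }
    boolean isRunning() { return running; }
}

// Hypothetical model of the endpoint registry: holds containers by id
// and starts them all when the context signals start().
class MiniEndpointRegistry {
    private final Map<String, MiniContainer> containers = new LinkedHashMap<>();
    private volatile boolean running;

    void register(String id, MiniContainer c) { containers.put(id, c); }

    void start() {
        for (MiniContainer c : containers.values()) {
            c.start();
        }
        running = true;
    }

    boolean isRunning() { return running; }
}
```

Keeping the containers behind one Lifecycle-aware registry is what lets the context start (and later stop) every consumer with a single call, rather than tracking each container bean individually.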
This in turn starts each KafkaMessageListenerContainer:
// start the KafkaConsumer
@Override
protected void doStart() {
    ...
    this.listenerConsumer = new ListenerConsumer(listener, listenerType);
    // mark the container as running; the poll loop checks this flag
    setRunning(true);
    this.listenerConsumerFuture = containerProperties
            .getConsumerTaskExecutor()
            .submitListenable(this.listenerConsumer);
}
Once the KafkaConsumer thread is running, it keeps consuming data:
@Override
public void run() {
    ...
    // running == true: start pulling data
    while (isRunning()) {
        // poll records and invoke the listener
        pollAndInvoke();
    }
}
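The poll loop's start/stop contract can be reproduced without a broker. ToyListenerConsumer below is entirely hypothetical: an in-memory BlockingQueue stands in for Kafka, and a volatile running flag plays the role of setRunning(true)/isRunning() above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for ListenerConsumer: loops while running,
// "polls" an in-memory queue, and counts records it has handled.
class ToyListenerConsumer implements Runnable {
    private final BlockingQueue<String> records = new LinkedBlockingQueue<>();
    private final AtomicInteger consumed = new AtomicInteger();
    private volatile boolean running = true;

    void send(String record) { records.add(record); }
    void stop() { running = false; }
    int consumedCount() { return consumed.get(); }

    @Override
    public void run() {
        while (running) {
            try {
                // poll with a timeout so the loop can notice stop()
                String record = records.poll(50, TimeUnit.MILLISECONDS);
                if (record != null) {
                    consumed.incrementAndGet(); // stand-in for invoking the listener
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

Submitting this runnable to an executor mirrors submitListenable above; flipping the flag via stop() lets the while loop fall through so the consumer thread exits cleanly instead of being killed mid-poll.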
Summary
Spring Boot makes Kafka integration easy: both the producer and the consumer are wrapped for us, so simple configuration and a few lines of code are enough to get running. What happens underneath, however, only becomes clear by reading the source. This article used the source code to show how the Kafka consumer is started during application startup.