我们专注攀枝花网站设计 攀枝花网站制作 攀枝花网站建设
成都网站建设公司服务热线:400-028-6601

网站建设知识

十年网站开发经验 + 多家企业客户 + 靠谱的建站团队

量身定制 + 运营维护+专业推广+无忧售后,网站问题一站解决

向kafka集群发布记录的kafka客户端怎么实现

这篇文章主要介绍“向kafka集群发布记录的kafka客户端怎么实现”,在日常操作中,相信很多人在向kafka集群发布记录的kafka客户端怎么实现问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”向kafka集群发布记录的kafka客户端怎么实现”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

10年积累的网站制作、成都网站设计经验,可以快速应对客户对网站的新想法和需求。提供各种问题对应的解决方案。让选择我们的客户得到更好、更有力的网络服务。我虽然不认识你,你也不认识我。但先网站制作后付款的网站建设流程,更有昭阳免费网站建设让你可以放心的选择与我们合作。

生产者是线程安全的,而且,多线程共享同一个producer实例通常比多个producer实例更快。

这里是一个简单的例子,使用producer发送字符串数据,包含key和value。

Properties props = new Properties();
 props.put("bootstrap.servers", "localhost:9092");
 props.put("acks", "all");
 props.put("retries", 0);
 props.put("batch.size", 16384);
 props.put("linger.ms", 1);
 props.put("buffer.memory", 33554432);
 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

 Producer producer = new KafkaProducer<>(props);
 for(int i = 0; i < 100; i++)
     producer.send(new ProducerRecord("my-topic", Integer.toString(i), Integer.toString(i)));

 producer.close();

一个producer由几部分组成:1、一个buff poll,保存尚未发送的数据;2、一个后台运行的I/O线程,负责执行数据发送。producer使用完毕后,务必执行close操作,否则将会造成资源的泄漏。

send()方法是异步的。当调用它时,它将记录添加到缓冲区中,并立即返回。这使得producer能够批量的执行数据的生产。

acks有3个可能的值,0:客户端不必等待任何的server响应;1:leader of partition将会在把数据写入自己的log之后,响应客户端,而不必等待其他的follower完成同步的操作;all:leader和follower全部完成log写入操作。服务器才会响应客户端。相比之下,all最慢但是可靠性更好。

如果请求失败,生产者可以自动重试,但是我们已经设置retries = 0,那么重试将不会发生。如果我们开启了重试,可能会出现重复记录的问题。

producer保持每个partition的未发送数据的缓冲区。这些缓冲的大小由batch.size配置指定。如果增大这个配置,可以一次性执行更大的批量操作,但需要更多的内存(因为我们通常会有一个缓冲区为每个partition)。

默认情况下,缓冲区可以立即发送,即使在缓冲区中有额外的未使用的空间。但是如果你想减少请求的数量,可以设置linger.ms > 0。producer会等待一段时间(单位是毫秒)之后在进行发送,以期获得更大的批量操作。例如,在上面的代码片段,设置linger.ms = 1, 可能会有100条记录被批量发送。但是,如果在1毫秒的时间内,没有跟多的数据到达缓冲区,那么这1毫秒的等待仅仅是增加了延迟,而没有达到任何正面的效果。需要注意的是,如果在短时间内,大量的数据到达缓冲区,即使 linger.ms = 0 ,仍然会发生批量操作。

buffer.memory控制提供给producer的缓冲内存总量,如果该缓冲区的写入速率长时间大于输出速率,那么这个缓冲区将耗尽。当缓冲区耗尽后,额外的发送调用将被阻塞。阻塞一段时间之后(max.block.ms ),将会抛出一个TimeoutException。

key.serializervalue.serializer负责把record当中key和value 分别转换为byte数组,kafka提供了一组简单的序列化class。

到此,关于“向kafka集群发布记录的kafka客户端怎么实现”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注创新互联网站,小编会继续努力为大家带来更多实用的文章!


网页标题:向kafka集群发布记录的kafka客户端怎么实现
URL标题:http://shouzuofang.com/article/ieiosg.html

其他资讯