分享好友 最新动态首页 最新动态分类 切换频道
史上最轻便好用的kafka ui界面可视化图形界面工具
2024-12-27 12:17
package com.jq.kafkaui.util;

史上最轻便好用的kafka ui界面可视化图形界面工具

import com.alibaba.fastjson.JSONObject; import com.jq.kafkaui.domain.Topic; import com.jq.kafkaui.dto.ResponseDto; import com.jq.kafkaui.dto.SourceInfo; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.admin.*; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.protocol.types.Field; import org.springframework.util.StringUtils; import java.util.*; import java.util.concurrent.ExecutionException; import java.util.stream.Collectors; @Slf4j public class KafkaUtil { public static AdminClient createAdminClientByProperties(SourceInfo sourceInfo) { Properties prop = getCommonProperties(sourceInfo); prop.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, sourceInfo.getBroker()); prop.setProperty(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "2000"); prop.setProperty(AdminClientConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, "2000"); return AdminClient.create(prop); } private static Properties getCommonProperties(SourceInfo sourceInfo) { Properties prop = new Properties(); String userName = sourceInfo.getUserName(); String password = sourceInfo.getPassword(); if (!StringUtils.isEmpty(userName) && !StringUtils.isEmpty(password)) { prop.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=" + userName + " password=" + password + ";"); prop.put("security.protocol", "SASL_PLAINTEXT"); prop.put("sasl.mechanism", "PLAIN"); } return prop; } public static ResponseDto listTopicsWithOptions(SourceInfo sourceInfo, String keyword) { AdminClient adminClient = null; try { // 创建AdminClient客户端对象 adminClient = createAdminClientByProperties(sourceInfo); ListTopicsOptions options = new ListTopicsOptions(); // 列出内部的Topic options.listInternal(true); // 列出所有的topic ListTopicsResult result = adminClient.listTopics(options); Collection<TopicListing> topicListings = result.listings().get(); List<Topic> collect = topicListings.stream().map(t -> { Topic topic = new Topic(); topic.setName(t.name()); topic.setInternal(t.isInternal()); return topic; }).sorted(Comparator.comparing(t -> t.getName())).collect(Collectors.toList()); if (keyword != null) { collect = collect.stream().filter(t -> t.getName().contains(keyword)).collect(Collectors.toList()); } ResponseDto success = ResponseDto.success(collect); return success; } catch (Exception e) { log.error(e.getMessage(), e); return ResponseDto.fail(e.getMessage()); } finally { adminClient.close(); } } public static void createTopic(SourceInfo sourceInfo, String topic, Integer partition, Integer replica) throws Exception { AdminClient adminClient = null; try { adminClient = createAdminClientByProperties(sourceInfo); List<NewTopic> topicList = new ArrayList(); NewTopic newTopic = new NewTopic(topic, partition, replica.shortValue()); topicList.add(newTopic); CreateTopicsResult result = adminClient.createTopics(topicList); result.all().get(); result.values().forEach((name, future) -> System.out.println("topicName:" + name)); } catch (Exception e) { } finally { adminClient.close(); } } public static Producer<String, String> getProducer(SourceInfo sourceInfo) { Properties props = getCommonProperties(sourceInfo); props.put("bootstrap.servers", sourceInfo.getBroker()); props.put("acks", "all"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); return producer; } public static KafkaConsumer<String, String> getConsumer(SourceInfo sourceInfo, String topic, String group, String offset) { Properties props = getCommonProperties(sourceInfo); props.setProperty("bootstrap.servers", sourceInfo.getBroker()); props.setProperty("group.id", group); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("auto.offset.reset", offset); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Collections.singleton(topic)); return consumer; } public static KafkaConsumer<String, String> getConsumer(SourceInfo sourceInfo, Collection<String> topics, String group, String offset) { Properties props = getCommonProperties(sourceInfo); props.setProperty("bootstrap.servers", sourceInfo.getBroker()); props.setProperty("group.id", group); props.setProperty("enable.auto.commit", "true"); props.setProperty("auto.commit.interval.ms", "1000"); props.setProperty("auto.offset.reset", offset); props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(topics); return consumer; } public static void main(String[] args) throws Exception { } public static void deleteTopic(SourceInfo sourceInfo, String name) { AdminClient adminClient = createAdminClientByProperties(sourceInfo); List<String> list = new ArrayList<>(); list.add(name); adminClient.deleteTopics(list); adminClient.close(); } public static JSONObject node2Json(Node node) { JSONObject leaderNode = new JSONObject(); leaderNode.put("id", node.id()); leaderNode.put("host", node.host()); leaderNode.put("port", node.port()); leaderNode.put("rack", node.rack());
最新文章
置顶【商家券API】常见问题官方精选热门
Q1:商家券接口文档参数字段”适用商品范围goods_name”是在哪里展示的?A1:在商家券详情里的优惠说明展示,具体展示规则如下:换购券:“商家券批次名称stock_name”和“适用商品范围goods_name”拼接满减券:适用商品范围goods_name折扣
新奥精准资料免费大全,可持续执行探索_免费版46.676
随着数字时代的到来,数据的获取和分析成为了企业决策的重要依据。新奥精准资料免费大全,免费版46.676,作为一套全面的数据分析工具,为用户提供了强大的数据支持。本文将详细介绍这一工具的特点、功能以及如何可持续地执行探索。新奥精准
自我提升的4个好方法
月5停止无意义的抱怨。要明白,无论当下的处境多么艰难,都只是你自己造成的,与别人无关,抱怨只会雪上加霜,并不能带来任何有用的改变。与其怨天尤人,不如停下吐槽的嘴巴,踏踏实实地去做一些能改变生活的事。如果你觉得自己一无是处,
营销推广岗岗位职责
营销推广岗岗位职责15篇  在我们平凡的日常里,接触到岗位职责的地方越来越多,制定岗位职责能够有效的地防止因为职位分配不合理而导致部门之间或是员工之间出现工作推脱、责任推卸等现象发生。一般岗位职责是怎么制定的呢?下面是小编收
百度AI的2020
世界的2020,是充满不确定性的变局之年;中国的2020,是团结一心、共克时艰、于变局中开新局的希望之年;百度 AI 的2020,是坚定信念,拥抱变化,践行“科技为更好”的实干之年。 回望2020年&#x
用AI绘技:一键生成超逼真美女写真,让每个家人都成为画家!
访问搜索引擎:打开你的浏览器,输入“搜狐简单AI”,进入其官方网站或小程序。创建账号并登录:如果你是第一次使用,可以选择用微信或手机号注册一个新账号,随即登录。选择模板:在主界面中,你会看到不同风格的模板,依照需要选择一个适
电商SEO优化攻略,揭秘提升流量与率的黄金秘籍
电商SEO优化是提升店铺流量与率的关键。通过关键词研究、优化产品描述、优化图片和、建立高质量的外链、提升网站速度等方法,可以有效提高店铺在搜索引擎中的排名,吸引更多潜在顾客,从而提高店铺流量与率。掌握SEO优化技巧,让您的电商店
苹果怎么投电视
在智能家居日益普及的今天,将手机屏幕投射到电视上已成为许多用户享受大屏娱乐的常用方式。对于苹果用户来说,将iPhone或iPad的内容投屏到电视上,不仅操作简单,而且体验流畅。以下是几种常见的方法,帮助苹果用户轻松实现投屏。一、AirP
【R80591桌面下载】OPPO R80591桌面10.5.2免费下载
「91桌面V10」造动X计划!全宇宙最最玩的桌面美化APP,畅享铁血战士、中国新说唱、张一山、宋祖儿、油爆叽丁、蘑菇点点、秋田君、汤圆酱等众多知名IP主题,还有更多版权形象神秘企划中!==特色亮点==1.【主题免费啦】高能版本现在开启,大
米菲米索正品全国包邮网上药店——(官方直营商城药店)第一时间发货+正品确保!
米菲米索正品全国包邮网上药店——(官方直营商城药店)第一时间发货+正品确保!,安全的网上药品零售药房,顺丰包邮可货到付款米菲米索正品全国包邮网上药店——(官方直营商城药店)第一时间发货+正品确保! 许多女性就会有着激情过后,因为措
相关文章
推荐文章
发表评论
0评