kafka怎么读(在spring怎么读取kafka文件)
本文目录
在spring怎么读取kafka文件
ound Channel Adapter用来发送消息到Kafka。 消息从Spring Integration Channel中读取。 你可以在Spring application context指定这个channel。一旦配置好这个Channel,就可以利用这个Channel往Kafka发消息。 明显地,Spring Integration特定的消息发送给这个Adaptor,然后发送前在内部被转为Kafka消息。当前的版本要求你必须指定消息key和topic作为头部数据 (header),消息作为有载荷(payload)。例如
请教一个关于使用spark 读取kafka只能读取一个分区数据的问题
我先写了一个kafka的生产者程序,然后写了一个kafka的消费者程序,一切正常。生产者程序生成5条数据,消费者能够读取到5条数据。然后我将kafka的消费者程序替换成使用spark的读取kafka的程序,重复多次发现每次都是读取1号分区的数据,而其余的0号和2号2个分区的数据都没有读到。请哪位大侠出手帮助一下。 我使用了三台虚拟机slave122,slave123,slave124作为kafka集群和zk集群;然后生产者和消费者程序以及spark消费者程序都是在myeclipse上完成。 软件版本为:kafka_2.11-0.10.1.0,spark-streaming-kafka-0-10_2.11-2.1.0,zookeeper-3.4.9 spark消费者程序主要代码如下:Map《String, Object》 kafkaParams = new HashMap《》();kafkaParams.put(“bootstrap.servers“, “slave124:9092,slave122:9092,slave123:9092“);kafkaParams.put(“key.deserializer“, “org.apache.kafka.common.serialization.StringDeserializer“);kafkaParams.put(“value.deserializer“,“org.apache.kafka.common.serialization.StringDeserializer“);kafkaParams.put(“group.id“, “ssgroup“);kafkaParams.put(“auto.offset.reset“, “earliest“); //update mykafka,“earliest“ from the beginning,“latest“ from the rear of topickafkaParams.put(“enable.auto.commit“, “true“); //messages successfully polled by the consumer may not yet have resulted in a Spark output operation, resulting in undefined semanticskafkaParams.put(“auto.commit.interval.ms“, “5000“);// Create a local StreamingContext with two working thread and batch interval of 2 secondSparkConf conf = new SparkConf();//conf被set后,返回新的SparkConf实例,所以多个set必须连续,不能拆开。conf.setMaster(“local“).setAppName(“streaming word count“).setJars(new String{“D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\MyFirstHadoop.jar“});;try{JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));Collection《String》 topics = new HashSet《》(Arrays.asList(“order“));JavaInputDStream《ConsumerRecord《String, String》》 oJInputStream = KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.《String, String》Subscribe(topics, kafkaParams));JavaPairDStream《String, String》 pairs = oJInputStream.mapToPair(new PairFunction《ConsumerRecord《String, String》, String, String》() {private static final long serialVersionUID = 1L; @Override public Tuple2《String, String》 call(ConsumerRecord《String, String》 record) { try {BufferedWriter oBWriter = new BufferedWriter(new FileWriter(“D:\\Workspaces\\MyEclipse 2015\\MyFirstHadoop\\bin\\mysparkstream\\MyFirstHadoop.out“,true)); String strLog = “^^^^^^^^^^^ “ + System.currentTimeMillis() / 1000 + “ mapToPair:topic:“ + record.topic() + “,key:“ + record.key() + “,value:“ + record.value() + “,partition id:“ + record.partition() + “,offset:“ + record.offset() + “.\n“; System.out.println(strLog); oBWriter.write(strLog); oBWriter.close(); } catch (IOException e) {// TODO Auto-generated catch blocke.printStackTrace();} return new Tuple2《》(record.key(), record.value()); }});pairs.print();jssc.start(); //start here in factjssc.awaitTermination();jssc.close();}catch(Exception e){// TODO Auto-generated catch blockSystem.out.println(“Exception:throw one exception“);e.printStackTrace();}
kafkaesque什么意思
Kafkaesque adj. 卡夫卡式;受压抑和恶梦般的所谓“卡夫卡式”(Kafkaesque)一语,成了人在遥远而荒诞的、强制性的外力面前孑然无助的写照,所有对非人性化公司、官僚机构或集权统治有过切身感受的现代读者供你参考,祝你开心。
更多文章:
河南一地发布疫情(2020年疫情解封后河南省许昌市什么时间学生开学)
2024年7月12日 02:58
人事管理系统数据库设计(数据库人事管理系统怎么做急求 oracle)
2024年6月28日 00:24
electronic所有形式(初中英语electricity的用法)
2024年7月24日 04:12
redirected(forward 和redirect的区别)
2024年6月7日 05:46
isnumber函数的使用方法及实例(c语言中的isnumber函数的作用)
2024年7月23日 05:57
schedule中文翻译(手账本里面的:schedule和itinerary item 是什么意思,怎么翻译)
2024年4月4日 06:14
正则表达式生成工具(f12 开发调试工具怎么调试正则表达式)
2024年9月30日 02:05
html5颜色怎么写(html5设置颜色 <p>智能电地暖</p> 这个表示设置背景色)
2024年9月1日 02:45
jdk版本越高越好吗(jdk的版本是越高越好吗为什么有人说jdk1.5才是最好用的版本呢)
2024年7月7日 14:14
个人博客选择wordpress还是typecho好呢?emlog可以转到dedecms吗
2024年6月8日 15:45
在微机中,数据总线可以传输地址信号和数据信息吗?在微机中主机由微处理器和什么组成
2024年7月24日 08:09