请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

老师,我使用的storm是1.1.1,kafka是1.0.0。重启worker时,数据丢失,也就是说spout没有重发数据。请问老师,这该怎么配置?非常感谢

我的kafkaspout配置为:

KafkaSpoutConfig<String,String> build = KafkaSpoutConfig.builder("192.168.0.99:9092",topic)                 .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)                 .setGroupId("testGroup")

.build();             

我在重启执行worker时,数据是丢失的。在bolt里添加ack与否都是数据丢失。


正在回答 回答被采纳积分+3

1回答

Michael_PK 2018-08-30 11:58:05

首先你是如何确认丢失了?在生产是肯定是要ack的,不然真丢了,

0 回复 有任何疑惑可以回复我~
  • 提问者 慕哥8850026 #1
    老师好,我用的是测试环境。
    1)kafka producer每隔两秒发送一次数据,数据有编号的,storm将收到的数据打印出来。
    2)运行过程中,我将storm的worker进程kill掉,storm集群会重新启动该worker进程,发现新打印出来的数据和woker进程kill前的数据编号有丢失。
    
    下面的回复是代码,请老师帮忙分析下原因呗,非常感谢。
    回复 有任何疑惑可以回复我~ 2018-09-10 18:07:15
  • 提问者 慕哥8850026 #2
    public class StormKafkaTopo {
    
    	public static void main(String[] args) throws Exception {
    		String topic = "project_topic";
    		
    		TopologyBuilder builder = new TopologyBuilder();
    		
    		KafkaSpoutConfig<String,String> build = KafkaSpoutConfig.builder("192.168.0.99:9092",topic)
    				.setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
    				.setGroupId("testGroup")
    				.setOffsetCommitPeriodMs(10000)
    				.setMaxUncommittedOffsets(250)
    				.build();
    				
    		KafkaSpout<String, String> kafkaSpout = new KafkaSpout<String, String>(build);
    		builder.setSpout("kafkaspout", kafkaSpout);
    		
    		builder.setBolt("printBolt", new PrintBolt()).shuffleGrouping("kafkaspout");
    		
    		Config config = new Config();
    		
    		if(args.length > 0) {
    			
    			config.setDebug(false);
    //			config.setNumWorkers(2);
    			
    			StormSubmitter.submitTopology(args[0], config, builder.createTopology());
    		} else {
    			config.setDebug(true);
    			config.setNumWorkers(2);
    			
    			LocalCluster cluster = new LocalCluster();
    			cluster.submitTopology("kafkaSpout", config, builder.createTopology());
    		}
    
    	}
    
    }
    回复 有任何疑惑可以回复我~ 2018-09-10 18:07:45
  • 提问者 慕哥8850026 #3
    public class PrintBolt extends BaseRichBolt {
    	
    	private OutputCollector collector;
    
    	public void execute(Tuple tuple) {
    		try {
    			String str = tuple.getString(4);
    
    			 JSONObject jsonObject = JSON.parseObject(str);
    				
    			 Set<String> jsonSet = jsonObject.keySet();
    			 
    			 Iterator<String> iterator = jsonSet.iterator();
    			 while(iterator.hasNext()){
    				 
    				 String strIterator = iterator.next();
    				 
    				 int value = jsonObject.getInteger(strIterator);
    				 System.out.println(str+ ",,,, " + value);
    			 }
    			
    			this.collector.ack(tuple);
    		} catch (Exception e) {
    			this.collector.fail(tuple);
    			System.out.println("******: " + tuple.getString(4));
    			System.out.println("................................");
    		}
    		
    		System.out.println();
    	}
    
    	public void prepare(Map map, TopologyContext context, OutputCollector collector) {
    		
    		this.collector = collector;
    	}
    
    	public void declareOutputFields(OutputFieldsDeclarer declarer) {
    //		declarer.declare(new Fields("intsmaze"));
    		
    	}
    }
    回复 有任何疑惑可以回复我~ 2018-09-10 18:09:21
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信