请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

storm读取增量数据导致触发从头到尾的计算

我用storm读取hbase数据,为使其达到增量读取进行实时计算,在取数据的时候将获得到的hbase行键(升序时间戳)赋值给static变量times。

static String times = “” ;

同时在从hbase取数据之前建立行键过滤器

Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER,
new BinaryComparator(Bytes.toBytes(times)));

当数据读取完之后,times按道理来讲是行键中最大的值了。

times =new SimpleDateFormat(“yyyy-MM-dd HH:mm:ss.0”).format(InsertTimeToHis);
InsertTimeToHis是行键值

当数据源hbase数据发生改变时,我的想法是从hbase拿到的行键去和times比较,大于它的才会触发任务进行计算。
实际情况中,我hbase数据源的增量导入sqoop脚本一运行,storm就不管times和行键的大小,从等待任务中运行,将数据全部重新计算。这个问题请问如何解决?实现行键大于times才进行运算。。。。。。

正在回答

1回答

Michael_PK 2019-11-20 15:44:47

1) 首先storm只管处理数据,你进来的数据的时间sqoop过来的 是否还正确? 只要进来的是正确的,那么按照你的思路实现是可以的,但是不确定过的你业务逻辑实现的是否真的是你想要的,所以建议,自己造点数据,debug跑下,你的代码是否是真的按照你的思路实现的对不对

2)正常情况下,只要rowkey设计好,rowkey一般是一个组装的值,是可以做到 把你想要的值直接在hbase中覆盖掉


0 回复 有任何疑惑可以回复我~
  • 提问者 慕函数7570418 #1
    我sqoop中的sql语句有order by时间戳,在hbase表中也是升序的。时间戳格式就是yyyy-MM-dd HH:mm:ss.0 这种格式。我也debug过,单单spout拿数据是使用迭代器的,那数据拿取就是顺序性的,至于放后面传的顺序可以不用管。然而目前就是我说的那样,sqoop脚本跑起来他就不管过滤器,全部重新计算一遍。这就是困惑不解的地方了
    回复 有任何疑惑可以回复我~ 2019-11-20 16:23:39
  • Michael_PK 回复 提问者 慕函数7570418 #2
    那问题估计还是出在sqoop出来的数据的问题上,要是OK,那就业务逻辑写的有问题。大概就只有这两个地方出问题了
    回复 有任何疑惑可以回复我~ 2019-11-20 17:39:33
  • 提问者 慕函数7570418 回复 Michael_PK #3
    public class DataSpout01 extends BaseRichSpout{
    	
      private static final long serialVersionUID = 3051457019378793963L;
      private static final String HBASE_ROOT_DIR = "hdfs://hadoop01:9000/hbase";
      private SpoutOutputCollector collector;
      Connection connection = null;
      Configuration config = null;
      Lock lock = new ReentrantLock();
      //定义变量缓存行键值
      static String times = "" ;
      
      @Override
      public void nextTuple(){
    	  
        this.lock.lock();
        try{
          ResultScanner orderRs = null;
          Table orderInfoTable = null;
          try{
        	  //创建连接,获取hbase表
            this.connection = ConnectionFactory.createConnection(this.config);
            orderInfoTable = this.connection.getTable(TableName.valueOf("Jieziw_dev01:order_info0101"));
            //定义行键过滤器
            Filter filter1 = new RowFilter(CompareFilter.CompareOp.GREATER,
                    new BinaryComparator(Bytes.toBytes(times)));
            
            Scan scan = new Scan();
            scan.setFilter(filter1);
            orderRs = orderInfoTable.getScanner(scan);
          }catch (IOException e1){
            e1.printStackTrace();
          }
    回复 有任何疑惑可以回复我~ 2019-11-20 17:56:19
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信