请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

如何在bulkloadHFile中获取数据触发ES

我将csv文件导入到hbase 使用ImportTsv生成HFile

hbase org.apache.hadoop.hbase.mapreduce.ImportTsv

已经成功,但是我想使用自定义的协处理器 导入之后触发 postBulkLoadHFile 事件,获取到数据后进行插入ES,达到二级缓存的作用。 因为是hfile的 所以它操作的不是表级别的,触发不到 postPut 这个事件,如果单纯的操作表结构是可以的。

现在的问题是 如何获取到数据


public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor, BulkLoadObserver{

 @Override
    public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
                                  List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) {

        Map<String, Object> data = new HashMap<>();
        Map<String, Map<String, Object>> esData = new HashMap<>();

        for (Pair<byte[], String> familyPath : stagingFamilyPaths) {

            String key = Bytes.toString(familyPath.getFirst());
            String value = familyPath.getSecond();

            LOG.info(Bytes.toString(familyPath.getFirst()) + "===JULONG===>" + familyPath.getSecond());
            data.put(key, value);
//            esData.put(key, value);
        }
        esData.put("000", data);

        ElasticSearchUtil.saveEsDataWithBulk(esData, index);
    }

这里获取到的数据 只是 info : HDFS 地址;

正在回答 回答被采纳积分+3

1回答

Michael_PK 2021-08-09 22:18:09

bulkload是将数据以hfile的方式进行操作,这个和es没有关系的呢

你要写入es完全不需要这种,用普通的原生api以batch的方式写入es,完全就OK了。

0 回复 有任何疑惑可以回复我~
  • 提问者 lindy_chan #1
    我的数据型式是每几个小时产生的csv文件,所以直接用bulkload 导入速度是比较快的。不过es查询 hbase极慢。 使用普通协处理器 postput插入也是比较慢的。
    回复 有任何疑惑可以回复我~ 2021-08-09 22:33:10
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信