我将csv文件导入到hbase 使用ImportTsv生成HFile
hbase org.apache.hadoop.hbase.mapreduce.ImportTsv
已经成功,但是我想使用自定义的协处理器 导入之后触发 postBulkLoadHFile 事件,获取到数据后进行插入ES,达到二级缓存的作用。 因为是hfile的 所以它操作的不是表级别的,触发不到 postPut 这个事件,如果单纯的操作表结构是可以的。
现在的问题是 如何获取到数据
public class HbaseDataSyncEsObserver implements RegionObserver, RegionCoprocessor, BulkLoadObserver{
@Override
public void postBulkLoadHFile(ObserverContext<RegionCoprocessorEnvironment> ctx,
List<Pair<byte[], String>> stagingFamilyPaths, Map<byte[], List<Path>> finalPaths) {
Map<String, Object> data = new HashMap<>();
Map<String, Map<String, Object>> esData = new HashMap<>();
for (Pair<byte[], String> familyPath : stagingFamilyPaths) {
String key = Bytes.toString(familyPath.getFirst());
String value = familyPath.getSecond();
LOG.info(Bytes.toString(familyPath.getFirst()) + "===JULONG===>" + familyPath.getSecond());
data.put(key, value);
// esData.put(key, value);
}
esData.put("000", data);
ElasticSearchUtil.saveEsDataWithBulk(esData, index);
}
这里获取到的数据 只是 info : HDFS 地址;