请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

在spark中使用for循环会不会对性能造成影响

正在回答

2回答

举个例子看看

0 回复 有任何疑惑可以回复我~
  • 提问者 孤独观测者 #1
    hbaseRDD.filter(x =>{
          val time = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("time")))
          time > "2020-08-11T00:00:00"
        }).map(x => {
          // 根据mac来对数据进行分组
          val id = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("id")))
          val mac = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("mac")))
          val time = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("time")))
          val companyId = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("companyId")))
    
          (mac, Bike(mac, time, id, companyId))
        }).groupByKey().collect().map(x => {
          //得到同一单车mac下所有的Bike
          val mac = x._1
          val value:Iterable[Bike] = x._2
          var list:ListBuffer[Bike] = new ListBuffer[Bike]()
    
          var tmp:String = value.iterator.next().id
          list+=value.iterator.next()
          println("单车:"+mac)
    
          for (elem <- value) {
    
            println("    时间: "+ elem.time+",单车轨迹"+elem.id)
            if(elem.id != tmp){
              list+=elem
              tmp = elem.id
            }
          }
          println("测试:"+ list)
          list
        })
    回复 有任何疑惑可以回复我~ 2020-08-11 22:33:48
  • 提问者 孤独观测者 #2
    想根据Bike这个对象中的id(id代表一个地点),对hbase中获取的数据进行去重。现在可能在一个地点半小时内上传了多个数据,但我只想在一个地点保留一个数据
    回复 有任何疑惑可以回复我~ 2020-08-11 22:37:20
  • Michael_PK 回复 提问者 孤独观测者 #3
    groupByKey().collect() 这种方法使用的时候要注意,因为collect是将所有数据返回到driver的,你driver是否能抗的住,这是个问题
    回复 有任何疑惑可以回复我~ 2020-08-11 23:58:26
提问者 孤独观测者 2020-08-11 22:34:34

hbaseRDD.filter(x =>{
 val time = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("time")))
 time > "2020-08-11T00:00:00"
}).map(x => {
 // 根据mac来对数据进行分组
 val id = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("id")))
 val mac = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("mac")))
 val time = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("time")))
 val companyId = Bytes.toString(x._2.getValue(Bytes.toBytes("info"), Bytes.toBytes("companyId")))

 (mac, Bike(mac, time, id, companyId))
}).groupByKey().collect().map(x => {
 //得到同一单车mac下所有的Bike
 val mac = x._1
 val value:Iterable[Bike] = x._2
 var list:ListBuffer[Bike] = new ListBuffer[Bike]()

 var tmp:String = value.iterator.next().id
 list+=value.iterator.next()
 println("单车:"+mac)

 for (elem <- value) {

   println("    时间: "+ elem.time+",单车轨迹"+elem.id)
   if(elem.id != tmp){
     list+=elem
     tmp = elem.id
   }
 }
 println("测试:"+ list)
 list
})

0 回复 有任何疑惑可以回复我~
  • 这种结果是聚合后的,没啥大问题的,因为聚合后的结果的数量级已经没多大了
    回复 有任何疑惑可以回复我~ 2020-08-11 23:57:38
  • 提问者 孤独观测者 回复 Michael_PK #2
    我还是有些疑惑,可能因为java写多了。所以在spark里写for循环来处理数据,就感觉会不会比spark自带的算子效率低下?
    回复 有任何疑惑可以回复我~ 2020-08-12 00:09:31
  • Michael_PK 回复 提问者 孤独观测者 #3
    能用算子的毋庸置疑,因为分布式
    回复 有任何疑惑可以回复我~ 2020-08-12 00:16:34
问题已解决,确定采纳
还有疑问,暂不采纳
微信客服

购课补贴
联系客服咨询优惠详情

帮助反馈 APP下载

慕课网APP
您的移动学习伙伴

公众号

扫描二维码
关注慕课网微信公众号