请稍等 ...
×

采纳答案成功!

向帮助你的同学说点啥吧!感谢那些助人为乐的人

批量插入作业

经过半天研究,利用多线程实现
object kudu {
var Number=20
val ob =“aa"
def getNumber(): Int ={
Number
}
def deNumber(number:Int):Int={
Number=number-1
Number
}
def getOb():String={
ob
}
class insertTread(client: KuduClient, tableName: String,ThreadName:String) extends Runnable{
val table: KuduTable = client.openTable(tableName)
val session: KuduSession = client.newSession()
override def run(): Unit = {
while (deNumber(getNumber) > 11) {
val tem=getNumber()
synchronized(getOb())
if (tem>0){
val insert: Insert = table.newInsert()
val row: PartialRow = insert.getRow
row.addString(“word”, s"mengfansong_$getNumber”)
row.addInt(“cnd”, getNumber)
session.apply(insert)
println(ThreadName + “—” + tem)
}else{
println(“没了”)
}
}
}
}
def doubleInsert(): Unit ={
val KUDU_MASTARS="hadoop000"
val client: KuduClient = new KuduClient.KuduClientBuilder(KUDU_MASTARS).build()
val tableName="meng"
val threadPool:ExecutorService=Executors.newFixedThreadPool(5)
try {
for(i <- 0 to 2){
threadPool.execute(new insertTread(client, tableName, “ThreadName” + i))
}
}finally {
threadPool.shutdown()
}
}
def main(args: Array[String]): Unit = {
val KUDU_MASTARS="hadoop000"
val client: KuduClient = new KuduClient.KuduClientBuilder(KUDU_MASTARS).build()
val tableName="meng"
doubleInsert()
//creatTable(client,tableName)
//insertRows(client,tableName)
//deleteTable(client,tableName)
//query(client,tableName)
client.close()
}

已测试可以,请老师批阅。

正在回答 回答被采纳积分+3

1回答

Michael_PK 2020-09-20 16:35:51

这种实现方式可以的。但是你还可以在深究是否能以batch的方式插入呢

0 回复 有任何疑惑可以回复我~
  • 提问者 慕九州3016327 #1
    意思就是说,先不写入不执行apply操作,先把数据放到一个容器内,之后进行批量写入吗
    回复 有任何疑惑可以回复我~ 2020-09-20 16:43:49
问题已解决,确定采纳
还有疑问,暂不采纳
意见反馈 帮助中心 APP下载
官方微信