经过半天研究，利用多线程实现了向 Kudu 表的并发插入：
object kudu {
/**
 * Shared countdown value, read through `getNumber()` and overwritten through `deNumber()`.
 * Marked `@volatile` because it is read and written from several pool threads, so every
 * write must be visible to the other threads.
 */
@volatile var Number: Int = 20

/** Shared object handed out by `getOb()`; the same instance is returned on every call. */
val ob: String = "aa"
/**
 * Reads the shared counter.
 *
 * @return the current value of `Number`
 */
def getNumber(): Int = Number
/**
 * Stores `number - 1` in the shared counter and returns the stored value.
 *
 * This is not an atomic decrement of the counter: the new value is derived from the
 * argument, not from the counter's current content. Callers that need an atomic
 * read-modify-write must hold a common lock around `deNumber(getNumber())`.
 *
 * @param number the value to decrement
 * @return `number - 1`, which is also the value written to `Number`
 */
def deNumber(number: Int): Int = {
  val next = number - 1
  Number = next
  // Return the value computed here instead of re-reading `Number`: another thread may
  // overwrite the shared field between the write above and a second read.
  next
}
/**
 * Accessor for the shared `ob` value.
 *
 * @return the `ob` string held by the enclosing object
 */
def getOb(): String = ob
/**
 * Worker that inserts one row per value claimed from the shared counter.
 *
 * The table handle and the session are created in the constructor, i.e. on the thread
 * that constructs the worker; each worker owns its own session.
 *
 * @param client     Kudu client used to open the table and to create the session
 * @param tableName  name of the table to insert into
 * @param ThreadName label printed in front of every inserted value
 */
class insertTread(client: KuduClient, tableName: String, ThreadName: String) extends Runnable {
  val table: KuduTable = client.openTable(tableName)
  val session: KuduSession = client.newSession()

  /**
   * Decrements the shared counter and returns the new value as one atomic step.
   *
   * `getOb()` returns the same object to every worker, so it serves as the common
   * monitor. A bare `synchronized(getOb())` would instead lock this worker instance
   * and guard nothing shared.
   */
  private def claimNext(): Int = getOb().synchronized {
    deNumber(getNumber())
  }

  /**
   * Claims counter values until one is not greater than 11, inserting a row with
   * columns `word` and `cnd` for each claimed value. The session is closed when the
   * loop ends or an insert fails.
   */
  override def run(): Unit = {
    try {
      var tem = claimNext()
      while (tem > 11) {
        // Use the claimed value everywhere: re-reading the shared counter here could
        // observe another worker's decrement and write a different number.
        val insert: Insert = table.newInsert()
        val row: PartialRow = insert.getRow
        row.addString("word", s"mengfansong_$tem")
        row.addInt("cnd", tem)
        session.apply(insert)
        println(ThreadName + "—" + tem)
        tem = claimNext()
      }
    } finally {
      // close() flushes any operations still buffered in the session before releasing it.
      session.close()
    }
  }
}
/**
 * Runs three `insertTread` workers against table "meng" on master "hadoop000" using a
 * fixed pool of five threads, waits for them to finish, and then closes the client
 * created here.
 */
def doubleInsert(): Unit = {
  import java.util.concurrent.TimeUnit

  val kuduMasters = "hadoop000"
  val client: KuduClient = new KuduClient.KuduClientBuilder(kuduMasters).build()
  val tableName = "meng"
  val threadPool: ExecutorService = Executors.newFixedThreadPool(5)
  try {
    try {
      for (i <- 0 to 2) {
        threadPool.execute(new insertTread(client, tableName, "ThreadName" + i))
      }
    } finally {
      // Stop accepting new tasks; workers already submitted keep running.
      threadPool.shutdown()
    }
  } finally {
    try {
      // The workers share `client`, so it must stay open until all of them are done.
      threadPool.awaitTermination(Long.MaxValue, TimeUnit.NANOSECONDS)
    } finally {
      client.close()
    }
  }
}
/**
 * Entry point: runs the multi-threaded insert demo.
 *
 * `doubleInsert()` takes no arguments, so the client and table name created here are
 * only used by the commented-out helper calls kept below.
 *
 * @param args command-line arguments (not read)
 */
def main(args: Array[String]): Unit = {
  val kuduMasters = "hadoop000"
  val client: KuduClient = new KuduClient.KuduClientBuilder(kuduMasters).build()
  val tableName = "meng"
  try {
    doubleInsert()
    //creatTable(client,tableName)
    //insertRows(client,tableName)
    //deleteTable(client,tableName)
    //query(client,tableName)
  } finally {
    // Close the client even when the call above throws.
    client.close()
  }
}
已测试，可以正常运行，请老师批阅。