Слияние кода завершено, страница обновится автоматически
def createDatabase(dbName,tableName, ps1, ps2){
tableSchema = table(1:0,`id`datetime`value,[INT,DATETIME,FLOAT]);
db1 = database("", VALUE, ps1)
db2 = database("", RANGE, ps2)
db = database(dbName,COMPO,[db1,db2])
dfsTable = db.createPartitionedTable(tableSchema,tableName,`datetime`id)
}
def generate1DayData(day, id, freqPerDay){
startTime = day.datetime()
idSize = size(id)
numRecords = freqPerDay * idSize
idVec = array(INT, numRecords)
for(i in 0:idSize) idVec[(i*freqPerDay) : ((i+1)*freqPerDay)] = id[i]
return table(idVec, take(startTime+0..(freqPerDay-1),numRecords) as datetime, rand(1.0, numRecords) as value)
}
def singleThreadWriting(id, startDay, days, freqPerDay, numIdPerPartition){
t = loadTable("dfs://svmDemo","sensors")
for(d in 0:days){
index=0
do{
t.append!(generate1DayData(startDay + d, id[index+0..(numIdPerPartition-1)], freqPerDay))
index += numIdPerPartition
}while (index < size(id))
}
}
def multipleThreadWriting(id, startDay, days, freqPerDay, numIdPerPartition, threads) {
//split id to multiple part for parallel writing
idCountPerThread = ceil(id.size()\threads/numIdPerPartition)*numIdPerPartition
ploop(singleThreadWriting{, startDay, days, freqPerDay, numIdPerPartition}, id.cut(idCountPerThread))
}
def mainJob(id, startDay, days, ps1, ps2, freqPerDay, numIdPerPartition, threads){
if(existsDatabase("dfs://svmDemo"))
dropDatabase("dfs://svmDemo")
createDatabase("dfs://svmDemo","sensors", ps1, ps2)
if(threads == 1)
submitJob("submit_singleThreadWriting", "write data", singleThreadWriting{id, startDay, days, freqPerDay, numIdPerPartition})
else
submitJob("submit_multipleThreadWriting", "write data", multipleThreadWriting{id, startDay, days, freqPerDay, numIdPerPartition, threads})
}
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
login("admin","123456")
freqPerDay=86400
numMachines=100
numMetrics=50
numMachinesPerPartition=2
numIdPerPartition=numMachinesPerPartition*numMetrics
ps1=2020.09.01..2020.12.31
ps2=(numMetrics*numMachinesPerPartition)*(0..(numMachines/numMachinesPerPartition))+1
id =1..(numMachines*numMetrics)
startDay=2020.09.01
//写入多少天的数据
days = 5
//多少个线程并行写入
threads = 20
mainJob(id, startDay, days, ps1, ps2, freqPerDay, numIdPerPartition, threads)
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )