/* Compute the PSD (power spectral density) with Welch's method.
Welch's method is based on the periodogram:
the periodogram treats the N samples of a random signal as a finite-energy signal, takes its Fourier transform, and uses the squared magnitude divided by N as an estimate of the true power spectrum.
Welch's method is a modified periodogram estimator: the signal is split into segments, each segment is windowed and its periodogram computed, and the per-segment results are averaged.
*/
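// For orientation, a sketch of the standard textbook form that the implementation below follows:
// given L windowed segments x_1..x_L of length M, a window w with power U = sum(w^2)/M,
// FFT length nfft and sampling rate fs, the one-sided estimate is roughly
//   Pxx(f_k) = (nfft / (fs * L * U)) * sum_i c_k * |FFT(w .* x_i)[k] / nfft|^2
// with c_k = 2 for interior bins and c_k = 1 for the DC and Nyquist bins.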
login("admin","123456")
undef all
try{loadPlugin("/hdd/hdd1/xyl/dolphindb/DolphinDB_2.00.8.12/server/plugins/signal/PluginSignal.txt");}catch(ex){}
go
def pwelch(data, window, noverlap, nfft, fs){
N = data.size();
M = nfft;
pfft = 10*log10(fs\nfft); //normalization term in dB: 10*log10(fs/nfft), removed again in the return value
n_overlap = ceil(M * noverlap * 0.01); //number of overlapping points (noverlap is a percentage), rounded up
L = floor((N - n_overlap ) / (M - n_overlap)); //number of segments the data is split into
N_used = (M - n_overlap) * (L - 1) + M;
noise_used = data[0:(N_used)];
P_win = sum(window * window )/M; //window power: energy of the window function per sample
if (mod(nfft, 2) == 1){ //odd nfft
f = ((0 .. ((nfft - 1)/2)) * fs *1.0/ nfft); //one-sided frequency axis with (nfft+1)/2 bins, matching fft_sum
fft_sum = array(DOUBLE, (nfft + 1)/2)
abs_fft_half = array(DOUBLE, (nfft + 1)/2)
}
else{ //even nfft
f = ((0 .. (nfft / 2)) * fs *1.0/ nfft);
fft_sum = array(DOUBLE, (nfft/2 + 1))
abs_fft_half = array(DOUBLE, (nfft/2 + 1))
}
for(n in 1..L){ //accumulate the one-sided squared FFT magnitudes of each windowed segment
nstart = ( n - 1 )*(M - n_overlap);
temp_noise = noise_used[nstart : (nstart + M)] * window;
temp_fft = signal::mul(signal::fft(temp_noise), 1.0/nfft);
temp_fft_half = [temp_fft[0]]
temp_fft_half.append!(signal::mul(temp_fft[1:(nfft/2)], 2.0))
temp_fft_half.append!(temp_fft[nfft/2])
abs_fft_half[0] = pow(signal::abs(temp_fft_half[0]), 2.0);
end = pow(signal::abs(temp_fft_half[nfft/2]), 2);
abs_fft_half = abs_fft_half[0:1].append!(pow(signal::abs(temp_fft_half[1:(nfft/2)]), 2) * 0.5)
abs_fft_half.append!(end)
fft_sum = fft_sum + 1.0/ P_win * abs_fft_half;
}
fft_sum = fft_sum*1.0 / L //average over the segments
return [pow(10, log10(fft_sum) - pfft*1.0 / 10), f] //PSD values (y-axis) and the corresponding frequencies (x-axis)
}
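// Minimal usage sketch (hypothetical variable names; assumes the signal plugin loaded above):
//   x = rand(1.0, 20480)                          // some test samples
//   res = pwelch(x, hamming(8192), 0, 8192, 1024)
//   psdValues = res[0]                            // one-sided PSD, (unit^2)/Hz
//   freqs     = res[1]                            // frequency axis, 0 .. fs/2 Hz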
//Hamming window function
def hamming(N){
return 0.54-0.46*cos(2*pi*(0..(N-1))/(N-1))
}
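// Quick sanity check (values rounded): hamming(5) should give roughly [0.08, 0.54, 1.0, 0.54, 0.08],
// i.e. a symmetric window peaking at 1 in the middle.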
//sampling rate and time: define the sampling frequency and the sampling duration
fs = 1024; // Hz; the returned PSD covers frequencies up to fs/2
// ts = 600; // seconds (10 minutes)
go
ts = 20;
go
//// sampling rate and bandwidth
nfft = 8192; //// modified
window = hamming(nfft); //// modified
noverlap = fs * 0; //// modified
bandwidthL = 1; //// modified
bandwidthH = 512; //// modified
sensitivity = 10; //// V/g
gain = 10;
N=ts*fs //each chunk of raw data fed into the engine must contain at least N rows, otherwise the PSD-based metrics are returned as 0
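// With fs = 1024 and ts = 20 as above, N = 20480 samples; rms() below returns zeros for any window with fewer rows.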
//compute the vibration metrics: rmsAcc (RMS acceleration), rmsVel (RMS velocity), rmsDis (RMS displacement)
defg rms(nose,N,sensitivity,gain,window, noverlap, nfft, fs,bandwidthL,bandwidthH){
if (size(nose)<N){
return 0.0,0.0,0.0 // note: these must be floating-point literals, otherwise the engine infers integer output columns and the results lose their precision
}
temp= nose/sensitivity/gain * 9.8 //convert the voltage signal into an acceleration signal (m/s^2)
temp=temp-mean(temp)
res=pwelch(temp, window, noverlap, nfft, fs) //PSD spectrum
psdAcc=double(res[0]) // acceleration power spectral density: (m/s^2)^2/Hz
f=double(res[1])
powAcc = psdAcc * f[1] // per-bin acceleration power: PSD times the bin width (f[1] equals the frequency resolution)
powVel = powAcc / square(2 *pi * f) // divide by (2*pi*f)^2 to convert acceleration power to velocity power per bin
powDis = powVel / square(2 * pi * f) // divide again to convert to displacement power per bin
resolution = fs*1.0 / nfft; //frequency resolution in Hz
bandwidthLidx = int(bandwidthL / resolution) + 1; //index of the lower band edge
bandwidthHidx = int(bandwidthH / resolution) + 1; //index of the upper band edge
rmsAcc = sqrt(sum(powAcc[(int(bandwidthLidx) - 1):bandwidthHidx])) // m/s^2
rmsVel = sqrt(sum(powVel[(int(bandwidthLidx) - 1):bandwidthHidx]))*1000 // m/s -> mm/s
rmsDis = sqrt(sum(powDis[(int(bandwidthLidx) - 1):bandwidthHidx]))*1000000 // m -> micrometers
return rmsAcc, rmsVel, rmsDis
}
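// With the parameter values defined above (fs = 1024, nfft = 8192): resolution = 0.125 Hz,
// bandwidthLidx = 9 and bandwidthHidx = 4097, so the sums cover bins 8..4096, i.e. 1 Hz up to 512 Hz
// (the DC bin, where the velocity/displacement conversion would divide by zero, is excluded).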
metrics=<[rms(signalnose,N,sensitivity,gain,window, noverlap, nfft, fs,bandwidthL,bandwidthH) as `rmsAcc`rmsVel`rmsDis]> //metacode evaluated by the aggregation engine over each window
go
try{dropAggregator(`tsAggr1)}catch(ex){}
go
try{dropAggregator(`tsAggr2)}catch(ex){}
try{unsubscribeTable(tableName="signal", actionName="act_tsAggr1")}catch(ex){}
go
try{unsubscribeTable(tableName="srms", actionName="act_saveDfs1")}catch(ex){}
go
try{unsubscribeTable(tableName="srms", actionName="act_tsAggr2")}catch(ex){}
go
try{unsubscribeTable(tableName="warn", actionName="act_saveDfs2")}catch(ex){}
go
try{dropStreamTable("signal")}catch(ex){}
go
try{dropStreamTable("srms")}catch(ex){}
go
try{dropStreamTable("warn")}catch(ex){}
go
//input stream table
t = streamTable(100:0, `timestamp`source`signalnose,[TIMESTAMP,SYMBOL,DOUBLE])
enableTableShareAndPersistence(table=t, tableName=`signal, cacheSize = 2000000)
go
//output stream table
share streamTable(100:0, `datetime`source`rmsAcc`rmsVel`rmsDis,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE]) as srms
//define the time-series aggregation engine
tsAggr1 = createTimeSeriesAggregator(name="tsAggr1", windowSize=2*60*1000, step=2*60*1000, metrics=metrics, dummyTable=signal, outputTable=srms, timeColumn=`timestamp, keyColumn=`source)
subscribeTable(tableName="signal", actionName="act_tsAggr1", offset=0, handler=append!{tsAggr1}, msgAsTable=true);
go
//alerts
share streamTable(100:0, `datetime`source`type`metric,[TIMESTAMP,SYMBOL,INT,STRING]) as warn
tsAggr2 = createAnomalyDetectionEngine(name="tsAggr2", metrics=<[rmsAcc > 0.055, rmsVel >0.32, rmsDis > 34.5]>, dummyTable=srms, outputTable=warn, timeColumn=`datetime, keyColumn=`source, windowSize=2*60*1000, step=2*60*1000)
subscribeTable(tableName="srms", actionName="act_tsAggr2", offset=0, handler=append!{tsAggr2}, msgAsTable=true);
go
//create the RMS distributed (DFS) table and subscribe to persist results into it
if(existsDatabase("dfs://rmsDB")){
dropDatabase("dfs://rmsDB")
}
db = database("dfs://rmsDB", VALUE, 2022.01.01..2022.12.31)
m = table(1:0,`datetime`source`rmsAcc`rmsVel`rmsDis,[TIMESTAMP,SYMBOL,DOUBLE,DOUBLE,DOUBLE])
db.createPartitionedTable(m, "rms", ["datetime"])
pt_rms = loadTable("dfs://rmsDB", "rms")
def saveSrmsToDFS(mutable dfsRMS, msg){
dfsRMS.append!(select datetime, source, rmsAcc, rmsVel, rmsDis from msg)
}
subscribeTable(tableName="srms", actionName="act_saveDfs1", offset=-1, handler=saveSrmsToDFS{pt_rms}, msgAsTable=true, batchSize=10000, throttle=1);
//create the warnrms distributed (DFS) table and subscribe to persist alerts into it
if(existsDatabase("dfs://warnDB")){
dropDatabase("dfs://warnDB")
}
db = database("dfs://warnDB", VALUE, 2022.01.01..2022.12.31)
m = table(1:0,`datetime`source`type`metric,[TIMESTAMP,SYMBOL,INT,STRING])
db.createPartitionedTable(m, "warnrms", ["datetime"])
pt_warn = loadTable("dfs://warnDB", "warnrms")
def saveWarnToDFS(mutable dfswarnrms, msg){
dfswarnrms.append!(select datetime, source, type, metric from msg)
}
subscribeTable(tableName="warn", actionName="act_saveDfs2", offset=-1, handler=saveWarnToDFS{pt_warn}, msgAsTable=true, batchSize=10000, throttle=1);
//===============================================
//simulate real-time production data
def fakedata(t){
source=`channel+string(1..16)
num=source.shape()[0]
n=1000*60*10 // 10 minutes of 1 ms ticks
timestamp=now()
print(timestamp)
for (i in 1..n){
timestamp = timestamp+1
// print(timestamp)
time1=take(timestamp,num)
flag = rand([-1, 1], num)
signalnose =rand(1.,num)
signalnose = signalnose*flag
Table = table(time1 as timestamp, source, signalnose).sortBy!(`source`timestamp)
tableInsert(t, Table)
}
}
submitJob("fakedataplay","fakedataplayjob",fakedata,t)
getRecentJobs(1)
// import real data
def realdata(t)
{
dataPath="/hdd/hdd1/xyl/ShagnhaiWulianwang/TD06-29B000.csv"
Schema=extractTextSchema(dataPath)
// update Schema set type=`DOUBLE where name = `col16
data=loadText(dataPath,schema=Schema)
data = transpose(data)
deviceNum = 2;
n = 1000*60*10; // 10 minutes
source=`channel0`channel1
tmp = table(100:0, `timestamp`source`signalnose,[TIMESTAMP,SYMBOL,DOUBLE])
timestamp = now();
for(i in 1..n){
timestamp=timestamp+1
time1=take(timestamp,deviceNum)
flag = rand([-1, 1], deviceNum);
signalnose = take([data[`col0][i], data[`col1][i]], deviceNum)
signalnose = signalnose*flag
Table = table(time1 as timestamp, source, signalnose).sortBy!(`source`timestamp)
tableInsert(t, Table)
}
}
submitJob("realdataplay","realdataplay",realdata,t)
cancelJob("fakedataplay202301300002")
getRecentJobs(1)
//Grafana display: PSD spectrum -> Grafana
try{undef(`psd, SHARED)}catch(ex){}
data = select * from signal where source == "channel1" and timestamp >= 2023.01.29 03:54:21.652 and timestamp <= 2023.01.29 03:56:21.652 //vibration signal data from channel1
nose_ = data[`signalnose]
temp_= nose_/sensitivity/gain * 9.8
temp_=temp_-mean(temp_)
res_=pwelch(temp_, window, noverlap, nfft, fs)
share table(res_[1] as f, res_[0] as psdvalue) as psd //shared in-memory table for Grafana to query
select * from srms where source = `channel1
select * from warn where source = `channel1