Слияние кода завершено, страница обновится автоматически
/**
streamComputingReproduction.txt
Script to reproduce the stream computing
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09 / 2.00.6 2022.05.09
Last modification time: 2022.05.10
*/
/**
Attention:
1. There are one places in the script that need to be modified according to the environment
*/
//clean up environment
def cleanEnvironment(){
try{ unsubscribeTable(tableName=`snapshotStream, actionName="aggrFeatures10min") } catch(ex){ print(ex) }
try{ unsubscribeTable(tableName=`aggrFeatures10min, actionName="predictRV") } catch(ex){ print(ex) }
try{ dropStreamEngine("aggrFeatures10min") } catch(ex){ print(ex) }
try{ dropStreamTable(`snapshotStream) } catch(ex){ print(ex) }
try{ dropStreamTable(`aggrFeatures10min) } catch(ex){ print(ex) }
try{ dropStreamTable(`result1min) } catch(ex){ print(ex) }
undef all
}
cleanEnvironment()
go
//login account
login("admin", "123456")
/**
modified location 1: modelSavePath, csvDataPath
*/
modelSavePath = "/hdd/hdd9/machineLearning/realizedVolatilityModel_1.30.18.bin"
//modelSavePath = "/hdd/hdd9/machineLearning/realizedVolatilityModel_2.00.6.bin"
csvDataPath = "/hdd/hdd9/machineLearning/testSnapshot.csv"
//load modle
model = loadModel(modelSavePath)
//define stream table
name = `SecurityID`TradeTime`PreClosePx`OpenPx`HighPx`LowPx`LastPx`TotalVolumeTrade`TotalValueTrade`BidPrice0`BidPrice1`BidPrice2`BidPrice3`BidPrice4`BidPrice5`BidPrice6`BidPrice7`BidPrice8`BidPrice9`BidOrderQty0`BidOrderQty1`BidOrderQty2`BidOrderQty3`BidOrderQty4`BidOrderQty5`BidOrderQty6`BidOrderQty7`BidOrderQty8`BidOrderQty9`OfferPrice0`OfferPrice1`OfferPrice2`OfferPrice3`OfferPrice4`OfferPrice5`OfferPrice6`OfferPrice7`OfferPrice8`OfferPrice9`OfferOrderQty0`OfferOrderQty1`OfferOrderQty2`OfferOrderQty3`OfferOrderQty4`OfferOrderQty5`OfferOrderQty6`OfferOrderQty7`OfferOrderQty8`OfferOrderQty9
type =`SYMBOL`TIMESTAMP`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`INT`INT`INT`INT`INT`INT`INT`INT`INT`INT
share streamTable(100000:0, name, type) as snapshotStream
share streamTable(100000:0 , `TradeTime`SecurityID`BAS`DI0`DI1`DI2`DI3`DI4`DI5`DI6`DI7`DI8`DI9`Press`RV,`TIMESTAMP`SYMBOL`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE`DOUBLE) as aggrFeatures10min
share streamTable(100000:0 , `TradeTime`SecurityID`PredictRV`CostTime, `TIMESTAMP`SYMBOL`DOUBLE`INT) as result1min
go
//define function to process data with matrix operation
defg featureEngine(bidPrice,bidQty,offerPrice,offerQty){
bas = offerPrice[0]\bidPrice[0]-1
wap = (bidPrice[0]*offerQty[0] + offerPrice[0]*bidQty[0])\(bidQty[0]+offerQty[0])
di = (bidQty-offerQty)\(bidQty+offerQty)
bidw=(1.0\(bidPrice-wap))
bidw=bidw\(bidw.rowSum())
offerw=(1.0\(offerPrice-wap))
offerw=offerw\(offerw.rowSum())
press=log((bidQty*bidw).rowSum())-log((offerQty*offerw).rowSum())
rv=std(log(wap)-log(prev(wap)))*sqrt(24*252*size(wap))
return avg(bas),avg(di[0]),avg(di[1]),avg(di[2]),avg(di[3]),avg(di[4]),avg(di[5]),avg(di[6]),avg(di[7]),avg(di[8]),avg(di[9]),avg(press),rv
}
metrics=<featureEngine(
matrix(BidPrice0,BidPrice1,BidPrice2,BidPrice3,BidPrice4,BidPrice5,BidPrice6,BidPrice7,BidPrice8,BidPrice9),
matrix(BidOrderQty0,BidOrderQty1,BidOrderQty2,BidOrderQty3,BidOrderQty4,BidOrderQty5,BidOrderQty6,BidOrderQty7,BidOrderQty8,BidOrderQty9),
matrix(OfferPrice0,OfferPrice1,OfferPrice2,OfferPrice3,OfferPrice4,OfferPrice5,OfferPrice6,OfferPrice7,OfferPrice8,OfferPrice9),
matrix(OfferOrderQty0,OfferOrderQty1,OfferOrderQty2,OfferOrderQty3,OfferOrderQty4,OfferOrderQty5,OfferOrderQty6,OfferOrderQty7,OfferOrderQty8,OfferOrderQty9)) as `BAS`DI0`DI1`DI2`DI3`DI4`DI5`DI6`DI7`DI8`DI9`Press`RV>
//register stream computing engine
createTimeSeriesEngine(name="aggrFeatures10min", windowSize=600000, step=60000, metrics=metrics, dummyTable=snapshotStream, outputTable=aggrFeatures10min, timeColumn=`TradeTime, useWindowStartTime=true, keyColumn=`SecurityID)
//subscribe data
subscribeTable(tableName="snapshotStream", actionName="aggrFeatures10min", offset=-1, handler=getStreamEngine("aggrFeatures10min"), msgAsTable=true, batchSize=2000, throttle=1, hash=0, reconnect=true)
//define handler
def predictRV(mutable result1min, model, msg){
startTime = now()
predicted = model.predict(msg)
temp = select TradeTime, SecurityID, predicted as PredictRV, (now()-startTime) as CostTime from msg
result1min.append!(temp)
}
//subscribe data
subscribeTable(tableName="aggrFeatures10min", actionName="predictRV", offset=-1, handler=predictRV{result1min, model}, msgAsTable=true, hash=1, reconnect=true)
go
//replay history data
data = select SecurityID, TradeTime, PreClosePx, OpenPx, HighPx, LowPx, LastPx, TotalVolumeTrade, TotalValueTrade, BidPrice0, BidPrice1, BidPrice2, BidPrice3, BidPrice4, BidPrice5, BidPrice6, BidPrice7, BidPrice8, BidPrice9, BidOrderQty0, BidOrderQty1, BidOrderQty2, BidOrderQty3, BidOrderQty4, BidOrderQty5, BidOrderQty6, BidOrderQty7, BidOrderQty8, BidOrderQty9, OfferPrice0, OfferPrice1, OfferPrice2, OfferPrice3, OfferPrice4, OfferPrice5, OfferPrice6, OfferPrice7, OfferPrice8, OfferPrice9, OfferOrderQty0, OfferOrderQty1, OfferOrderQty2, OfferOrderQty3, OfferOrderQty4, OfferOrderQty5, OfferOrderQty6, OfferOrderQty7, OfferOrderQty8, OfferOrderQty9
from loadText(filename=csvDataPath, schema=table(snapshotStream.schema().colDefs.name, snapshotStream.schema().colDefs.typeString)
)
order by TradeTime, SecurityID
submitJob("replay_trade", "trade", replay{data, snapshotStream, `TradeTime, `TradeTime, 20000, true, 1})
data = NULL
getRecentJobs()
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )