Слияние кода завершено, страница обновится автоматически
/*
version()
server: 1.30.15 2021.11.22
last modification time: 2021.11.30
last modification developer: DolpinDB
*/
//login account
login("admin", "123456")
//create database and table
def createSnapshotTB(){
if(existsDatabase("dfs://snapshot") == false){
dbDate = database("", VALUE, 2020.01.01..2020.01.03)
dbSymbol = database("", HASH, [SYMBOL, 30])
db = database("dfs://snapshot", COMPO, [dbDate, dbSymbol])
schemaTable = table(
array(SYMBOL, 0) as SecurityID,
array(DATE, 0) as Date,
array(TIME, 0) as Time,
array(LONG, 0) as PreClosePx,
array(LONG, 0) as OpenPx,
array(LONG, 0) as HighPx,
array(LONG, 0) as LowPx,
array(LONG, 0) as LastPx,
array(LONG, 0) as TotalVolumeTrade,
array(LONG, 0) as TotalValueTrade,
array(LONG, 0) as BidPrice1,
array(LONG, 0) as BidPrice2,
array(LONG, 0) as BidPrice3,
array(LONG, 0) as BidPrice4,
array(LONG, 0) as BidPrice5,
array(LONG, 0) as BidPrice6,
array(LONG, 0) as BidPrice7,
array(LONG, 0) as BidPrice8,
array(LONG, 0) as BidPrice9,
array(LONG, 0) as BidPrice10,
array(LONG, 0) as BidOrderQty1,
array(LONG, 0) as BidOrderQty2,
array(LONG, 0) as BidOrderQty3,
array(LONG, 0) as BidOrderQty4,
array(LONG, 0) as BidOrderQty5,
array(LONG, 0) as BidOrderQty6,
array(LONG, 0) as BidOrderQty7,
array(LONG, 0) as BidOrderQty8,
array(LONG, 0) as BidOrderQty9,
array(LONG, 0) as BidOrderQty10,
array(LONG, 0) as OfferPrice1,
array(LONG, 0) as OfferPrice2,
array(LONG, 0) as OfferPrice3,
array(LONG, 0) as OfferPrice4,
array(LONG, 0) as OfferPrice5,
array(LONG, 0) as OfferPrice6,
array(LONG, 0) as OfferPrice7,
array(LONG, 0) as OfferPrice8,
array(LONG, 0) as OfferPrice9,
array(LONG, 0) as OfferPrice10,
array(LONG, 0) as OfferOrderQty1,
array(LONG, 0) as OfferOrderQty2,
array(LONG, 0) as OfferOrderQty3,
array(LONG, 0) as OfferOrderQty4,
array(LONG, 0) as OfferOrderQty5,
array(LONG, 0) as OfferOrderQty6,
array(LONG, 0) as OfferOrderQty7,
array(LONG, 0) as OfferOrderQty8,
array(LONG, 0) as OfferOrderQty9,
array(LONG, 0) as OfferOrderQty10
)
db.createPartitionedTable(schemaTable, `snapshot, `Date`SecurityID)
writeLog("---------------------------------------------------------------------")
writeLog("dfs://snapshot created successfully !")
writeLog("---------------------------------------------------------------------")
}
else{
writeLog("---------------------------------------------------------------------")
writeLog("dfs://snapshot has already created !")
writeLog("---------------------------------------------------------------------")
}
}
createSnapshotTB()
go
//Define persistent stream table
snapshotStreamTemp = streamTable(
array(SYMBOL, 0) as SecurityID,
array(DATE, 0) as Date,
array(TIME, 0) as Time,
array(LONG, 0) as PreClosePx,
array(LONG, 0) as OpenPx,
array(LONG, 0) as HighPx,
array(LONG, 0) as LowPx,
array(LONG, 0) as LastPx,
array(LONG, 0) as TotalVolumeTrade,
array(LONG, 0) as TotalValueTrade,
array(LONG, 0) as BidPrice1,
array(LONG, 0) as BidPrice2,
array(LONG, 0) as BidPrice3,
array(LONG, 0) as BidPrice4,
array(LONG, 0) as BidPrice5,
array(LONG, 0) as BidPrice6,
array(LONG, 0) as BidPrice7,
array(LONG, 0) as BidPrice8,
array(LONG, 0) as BidPrice9,
array(LONG, 0) as BidPrice10,
array(LONG, 0) as BidOrderQty1,
array(LONG, 0) as BidOrderQty2,
array(LONG, 0) as BidOrderQty3,
array(LONG, 0) as BidOrderQty4,
array(LONG, 0) as BidOrderQty5,
array(LONG, 0) as BidOrderQty6,
array(LONG, 0) as BidOrderQty7,
array(LONG, 0) as BidOrderQty8,
array(LONG, 0) as BidOrderQty9,
array(LONG, 0) as BidOrderQty10,
array(LONG, 0) as OfferPrice1,
array(LONG, 0) as OfferPrice2,
array(LONG, 0) as OfferPrice3,
array(LONG, 0) as OfferPrice4,
array(LONG, 0) as OfferPrice5,
array(LONG, 0) as OfferPrice6,
array(LONG, 0) as OfferPrice7,
array(LONG, 0) as OfferPrice8,
array(LONG, 0) as OfferPrice9,
array(LONG, 0) as OfferPrice10,
array(LONG, 0) as OfferOrderQty1,
array(LONG, 0) as OfferOrderQty2,
array(LONG, 0) as OfferOrderQty3,
array(LONG, 0) as OfferOrderQty4,
array(LONG, 0) as OfferOrderQty5,
array(LONG, 0) as OfferOrderQty6,
array(LONG, 0) as OfferOrderQty7,
array(LONG, 0) as OfferOrderQty8,
array(LONG, 0) as OfferOrderQty9,
array(LONG, 0) as OfferOrderQty10
)
enableTableShareAndPersistence(table=snapshotStreamTemp, tableName="snapshotStream", asynWrite=true, compress=true, cacheSize=5000000, retentionMinutes=1440, flushMode=0, preCache=100000)
undef("snapshotStreamTemp")
writeLog("---------------------------------------------------------------------")
writeLog("sharedTable1:snapshotStream created successfully !")
writeLog("---------------------------------------------------------------------")
snapshotStreamProcessTemp = streamTable(
array(SYMBOL, 0) as SecurityID,
array(DATE, 0) as Date,
array(TIME, 0) as Time,
array(LONG, 0) as PreClosePx,
array(LONG, 0) as OpenPx,
array(LONG, 0) as HighPx,
array(LONG, 0) as LowPx,
array(LONG, 0) as LastPx,
array(LONG, 0) as TotalVolumeTrade,
array(LONG, 0) as TotalValueTrade,
array(LONG, 0) as BidPrice1,
array(LONG, 0) as BidPrice2,
array(LONG, 0) as BidPrice3,
array(LONG, 0) as BidPrice4,
array(LONG, 0) as BidPrice5,
array(LONG, 0) as BidPrice6,
array(LONG, 0) as BidPrice7,
array(LONG, 0) as BidPrice8,
array(LONG, 0) as BidPrice9,
array(LONG, 0) as BidPrice10,
array(LONG, 0) as BidOrderQty1,
array(LONG, 0) as BidOrderQty2,
array(LONG, 0) as BidOrderQty3,
array(LONG, 0) as BidOrderQty4,
array(LONG, 0) as BidOrderQty5,
array(LONG, 0) as BidOrderQty6,
array(LONG, 0) as BidOrderQty7,
array(LONG, 0) as BidOrderQty8,
array(LONG, 0) as BidOrderQty9,
array(LONG, 0) as BidOrderQty10,
array(LONG, 0) as OfferPrice1,
array(LONG, 0) as OfferPrice2,
array(LONG, 0) as OfferPrice3,
array(LONG, 0) as OfferPrice4,
array(LONG, 0) as OfferPrice5,
array(LONG, 0) as OfferPrice6,
array(LONG, 0) as OfferPrice7,
array(LONG, 0) as OfferPrice8,
array(LONG, 0) as OfferPrice9,
array(LONG, 0) as OfferPrice10,
array(LONG, 0) as OfferOrderQty1,
array(LONG, 0) as OfferOrderQty2,
array(LONG, 0) as OfferOrderQty3,
array(LONG, 0) as OfferOrderQty4,
array(LONG, 0) as OfferOrderQty5,
array(LONG, 0) as OfferOrderQty6,
array(LONG, 0) as OfferOrderQty7,
array(LONG, 0) as OfferOrderQty8,
array(LONG, 0) as OfferOrderQty9,
array(LONG, 0) as OfferOrderQty10,
array(DOUBLE, 0) as Return,
array(INT, 0) as Volume,
array(INT, 0) as Amount
)
enableTableShareAndPersistence(table=snapshotStreamProcessTemp, tableName="snapshotStreamProcess", asynWrite=true, compress=true, cacheSize=5000000, retentionMinutes=1440, flushMode=0, preCache=100000)
undef("snapshotStreamProcessTemp")
writeLog("---------------------------------------------------------------------")
writeLog("sharedTable2:snapshotStreamProcess created successfully !")
writeLog("---------------------------------------------------------------------")
snapshotAggr1minTemp = streamTable(
array(TIMESTAMP, 0) as UpdateTime,
array(SYMBOL, 0) as SecurityID,
array(DOUBLE, 0) as Open,
array(DOUBLE, 0) as High,
array(DOUBLE, 0) as Low,
array(DOUBLE, 0) as Close,
array(INT, 0) as Volume,
array(INT, 0) as Amount,
array(LONG, 0) as TotalVolumeTrade,
array(LONG, 0) as TotalValueTrade,
array(DOUBLE, 0) as BidPxMean,
array(DOUBLE, 0) as OfferPxMean,
array(DOUBLE, 0) as BidQtyMean,
array(DOUBLE, 0) as OfferQtyMean,
array(DOUBLE, 0) as AbsReturn
)
enableTableShareAndPersistence(table=snapshotAggr1minTemp, tableName="snapshotAggr1min", asynWrite=true, compress=true, cacheSize=2000000, retentionMinutes=1440, flushMode=0, preCache=100000)
undef("snapshotAggr1minTemp")
writeLog("---------------------------------------------------------------------")
writeLog("sharedTable3:snapshotAggr1min created successfully !")
writeLog("---------------------------------------------------------------------")
go
//create ReactiveStateEngine
sqlColNames = snapshotStream.schema().colDefs.name[1:]
metrics = sqlCol(sqlColNames).append!(parseExpr("ratios(LastPx)-1")).append!(parseExpr("deltas(TotalVolumeTrade)")).append!(parseExpr("deltas(TotalValueTrade)"))
snapshotProcessing = createReactiveStateEngine(name="snapshotProcessing", metrics=metrics, dummyTable=snapshotStream, outputTable=snapshotStreamProcess, keyColumn="SecurityID")
writeLog("---------------------------------------------------------------------")
writeLog("ReactiveStateEngine:snapshotProcessing created successfully !")
writeLog("---------------------------------------------------------------------")
subscribeTable(tableName="snapshotStream", actionName="snapshotProcessing", offset=-1, handler=append!{snapshotProcessing}, msgAsTable=true, hash=0, reconnect=true)
undef("snapshotProcessing")
writeLog("---------------------------------------------------------------------")
writeLog("subscribe1:snapshotStream subscribed successfully !")
writeLog("---------------------------------------------------------------------")
go
//create TimeSeriesEngine
metrics =<[
first(LastPx\10000),
max(LastPx\10000),
min(LastPx\10000),
last(LastPx\10000),
sum(Volume),
sum(Amount),
last(TotalVolumeTrade),
last(TotalValueTrade),
avg(BidPrice1+BidPrice2+BidPrice3+BidPrice4+BidPrice5+BidPrice6+BidPrice7+BidPrice8+BidPrice9+BidPrice10),
avg(OfferPrice1+OfferPrice2+OfferPrice3+OfferPrice4+OfferPrice5+OfferPrice6+OfferPrice7+OfferPrice8+OfferPrice9+OfferPrice10),
avg(BidOrderQty1+BidOrderQty2+BidOrderQty3+BidOrderQty4+BidOrderQty5+BidOrderQty6+BidOrderQty7+BidOrderQty8+BidOrderQty9+BidOrderQty10),
avg(OfferOrderQty1+OfferOrderQty2+OfferOrderQty3+OfferOrderQty4+OfferOrderQty5+OfferOrderQty6+OfferOrderQty7+OfferOrderQty8+OfferOrderQty9+OfferOrderQty10),
sum(abs(Return))
]>
snapshotAggr= createTimeSeriesEngine(name="snapshotAggr1min", windowSize=60000, step=60000, metrics=metrics, dummyTable=snapshotStreamProcess, outputTable=snapshotAggr1min, timeColumn=`Date`Time, useSystemTime=false, keyColumn="SecurityID", garbageSize=5000, useWindowStartTime=true, fill=["ffill", "ffill", "ffill", "ffill", 0, 0, "ffill", "ffill", 0.0, 0.0, 0.0, 0.0, 0.0])
subscribeTable(tableName="snapshotStreamProcess", actionName="snapshotAggr1min", offset=-1, handler=append!{snapshotAggr}, msgAsTable=true, batchSize=800, throttle=1, hash=1, reconnect=true)
undef("snapshotAggr")
writeLog("---------------------------------------------------------------------")
writeLog("subscribe2:snapshotStreamProcess subscribed successfully !")
writeLog("---------------------------------------------------------------------")
//real time storage in database
subscribeTable(tableName="snapshotStream", actionName="snapshotToDatabase", offset=-1, handler=loadTable("dfs://snapshot", "snapshot"), msgAsTable=true, batchSize=800, throttle=1, hash=2, reconnect=true)
writeLog("---------------------------------------------------------------------")
writeLog("subscribe3:snapshotStream subscribed successfully !")
writeLog("---------------------------------------------------------------------")
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )