1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/dolphindb-Tutorials_CN

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
calStreamOHLC.dos 14 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
Yunan Gao Отправлено 16.04.2024 09:31 72bdb6d
/**
File name: calStreamOHLC.dos
Application: script to calculate OHLC of Tong-Lian MDL real-time market data.
Author: Xinhai Tang
Company: DolphinDB Inc.
DolphinDB server version: 2.00.11 2024.01.02
Storage engine: TSDB and OLAP
Last modification time: 2024.04.15
DevEnv: support3-single16coreJIT
*/
/**
* step1-Define functions
*/
/* *
* @ Brief
* Print module logs.
* @ param
* msg is log text.
* @ Return
* Print front-end logs and back-end logs
* @ Example
* calculateOHLCLog("It is a test !")
*/
def calculateOHLCLog(msg){
formattedMsg = stringFormat("[DolphinDBModules::calculateOHLC: %W", msg)
print(formattedMsg)
writeLog(formattedMsg)
}
/* *
* @ Brief
* Get a stream table structure.
* @ param
* tableCapacity is table initialization capacity.
* @ Return
* table structure.
* @ Example
* getMDLSnapshotTB(100000)
*/
def getMDLSnapshotTB(tableCapacity=1000000){
colNames = `Market`TradeTime`MDStreamID`SecurityID`SecurityIDSource`TradingPhaseCode`ImageStatus`PreCloPrice`NumTrades`TotalVolumeTrade`TotalValueTrade`LastPrice`OpenPrice`HighPrice`LowPrice`ClosePrice`DifPrice1`DifPrice2`PE1`PE2`PreCloseIOPV`IOPV`TotalBidQty`WeightedAvgBidPx`AltWAvgBidPri`TotalOfferQty`WeightedAvgOfferPx`AltWAvgAskPri`UpLimitPx`DownLimitPx`OpenInt`OptPremiumRatio`OfferPrice`BidPrice`OfferOrderQty`BidOrderQty`BidNumOrders`OfferNumOrders`ETFBuyNumber`ETFBuyAmount`ETFBuyMoney`ETFSellNumber`ETFSellAmount`ETFSellMoney`YieldToMatu`TotWarExNum`WithdrawBuyNumber`WithdrawBuyAmount`WithdrawBuyMoney`WithdrawSellNumber`WithdrawSellAmount`WithdrawSellMoney`TotalBidNumber`TotalOfferNumber`MaxBidDur`MaxSellDur`BidNum`SellNum`LocalTime`SeqNo`OfferOrders`BidOrders
colTypes = [SYMBOL,TIMESTAMP,SYMBOL,SYMBOL,SYMBOL,SYMBOL,INT,DOUBLE,LONG,LONG,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG,DOUBLE,DOUBLE,LONG,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE[],DOUBLE[],LONG[],LONG[],INT[],INT[],INT,LONG,DOUBLE,INT,LONG,DOUBLE,DOUBLE,DOUBLE,INT,LONG,DOUBLE,INT,LONG,DOUBLE,INT,INT,INT,INT,INT,INT,TIME,INT,LONG[],LONG[]]
return streamTable(tableCapacity:0, colNames, colTypes)
}
/* *
* @ Brief
* Get a stream table structure.
* @ param
* tableCapacity is table initialization capacity.
* @ Return
* table structure.
* @ Example
* getMDLSnapshotProcessTB(100000)
*/
def getMDLSnapshotProcessTB(tableCapacity=1000000){
colNames = `SecurityID`TradeTime`UpLimitPx`DownLimitPx`PreCloPrice`HighPrice`LowPrice`LastPrice`PreCloseIOPV`IOPV`DeltasHighPrice`DeltasLowPrice`DeltasVolume`DeltasTurnover`DeltasTradesCount
colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, INT]
return streamTable(tableCapacity:0, colNames, colTypes)
}
/* *
* @ Brief
* Get a stream table structure.
* @ param
* tableCapacity is table initialization capacity.
* @ Return
* table structure.
* @ Example
* getMDLStockFundOHLCTempTB(100000)
*/
def getMDLStockFundOHLCTempTB(tableCapacity=1000000){
colNames = `TradeTime`SecurityID`OpenPrice`HighPrice`LowPrice`ClosePrice`Volume`Turnover`TradesCount`PreClosePrice`PreCloseIOPV`IOPV`UpLimitPx`DownLimitPx`FirstBarChangeRate
colTypes = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
return streamTable(tableCapacity:0, colNames, colTypes)
}
/* *
* @ Brief
* Get a stream table structure.
* @ param
* tableCapacity is table initialization capacity.
* @ Return
* table structure.
* @ Example
* getMDLStockFundOHLCTB(100000)
*/
def getMDLStockFundOHLCTB(tableCapacity=1000000){
colNames = `SecurityID`TradeTime`OpenPrice`HighPrice`LowPrice`ClosePrice`Volume`Turnover`TradesCount`PreClosePrice`PreCloseIOPV`IOPV`UpLimitPx`DownLimitPx`ChangeRate
colTypes = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, LONG, DOUBLE, INT, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE, DOUBLE]
return streamTable(tableCapacity:0, colNames, colTypes)
}
/* *
* @ Brief
* An aggregate function for the highest price of OHLC.
* @ param
* DeltasHighPrice is the difference between the highest prices of two snapshots.
* HighPrice is the highest price of the current snapshot.
* LastPrice is the latest price of the current snapshot.
* @ Return
* A vector of the highest price.
* @ Example
* select high(DeltasHighPrice, HighPrice, LastPrice) from tb group by SecurityID, TradeDate, minute(TradeTime)
*/
defg high(DeltasHighPrice, HighPrice, LastPrice){
if(sum(DeltasHighPrice)>0.000001){
return max(HighPrice)
}
else{
return max(LastPrice)
}
}
/* *
* @ Brief
* An aggregate function for the lowest price of OHLC.
* @ param
* DeltasLowPrice is the difference between the lowest prices of two snapshots.
* LowPrice is the lowest price of the current snapshot.
* LastPrice is the latest price of the current snapshot.
* @ Return
* A vector of the lowest price.
* @ Example
* select low(DeltasLowPrice, LowPrice, LastPrice) from tb group by SecurityID, TradeDate, minute(TradeTime)
*/
defg low(DeltasLowPrice, LowPrice, LastPrice){
sumDeltas = sum(DeltasLowPrice)
if(sumDeltas<-0.000001 and sumDeltas!=NULL){
return min(iif(LowPrice==0.0, NULL, LowPrice))
}
else{
return min(LastPrice)
}
}
/* *
* @ Brief
* A function for cleaning up the streaming computing environment.
* @ param
* Related table names and engine names.
* @ Return
* Relate log.
* @ Example
* clearEnvMDLStockFundOHLC()
*/
def clearEnvMDLStockFundOHLC(mdlSnapshotTBName="mdlSnapshot",
mdlSnapshotProcessTBName="mdlSnapshotProcess",
mdlSnapshotProcessEngineName="mdlSnapshotProcessEngine",
mdlStockFundOHLCTempEngineName="mdlStockFundOHLCTempEngine",
mdlStockFundOHLCTBName="mdlStockFundOHLC",
mdlStockFundOHLCEngineName="mdlStockFundOHLCEngine")
{
//Cancel related subscriptions
try{unsubscribeTable(tableName=mdlSnapshotTBName, actionName=mdlSnapshotProcessEngineName)} catch(ex){calculateOHLCLog(ex)}
try{unsubscribeTable(tableName=mdlSnapshotProcessTBName, actionName=mdlStockFundOHLCTempEngineName)} catch(ex){calculateOHLCLog(ex)}
try{unsubscribeTable(tableName=mdlStockFundOHLCTBName, actionName=mdlStockFundOHLCTBName)} catch(ex){calculateOHLCLog(ex)}
//Cancel the definition of related stream tables
try{dropStreamTable(mdlSnapshotTBName)} catch(ex){calculateOHLCLog(ex)}
try{dropStreamTable(mdlSnapshotProcessTBName)} catch(ex){calculateOHLCLog(ex)}
try{dropStreamTable(mdlStockFundOHLCTBName)} catch(ex){calculateOHLCLog(ex)}
//Cancel the definition of related stream calculation engines
try{dropStreamEngine(mdlSnapshotProcessEngineName)} catch(ex){calculateOHLCLog(ex)}
try{dropStreamEngine(mdlStockFundOHLCEngineName)} catch(ex){calculateOHLCLog(ex)}
try{dropStreamEngine(mdlStockFundOHLCTempEngineName)} catch(ex){calculateOHLCLog(ex)}
}
/* *
* @ Brief
* A function to define the stream tables and the computing engines for streaming computing.
* @ param
* Related table names and engine names.
* @ Return
* Relate log.
* @ Example
* calRealtimeMDLStockFundOHLC()
*/
def calRealtimeMDLStockFundOHLC( tableCapacity=1000000,
mdlSnapshotTBName="mdlSnapshot",
mdlSnapshotProcessTBName="mdlSnapshotProcess",
mdlSnapshotProcessEngineName="mdlSnapshotProcessEngine",
mdlStockFundOHLCTempEngineName="mdlStockFundOHLCTempEngine",
mdlStockFundOHLCTBName="mdlStockFundOHLC",
mdlStockFundOHLCEngineName="mdlStockFundOHLCEngine")
{
//Clean up the related environment of stream calculation
clearEnvMDLStockFundOHLC( mdlSnapshotTBName,
mdlSnapshotProcessTBName,
mdlSnapshotProcessEngineName,
mdlStockFundOHLCTempEngineName,
mdlStockFundOHLCEngineName)
//Create MDL snapshot table
share(getMDLSnapshotTB(tableCapacity), mdlSnapshotTBName)
//Create MDL processed snapshot table
share(getMDLSnapshotProcessTB(tableCapacity), mdlSnapshotProcessTBName)
/** =============================================
* Create ReactiveStateEngine:mdlSnapshotProcessEngineName
* Processing and cleaning of original market data
* DeltasHighPrice is used to calculate the highest price
* DeltasLowPrice is used to calculate the lowest price
* DeltasVolume is used to calculate the Volume
* DeltasTurnover is used to calculate the Turnover
* DeltasTradesCount is used to calculate the TradesCount
*/
//Original columns in the snapshot table
colNames = `TradeTime`UpLimitPx`DownLimitPx`PreCloPrice`HighPrice`LowPrice`LastPrice`PreCloseIOPV`IOPV
//Derived columns processed based on the original snapshot table
convert = sqlCol(colNames).append!(sqlColAlias(<iif(deltas(HighPrice)>0.000001, 1, 0)>, `DeltasHighPrice)).append!(sqlColAlias(<iif(abs(deltas(LowPrice))>0.000001, -1, 0)>, `DeltasLowPrice)).append!(sqlColAlias(<iif(deltas(TotalVolumeTrade)==NULL, TotalVolumeTrade, deltas(TotalVolumeTrade))>, `DeltasVolume)).append!(sqlColAlias(<iif(deltas(TotalValueTrade)==NULL, TotalValueTrade, deltas(TotalValueTrade))>, `DeltasTurnover)).append!(sqlColAlias(<iif(deltas(NumTrades)==NULL, NumTrades, deltas(NumTrades))>, `DeltasTradesCount))
//Create ReactiveStateEngine: mdlSnapshotProcessEngineName
createReactiveStateEngine(name=mdlSnapshotProcessEngineName, metrics =convert, dummyTable=objByName(mdlSnapshotTBName), outputTable=objByName(mdlSnapshotProcessTBName), keyColumn="SecurityID", filter=<TradeTime.time() between 09:25:00.000:11:31:00.000 or TradeTime.time() between 13:00:00.000:14:57:00.000 or TradeTime.time()>=15:00:00.000>, keepOrder = true)
/** =====The engien of mdlSnapshotProcessEngineName is created=====*/
//Subscribe to the original snapshot table, input incremental data into the ReactiveStateEngine of mdlSnapshotProcessEngineName
subscribeTable(tableName=mdlSnapshotTBName, actionName=mdlSnapshotProcessEngineName, handler=getStreamEngine(mdlSnapshotProcessEngineName), msgAsTable=true, batchSize=100, throttle=0.002, hash=0, reconnect=true)
/** =============================================
* Create ReactiveStateEngine:mdlStockFundOHLCEngineName
* Calculating OHLC based on the output table of DailyTimeSeriesEngine
* In order for engine pipline, mdlStockFundOHLCTBName must be defined first
*/
//Create MDL 1-minute OHLC table
share(getMDLStockFundOHLCTB(100000), mdlStockFundOHLCTBName)
//Define engine calculation methods
convert = <[
TradeTime,
iif(OpenPrice==0, ClosePrice, OpenPrice).nullFill(0.0),
iif(HighPrice==0, ClosePrice, HighPrice).nullFill(0.0),
iif(LowPrice==0, ClosePrice, LowPrice).nullFill(0.0),
ClosePrice.nullFill(0.0),
Volume,
Turnover,
TradesCount,
PreClosePrice,
PreCloseIOPV.nullFill(0.0),
IOPV.nullFill(0.0),
UpLimitPx,
DownLimitPx,
iif(time(TradeTime)==09:30:00.000, FirstBarChangeRate, iif(ratios(ClosePrice)!=NULL, ratios(ClosePrice)-1, 0)).nullFill(0.0)
]>
//Create ReactiveStateEngine: mdlStockFundOHLCEngineName
createReactiveStateEngine(name=mdlStockFundOHLCEngineName, metrics =convert, dummyTable=getMDLStockFundOHLCTempTB(1), outputTable=objByName(mdlStockFundOHLCTBName), keyColumn="SecurityID", keepOrder = true)
/** =====The engien of mdlStockFundOHLCEngineName is created=====*/
/** =============================================
* Create DailyTimeSeriesEngine:mdlStockFundOHLCTempEngineName
* Calculating the temporary intermediate table required for calculating OHLC
* The input is the table of mdlSnapshotProcessTBName
*/
//Define engine calculation methods
barConvert = <[
firstNot(LastPrice, 0),
high(DeltasHighPrice, HighPrice, LastPrice),
low(DeltasLowPrice, LowPrice, LastPrice),
lastNot(LastPrice, 0),
sum(DeltasVolume),
sum(DeltasTurnover),
sum(DeltasTradesCount),
first(PreCloPrice),
first(PreCloseIOPV),
lastNot(IOPV, 0),
last(UpLimitPx),
last(DownLimitPx),
lastNot(LastPrice, 0)\firstNot(LastPrice, 0)-1
]>
//Define engine fill methods
fillList = [0, 0, 0, 'ffill', 0, 0, 0, 'ffill', 'ffill', 'ffill', 'ffill', 'ffill', 0]
createDailyTimeSeriesEngine( name=mdlStockFundOHLCTempEngineName,
windowSize=60000,
step=60000,
metrics=barConvert,
dummyTable=objByName(mdlSnapshotProcessTBName),
outputTable=getStreamEngine(mdlStockFundOHLCEngineName),
timeColumn=`TradeTime,
keyColumn=`SecurityID,
useWindowStartTime=true,
forceTriggerTime=1000,
fill=fillList,
sessionBegin=09:30:00.000 13:00:00.000 15:00:00.000,
sessionEnd=11:31:00.000 14:58:00.000 15:01:00.000,
mergeSessionEnd=true,
forceTriggerSessionEndTime=30000)
/** =====The engien of mdlStockFundOHLCTempEngineName is created=====*/
//Subscribe to the processed snapshot table, input incremental data into the DailyTimeSeriesEngine of mdlStockFundOHLCTempEngineName
subscribeTable(tableName=mdlSnapshotProcessTBName, actionName=mdlStockFundOHLCTempEngineName, handler=getStreamEngine(mdlStockFundOHLCTempEngineName), msgAsTable=true, batchSize=100, throttle=0.01, hash=0, reconnect=true)
}
go
/**
* step2-Declare parameters
*/
tableCapacity = 1000000
mdlSnapshotTBName = "mdlSnapshot"
mdlSnapshotProcessTBName = "mdlSnapshotProcess"
mdlSnapshotProcessEngineName = "mdlSnapshotProcessEngine"
mdlStockFundOHLCTempEngineName = "mdlStockFundOHLCTempEngine"
mdlStockFundOHLCTBName = "mdlStockFundOHLC"
mdlStockFundOHLCEngineName = "mdlStockFundOHLCEngine"
go
/**
* step3-Initialize streaming computing environment
*/
calRealtimeMDLStockFundOHLC(tableCapacity,
mdlSnapshotTBName,
mdlSnapshotProcessTBName,
mdlSnapshotProcessEngineName,
mdlStockFundOHLCTempEngineName,
mdlStockFundOHLCTBName,
mdlStockFundOHLCEngineName)
go
/**
* step4-Historical data replay
*/
replayData = select *
from loadTable("dfs://snapshotDB", "snapshotTB")
where TradeTime.date()=2023.02.01
order by TradeTime
replay(
inputTables=replayData,
outputTables=mdlSnapshot,
dateColumn=`TradeTime,
timeColumn=`TradeTime,
replayRate=-1)
go
/**
* step5-View results
*/
sleep(1000)
result = select * from mdlStockFundOHLC order by SecurityID
select top 10 * from result

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/dolphindb-Tutorials_CN.git
git@api.gitlife.ru:oschina-mirror/dolphindb-Tutorials_CN.git
oschina-mirror
dolphindb-Tutorials_CN
dolphindb-Tutorials_CN
master