1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/dolphindb-Tutorials_CN

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
quant5_2.dos 2.7 КБ
Копировать Редактировать Web IDE Исходные данные Просмотреть построчно История
xiaodanzhang Отправлено 11.09.2020 07:42 a391e19
//重复运行时清理环境
def clearEnv(){
try{
dropAggregator(`tsAggr1)
dropAggregator(`tsAggr2)
unsubscribeTable(,`level2,`act_tsAggr1)
unsubscribeTable(,`level2,`act_tsAggr2)
unsubscribeTable(,`level2,`act_factor)
unsubscribeTable(,`level2,`newestLevel2data)
undef(`level2,SHARED)
undef(`OHLC1,SHARED)
undef(`OHLC2,SHARED)
undef(`FACTOR,SHARED)
}catch(ex){}
}
clearEnv()
//定义发布流表
share streamTable(100:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT]) as level2
//5.2 用流计算生成K线
modal = table(100:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT])
//五分钟K线
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC1
tsAggr1 = createTimeSeriesAggregator(name="tsAggr1", windowSize=300, step=300, metrics=<[first(last),max(last),min(last),last(last),sum(volume)]>, dummyTable=modal, outputTable=OHLC1, timeColumn=`datetime, keyColumn=`symbol)
subscribeTable(tableName="level2", actionName="act_tsAggr1", offset=0, handler=append!{tsAggr1}, msgAsTable=true);
//一分钟K线
share streamTable(100:0, `datetime`symbol`open`high`low`close`volume,[DATETIME,SYMBOL,DOUBLE,DOUBLE,DOUBLE,DOUBLE,LONG]) as OHLC2
tsAggr2 = createTimeSeriesAggregator(name="tsAggr2", windowSize=300, step=60, metrics=<[first(last),max(last),min(last),last(last),sum(volume)]>, dummyTable=modal, outputTable=OHLC2, timeColumn=`datetime, keyColumn=`symbol)
subscribeTable(tableName="level2", actionName="act_tsAggr2", offset=0, handler=append!{tsAggr2}, msgAsTable=true);
//回放数据到流表,触发K线计算
dbPath = "dfs://level2Replay"
if(!existsDatabase(dbPath)){
dbDate = database("", VALUE, 2020.01.01..2020.12.31)
dbSymbol=database("", HASH, [SYMBOL, 10])
db = database(dbPath, COMPO, [dbDate, dbSymbol])
modal = table(1:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT])
pt=db.createPartitionedTable(modal,`quotes, `datetime`symbol)
data = select symbol, datetime(datetime(date(date))+second(time)) as datetime, last, askPrice1, bidPrice1, askVolume1, bidVolume1, curVol as volume from loadTable("dfs://level2","quotes")
pt.append!(data)
}
quotes = loadTable(dbPath,"quotes")
//设置每次提取到内存数据量=1小时
repartitionSchema = time(cutPoints(08:00:00..18:00:00,10))
inputDS = replayDS(<select * from quotes>, `datetime, `datetime, repartitionSchema)
submitJob("replay_quotes", "replay_quotes_stream", replay, [inputDS], [`level2], `datetime, `datetime, 10, false, 2)

Опубликовать ( 0 )

Вы можете оставить комментарий после Вход в систему

1
https://api.gitlife.ru/oschina-mirror/dolphindb-Tutorials_CN.git
git@api.gitlife.ru:oschina-mirror/dolphindb-Tutorials_CN.git
oschina-mirror
dolphindb-Tutorials_CN
dolphindb-Tutorials_CN
master