Слияние кода завершено, страница обновится автоматически
//重复运行时清理环境
def clearEnv(){
try{
dropAggregator(`tsAggr1)
dropAggregator(`tsAggr2)
unsubscribeTable(,`level2,`act_tsAggr1)
unsubscribeTable(,`level2,`act_tsAggr2)
unsubscribeTable(,`level2,`act_factor)
unsubscribeTable(,`level2,`newestLevel2data)
undef(`level2,SHARED)
undef(`OHLC1,SHARED)
undef(`OHLC2,SHARED)
undef(`FACTOR,SHARED)
}catch(ex){}
}
clearEnv()
//定义发布流表
share streamTable(100:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT]) as level2
//5.4 使用键值表缓存最新报价和交易价格
newestLevel2 = keyedTable(`symbol, 100:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT])
subscribeTable(tableName="level2", actionName="newestLevel2data", offset=0, handler=append!{newestLevel2}, msgAsTable=true)
//回放数据到流表,通过订阅写入键值表
dbPath = "dfs://level2Replay"
if(!existsDatabase(dbPath)){
dbDate = database("", VALUE, 2020.01.01..2020.12.31)
dbSymbol=database("", HASH, [SYMBOL, 10])
db = database(dbPath, COMPO, [dbDate, dbSymbol])
modal = table(1:0, `symbol`datetime`last`askPrice1`bidPrice1`askVolume1`bidVolume1`volume, [SYMBOL,DATETIME,DOUBLE,DOUBLE,DOUBLE,INT,INT,INT])
pt=db.createPartitionedTable(modal,`quotes, `datetime`symbol)
data = select symbol, datetime(datetime(date(date))+second(time)) as datetime, last, askPrice1, bidPrice1, askVolume1, bidVolume1, curVol as volume from loadTable("dfs://level2","quotes")
pt.append!(data)
}
quotes = loadTable(dbPath,"quotes")
//设置每次提取到内存数据量=1小时
repartitionSchema = time(cutPoints(08:00:00..18:00:00,10))
inputDS = replayDS(<select * from quotes>, `datetime, `datetime, repartitionSchema)
submitJob("replay_quotes", "replay_quotes_stream", replay, [inputDS], [`level2], `datetime, `datetime, 10, false, 2)
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )