Слияние кода завершено, страница обновится автоматически
/*
* 模拟数据,如若您有分钟k线数据可以直接入库,或者也可可以从快照或tick中计算取数
* 数据跨度为1年,4000只股票,每只股票工作日的分钟线从9:30 ~11:30, 13:00 ~15:00
* 一共9个字段,一年的数据压缩前大约为20G,分月导入。
* 此处创建一个按月VALUE分区,按股票HASH的组合分区,每个分区压缩前大约500M
* 本例涉及的因子是日内收益偏度因子和doubleEMA,分钟频率的数据一般采用SQL模式计算,这样可以在分区内部并行计算。
*/
//创建库表定义,如若已存在,则会drop已有的库重新建库建表
def createMinuteDbTable(dbName,tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
dbDate = database("", VALUE,2020.01M..2020.12M )
dbSym= database("", HASH, [SYMBOL,3])
db = database(dbName, COMPO, [dbDate, dbSym])
t =table(1:0, `securityid`tradetime`open`close`high`low`vol`val`vwap, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE])
db.createPartitionedTable(t, tbName, `tradetime`securityid)
}
//模拟数据定义,并导入库表。模拟12个月的数据加上入库大约耗时1分钟。
def genKminute(n){
tradeDate= select * from table(distinct(businessDay(temporalAdd(2020.01.01,n, "M")..monthEnd(temporalAdd(2020.01.01,n, "M")))).sort() as tradeDate) where tradeDate>=temporalAdd(2020.01.01,n, "M")
tradeMin = table(09:30:00.000+0..120*60*1000 join (13:00:00.000+0..120*60*1000) as tradeMin)
tradetime = select concatDateTime(tradeDate,tradeMin) as tradetime from cj(tradeDate,tradeMin)
securityid ="sz"+lpad(string(000001..004000), 6, `0)
tmpTable = cj(table(securityid as securityid),tradetime)
open = rand(100.0, size(tradetime)*4000)
high = open + rand(1.0,size(tradetime)*4000)
low = high - rand(2.0,size(tradetime)*4000)
close = open + norm(0,2,size(tradetime)*4000)
vol = rand(100000,size(tradetime)*4000)
val = close*vol
vwap = close
resTable = tmpTable join table(open,close, high, low, vol, val, vwap)
tradeDate=NULL
tradeMin = NULL
tradetime =NULL
securityid =NULL
tmpTable = NULL
open =NULL
high = NULL
low = NULL
close =NULL
vol = NULL
val = NULL
vwap = NULL
db = loadTable("dfs://k_minute_level","k_minute")
db.append!(resTable)
}
def writeInMinuteByMonth(numOfMonth){
for (n in 0..(numOfMonth-1)){
submitJob("genKminute_"+string(n),"genKminute_"+string(n),genKminute,n)
}
}
/*
* 因子定义:
* 此节用到了两个因子,分别为日内收益率偏度dayReturnSkew以及DoubleEMA
*/
//dayReturnSkew
defg dayReturnSkew(close){
return skew(ratios(close))
}
//DoubleEMA
def sum_diff(x, y){
return (x-y)\(x+y)
}
def factorDoubleEMA(price){
ema_20 = ema(price, 20)
ema_40 = ema(price, 40)
sum_diff_1000 = 1000 * sum_diff(ema_20, ema_40)
return ema(sum_diff_1000,10) - ema(sum_diff_1000, 20)
}
/*
因子数据库表结构定义:
单值存储表loadTable("dfs://K_FACTOR_VERTICAL","factor_k")
存储分钟频率、日频因子。采用OLAP引擎。按年,以及因子名VALUE分区
多值宽表loadTable("dfs://K_FACTOR_WIDE","factor_k_wide")
宽表的列分别为时间列,因子名,各标的名称。采用TSDB引擎。按年,以及因子名VALUE分区
*/
def createFactorVerticalDbTable(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
dbDate = database("", RANGE, 2000.01M + (0..30)*12)
dbSymbol=database("", VALUE, `factor1`factor2)
db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate,dbSymbol])
mtable=table(1:0, `tradedate`securityid`factorname`val, [TIMESTAMP,SYMBOL,SYMBOL,DOUBLE]);
k_day = db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`tradedate`factorname)
}
def createFactorWideDbTable(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
dbDate = database("", RANGE, 2000.01M + (0..30)*12)
dbSymbol=database("", VALUE, `factor1`factor2)
db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate,dbSymbol],engine="TSDB")
baseColNames = `tradetime`factorname join ("sz"+lpad(string(000001..004000), 6, `0))
baseColType = ([TIMESTAMP,SYMBOL]).append!(take(DOUBLE,4000))
mtable=table(1:0, baseColNames,baseColType);
min_factor = db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`tradetime`factorname,sortColumns=`factorname`tradetime,compressMethods={tradetime:"delta"},keepDuplicates=LAST)
}
//创建因子函数配置表
def createFactorConfigTable(){
db=database("dfs://k_minute_level")
if(existsTable("dfs://k_minute_level", "factor_func_config"))
db.dropTable("factor_func_config")
factor_tab_model=table(1:0,`func_name`factor_name`start_date`end_date,[STRING,STRING,DATE,DATE])
factor_func_config=db.createTable(table=factor_tab_model, tableName=`factor_func_config)
factor_func_config=loadTable("dfs://k_minute_level",`factor_func_config)
func_table = table(take(`factorDoubleEMA,20) as func_name, string(1..20) as factorname, 2020.01.01..2020.01.20 as start_date,2020.01.02..2020.01.21 as end_date)
factor_func_config.append!(func_table)
}
//定义一个根据因子名,实践动态调用并保存数据到宽表的通用接口函数
def write_in_wide_database(funcN,factorN,start_date, end_date){
res = select tradetime, securityid, `doubleEMA as factorname, funcByName(funcN)(close) as factor_value from loadTable("dfs://k_minute_level","k_minute") where date(tradetime) between start_date : end_date context by securityid
if(res.size()==0) return
pivot_res = select factor_value from res pivot by tradetime,securityid
pivot_res[`factorname]=string(factorN)
reorderColumns!(pivot_res,`tradetime`factorname)
res_t = loadTable("dfs://K_FACTOR_WIDE","factor_k_wide")
res_t.append!(pivot_res)
}
//建表操作函数
def createAllTables(){
createMinuteDbTable("dfs://k_minute_level","k_minute")
createFactorVerticalDbTable("dfs://K_FACTOR_VERTICAL", "factor_k")
createFactorWideDbTable("dfs://K_FACTOR_WIDE", "factor_k_wide")
}
//分钟收益率因子计算
def getMinReturn(){
minReturn = select `dayReturnSkew as factorname, dayReturnSkew(close) as val from loadTable("dfs://k_minute_level","k_minute") where date(tradetime) between 2020.01.02 : 2020.01.31 group by date(tradetime) as tradetime, securityid map
return minReturn
}
//分钟收益率因子按宽表入库函数
def writeMinReturnFactorInWideTable(minReturn){
pivot_res = select val from minReturn pivot by TradeTime, SecurityID
pivot_res[`factorname]=`flow
reorderColumns!(pivot_res,`TradeTime`factorname)
loadTable("dfs://K_FACTOR_WIDE","factor_k_wide").append!(pivot_res)
}
//计算doubleEMA因子
def getDoubleEMA(){
doubleEMA = select tradetime, securityid, `doubleEMA as factorname, factorDoubleEMA(close) as val from loadTable("dfs://k_minute_level","k_minute") where date(tradetime) between 2020.01.01 : 2020.01.31 context by securityid
return doubleEMA
}
//DoubleEMA因子按宽表入库函数
def writeDoubleEMAFactorInWideTable(doubleEMA){
pivot_res = select val from doubleEMA pivot by TradeTime, SecurityID
pivot_res[`factorname]=`flow
reorderColumns!(pivot_res,`TradeTime`factorname)
loadTable("dfs://K_FACTOR_WIDE","factor_k_wide").append!(pivot_res)
}
login("admin","123456")
//1. 创建分钟频数据库表、因子单值模型库表,因子多值模型库表
createAllTables()
//2. 模拟写入12个月数据
writeInMinuteByMonth(12)
sleep(40000)
//allJobs=select * from getRecentJobs() order by endTime desc
//3.1 计算因子并写入单值模型存储
//计算分钟收益率因子
minReturn=getMinReturn()
//计算doubleEMA因子
doubleEMA=getDoubleEMA()
//因子写入纵表
loadTable("dfs://K_FACTOR_VERTICAL","factor_k").append!(minReturn)
loadTable("dfs://K_FACTOR_VERTICAL","factor_k").append!(doubleEMA)
//3.2 计算并写入宽表多值模型存储
//分钟收益率因子按宽表入库
writeMinReturnFactorInWideTable(minReturn)
//DoubleEMA因子按宽表入库
writeDoubleEMAFactorInWideTable(doubleEMA)
//4 因子计算工程化
//将计算函数保存到数据库,以便系统调度使用
addFunctionView(factorDoubleEMA)
//创建因子调度配置表及配置数据
createFactorConfigTable()
//daily_cal:用来批量提交job,计算增量数据因子,并保存到宽表
def daily_cal(){
//加载因子函数配置表
factor_func_config=select * from loadTable("dfs://k_minute_level","factor_func_config")
for (i in 0..(size(factor_func_config)-1)){
funcN=factor_func_config[`func_name][i]
factorN=factor_func_config[`factor_name][i]
start_date=factor_func_config[`start_date][i]
end_date=factor_func_config[`end_date][i]
submitJob("submitjob_sche"+funcN+factorN, "write_in_wide_database"+funcN+factorN+start_date, write_in_wide_database,funcN,factorN,start_date, end_date)
}
}
//通过配置schedule 定时启动daily_cal ,适用案例时注意调整时间
scheduleJob(jobId=`daily1, jobDesc="Daily Job 1", jobFunc=daily_cal, scheduleTime=16:51m, startDate=2022.05.08, endDate=2022.05.08, frequency='D');
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )