Слияние кода завершено, страница обновится автоматически
/*
* 模拟数据,如若您有日线数据可以直接入库,或者也可可以从快照或tick中计算取数
* 数据跨度为10年,4000只股票
* 一共9个字段,10年的数据压缩前大约为1G,分年导入。
* 此处创建一个按年VALUE分区的数据库,每个分区压缩前大约100M
* 本例涉及的因子是Alpha 1和 Alpha 98,日频数据一般采用面板模式计算,或者SQL模式计算,继而可以直接写入对应的单值或者多值结果表。
*/
//创建库表定义,如若已存在,则会drop已有的库重新建库建表
def createDayDbTable(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
db = database(dbName, RANGE, 2000.01M + (0..30)*12 )
t =table(1:0, `securityid`tradetime`open`close`high`low`vol`val`vwap, [SYMBOL,TIMESTAMP,DOUBLE,DOUBLE,DOUBLE,DOUBLE,INT,DOUBLE,DOUBLE])
db.createPartitionedTable(t, tbName, `tradetime)
}
//模拟数据数据定义
def genKday(n){
tradetime = select * from table(timestamp(distinct(businessDay(temporalAdd(2010.01.01,n, "y")..yearEnd(temporalAdd(2010.01.01,n, "y"))))).sort() as tradetime) where tradetime >=temporalAdd(2010.01.01,n, "y")
securityid ="sz"+lpad(string(000001..004000), 6, `0)
tmpTable = cj(table(securityid as securityid),tradetime)
open = rand(100.0, size(tradetime)*4000)
high = open + rand(1.0,size(tradetime)*4000)
low = high - rand(2.0,size(tradetime)*4000)
close = open + norm(0,2,size(tradetime)*4000)
vol = rand(100000,size(tradetime)*4000)
val = close*vol
vwap = close
resTable = tmpTable join table(open,close, high, low, vol, val, vwap)
tradeDate=NULL
tradeMin = NULL
tradetime =NULL
securityid =NULL
tmpTable = NULL
open =NULL
high = NULL
low = NULL
close =NULL
vol = NULL
val = NULL
vwap = NULL
db = loadTable("dfs://k_day_level","k_day")
db.append!(resTable)
}
def writeInDayByYear(numOfYear){
for (n in 0..(numOfYear - 1)){
submitJob("genKday_"+string(n),"genKday_"+string(n),genKday,n)
}
}
//因子定义:包括Alpha 1和Alpha 98在SQL模式及面板模式两种
//Alpha 1
//面板
def alpha1Panel(close){
return rowRank(X=mimax(pow(iif(ratios(close) - 1 < 0, mstd(ratios(close) - 1, 20),close), 2.0), 5), percent=true) - 0.5
}
//SQL模式
def alpha1SQL(t){
res = select tradetime, securityid, mimax(pow(iif(ratios(close) - 1 < 0, mstd(ratios(close) - 1, 20), close), 2.0), 5) as val from t context by securityid
return select tradetime, securityid, `alpha1 as factorname, rank(val, percent=true) - 0.5 as val from res context by tradetime
}
//Alpha 98
//面板
def prepareDataForDDBPanel(raw_data, start_time, end_time){
t = select tradetime,securityid, vwap,vol,open from raw_data where date(tradetime) between start_time : end_time
return dict(`vwap`open`vol, panel(t.tradetime, t.securityid, [t.vwap, t.open, t.vol]))
}
def alpha98Panel(vwap, open, vol){
return rowRank(X = mavg(mcorr(vwap, msum(mavg(vol, 5), 26), 5), 1..7),percent=true) - rowRank(X=mavg(mrank(9 - mimin(mcorr(rowRank(X=open,percent=true), rowRank(X=mavg(vol, 15),percent=true), 21), 9), true, 7), 1..8),percent=true)
}
//SQL模式
def alpha98SQL(t){
update t set adv5 = mavg(vol, 5), adv15 = mavg(vol, 15) context by securityid
update t set rank_open = rank(X = open,percent=true), rank_adv15 =rank(X=adv15,percent=true) context by date(tradetime)
update t set decay7 = mavg(mcorr(vwap, msum(adv5, 26), 5), 1..7), decay8 = mavg(mrank(9 - mimin(mcorr(rank_open, rank_adv15, 21), 9), true, 7), 1..8) context by securityid
return select tradetime,securityid, `alpha98 as factorname, rank(X =decay7,percent=true)-rank(X =decay8,percent=true) as val from t context by date(tradetime)
}
//面板形式计算Alpha 1因子
def calculateAlpha1InPanel(){
input = exec close from loadTable("dfs://k_day_level","k_day") where date(tradetime) between 2010.01.01 : 2020.01.31 pivot by tradetime, securityid
resAlpha1Panel = alpha1Panel(input)
return resAlpha1Panel
}
//SQL形式计算Alpha 1因子
def calculateAlpha1InSQL(){
input = select tradetime,securityid, close from loadTable("dfs://k_day_level","k_day") where date(tradetime) between 2010.01.01 : 2010.12.31
alpha1DDBSql = alpha1SQL(input)
return alpha1DDBSql
}
//面板形式计算Alpha 98因子
def calculateAlpha98InPanel(){
raw_data = loadTable("dfs://k_day_level","k_day")
start_time = 2010.01.01
end_time = 2019.12.31
input = prepareDataForDDBPanel(raw_data, start_time, end_time)
alpha98DDBPanel = alpha98Panel(input.vwap, input.open, input.vol)
return alpha98DDBPanel
}
//SQL形式计算Alpha 98因子
def calculateAlpha98InSQL(){
//SQL模式
input = select tradetime,securityid, vwap,vol,open from loadTable("dfs://k_day_level","k_day") where date(tradetime) between 2010.01.01 : 2019.12.31
alpha98DDBSql = alpha98SQL(input)
return alpha98DDBSql
}
/*
因子数据库表结构定义:
单值存储表loadTable("dfs://K_FACTOR_VERTICAL","factor_k")
存储分钟频率、日频因子。采用OLAP引擎。按年,以及因子名VALUE分区
多值宽表loadTable("dfs://K_FACTOR_WIDE","factor_k_wide")
宽表的列分别为时间列,因子名,各标的名称。采用TSDB引擎。按年,以及因子名VALUE分区
*/
def createFactorVerticalDbTable(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
dbDate = database("", RANGE, 2000.01M + (0..30)*12)
dbSymbol=database("", VALUE, `factor1`factor2)
db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate,dbSymbol])
mtable=table(1:0, `tradedate`securityid`factorname`val, [TIMESTAMP,SYMBOL,SYMBOL,DOUBLE]);
k_day = db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`tradedate`factorname)
}
def createFactorWideDbTable(dbName, tbName){
if(existsDatabase(dbName)){
dropDatabase(dbName)
}
dbDate = database("", RANGE, 2000.01M + (0..30)*12)
dbSymbol=database("", VALUE, `factor1`factor2)
db = database(directory=dbName, partitionType=COMPO, partitionScheme=[dbDate,dbSymbol],engine="TSDB")
baseColNames = `tradetime`factorname join ("sz"+lpad(string(000001..004000), 6, `0))
baseColType = ([TIMESTAMP,SYMBOL]).append!(take(DOUBLE,4000))
mtable=table(1:0, baseColNames,baseColType);
min_factor = db.createPartitionedTable(table=mtable, tableName=tbName, partitionColumns=`tradetime`factorname,sortColumns=`factorname`tradetime,compressMethods={tradetime:"delta"},keepDuplicates=LAST)
}
//创建表函数
def createAllTables(){
createDayDbTable("dfs://k_day_level","k_day")
createFactorVerticalDbTable("dfs://K_FACTOR_VERTICAL", "factor_k")
createFactorWideDbTable("dfs://K_FACTOR_WIDE", "factor_k_wide")
}
//Alpha1因子转为宽表入库
def writeAlpha1InWideTable(resAlpha1Panel){
resWideAlpha1 = table(resAlpha1Panel.rowNames() as tradetime, resAlpha1Panel).rename!(array(STRING,0,100).append!(["tradetime"]).append!(resAlpha1Panel.colNames()))
resWideAlpha1[`factorname]=`alpha1
reorderColumns!(resWideAlpha1,`tradetime`factorname)
loadTable("dfs://K_FACTOR_WIDE","factor_k_wide").append!(resWideAlpha1)
}
//Alpha98因子转为宽表入库
def writeAlpha98InWideTable(alpha98DDBPanel){
alpha98DDBPanel_wide = table(alpha98DDBPanel.rowNames() as tradetime, alpha98DDBPanel).rename!(array(STRING,0,100).append!(["tradetime"]).append!(alpha98DDBPanel.colNames()))
alpha98DDBPanel_wide[`factorname]=`alpha98
reorderColumns!(alpha98DDBPanel_wide,`tradetime`factorname)
loadTable("dfs://K_FACTOR_WIDE","factor_k_wide").append!(alpha98DDBPanel_wide)
}
//1. 创建分钟频数据库表、因子单值模型库表,因子多值模型库表
createAllTables()
//2. 模拟写入10年数据
writeInDayByYear(10)
sleep(5000)
allJobs=select * from getRecentJobs() order by endTime desc
//3. 计算并写入因子库
// 3.1 计算
//面板形式计算Alpha 1因子
resAlpha1Panel=calculateAlpha1InPanel()
//SQL形式计算Alpha 1因子
alpha1DDBSql=calculateAlpha1InSQL()
//面板形式计算Alpha 98因子
alpha98DDBPanel=calculateAlpha98InPanel()
//SQL形式计算Alpha 98因子
alpha98DDBSql=calculateAlpha98InSQL()
//3.2 写入因子结果库表
//写入单值模型存储:SQL模式的因子结果可以直接写入单值模型
loadTable("dfs://K_FACTOR_VERTICAL","factor_k").append!(alpha1DDBSql)
loadTable("dfs://K_FACTOR_VERTICAL","factor_k").append!(alpha98DDBSql)
//写入多值模型存储:面板模式计算的因子,只需要转成table,可以直接写入多指模型
//Alpha1因子转为宽表入库
writeAlpha1InWideTable(resAlpha1Panel)
//Alpha98因子转为宽表入库
writeAlpha98InWideTable(alpha98DDBPanel)
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )