Слияние кода завершено, страница обновится автоматически
def createResultTable(){
return table(
array(SYMBOL, 0) as SecurityID,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as smallBuyOrderAmount,
array(DOUBLE, 0) as smallSellOrderAmount,
array(DOUBLE, 0) as totalOrderAmount,
array(DOUBLE, 0) as factor)
}
def createTradeSchema(){
return table(
array(SYMBOL, 0) as SecurityID,
array(INT, 0) as BuyNo,
array(INT, 0) as SellNo,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as TradeAmount)
}
def createResult1Schema() {
return table(
array(INT, 0) as BuyNo,
array(SYMBOL, 0) as SecurityID,
array(INT, 0) as SellNo,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as TradeAmount,
array(DOUBLE, 0) as BuyCumAmount,
array(DOUBLE, 0) as PrevBuyCumAmount,
array(INT, 0) as BuyOrderFlag,
array(INT, 0) as PrevBuyOrderFlag)
}
def createResult2Schema() {
return table(
array(INT, 0) as SellNo,
array(INT, 0) as BuyNo,
array(SYMBOL, 0) as SecurityID,
array(DATETIME, 0) as TradeTime,
array(DOUBLE, 0) as TradeAmount,
array(DOUBLE, 0) as BuyCumAmount,
array(DOUBLE, 0) as PrevBuyCumAmount,
array(INT, 0) as BuyOrderFlag,
array(INT, 0) as PrevBuyOrderFlag,
array(DOUBLE, 0) as SellCumAmount,
array(DOUBLE, 0) as PrevSellCumAmount,
array(INT, 0) as SellOrderFlag,
array(INT, 0) as PrevSellOrderFlag)
}
def cleanStreamEngines(engineNames){
for(name in engineNames){
try{
dropStreamEngine(name)
}
catch(ex){}
}
}
@state
def factorOrderCumAmount(tradeAmount){
cumsumTradeAmount = cumsum(tradeAmount)
prevCumsumTradeAmount = prev(cumsumTradeAmount)
orderFlag = iif(cumsumTradeAmount<100000, 0, 1)
prevOrderFlag = prev(orderFlag)
return cumsumTradeAmount, prevCumsumTradeAmount, orderFlag, prevOrderFlag
}
@state
def factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag){
cumsumTradeAmount = cumsum(tradeAmount)
smallSellCumAmount, bigSellCumAmount = dynamicGroupCumsum(sellCumAmount, prevSellCumAmount, sellOrderFlag, prevSellOrderFlag, 2)
smallBuyCumAmount, bigBuyCumAmount = dynamicGroupCumsum(buyCumAmount, prevBuyCumAmount, buyOrderFlag, prevBuyOrderFlag, 2)
f = (smallBuyCumAmount - smallSellCumAmount) \ cumsumTradeAmount
return smallBuyCumAmount, smallSellCumAmount, cumsumTradeAmount, f
}
def createStreamEngine(result){
tradeSchema = createTradeSchema()
result1Schema = createResult1Schema()
result2Schema = createResult2Schema()
engineNames = ["rse1", "rse2", "rse3"]
cleanStreamEngines(engineNames)
metrics3 = <[TradeTime, factorSmallOrderNetAmountRatio(tradeAmount, sellCumAmount, sellOrderFlag, prevSellCumAmount, prevSellOrderFlag, buyCumAmount, buyOrderFlag, prevBuyCumAmount, prevBuyOrderFlag)]>
rse3 = createReactiveStateEngine(name=engineNames[2], metrics=metrics3, dummyTable=result2Schema, outputTable=result, keyColumn="SecurityID")
metrics2 = <[BuyNo, SecurityID, TradeTime, TradeAmount, BuyCumAmount, PrevBuyCumAmount, BuyOrderFlag, PrevBuyOrderFlag, factorOrderCumAmount(TradeAmount)]>
rse2 = createReactiveStateEngine(name=engineNames[1], metrics=metrics2, dummyTable=result1Schema, outputTable=rse3, keyColumn="SellNo")
metrics1 = <[SecurityID, SellNo, TradeTime, TradeAmount, factorOrderCumAmount(TradeAmount)]>
return createReactiveStateEngine(name=engineNames[0], metrics=metrics1, dummyTable=tradeSchema, outputTable=rse2, keyColumn="BuyNo")
}
result = createResultTable()
rse = createStreamEngine(result)
insert into rse values(`000155, 1000, 1001, 2020.01.01T09:30:00, 20000)
insert into rse values(`000155, 1000, 1002, 2020.01.01T09:30:01, 40000)
insert into rse values(`000155, 1000, 1003, 2020.01.01T09:30:02, 60000)
insert into rse values(`000155, 1004, 1003, 2020.01.01T09:30:03, 30000)
select * from result
/*
SecurityID TradeTime smallBuyOrderAmount smallSellOrderAmount totalOrderAmount factor
---------- ------------------- ------------------- -------------------- ---------------- ------
000155 2020.01.01T09:30:00 20000 20000 20000 0
000155 2020.01.01T09:30:01 60000 60000 60000 0
000155 2020.01.01T09:30:02 0 120000 120000 -1
000155 2020.01.01T09:30:03 30000 150000 150000 -0.8
*/
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )