Слияние кода завершено, страница обновится автоматически
/**
createStreamTB.txt
Script to create stream tables for publishing
DolphinDB Inc.
DolphinDB server version: 1.30.18 2022.05.09/2.00.6 2022.05.09
Last modification time: 2022.05.31
*/
// clean up environment
def cleanEnvironment(parallel){
for(i in 1..parallel){
try { unsubscribeTable(tableName="tradeOriginalStream", actionName="processBuyOrder" + string(i)) } catch(ex) { print(ex) }
try { dropStreamEngine("processBuyOrder" + string(i)) } catch(ex) { print(ex) }
try { dropStreamEngine("processSellOrder" + string(i)) } catch(ex) { print(ex) }
try { dropStreamEngine("processCapitalFlow" + string(i)) } catch(ex) { print(ex) }
}
try { unsubscribeTable(tableName="capitalFlowStream", actionName="processCapitalFlow60min") } catch(ex) { print(ex) }
try { dropStreamEngine("processCapitalFlow60min") } catch(ex) { print(ex) }
try{ dropStreamTable(`tradeOriginalStream) } catch(ex){ print(ex) }
try{ dropStreamTable(`capitalFlowStream) } catch(ex){ print(ex) }
try{ dropStreamTable(`capitalFlowStream60min) } catch(ex){ print(ex) }
undef all
}
//calculation parallel, developers need to modify according to the development environment
parallel = 3
cleanEnvironment(parallel)
def createStreamTableFunc(){
//create stream table: tradeOriginalStream
colName = `SecurityID`Market`TradeTime`TradePrice`TradeQty`TradeAmount`BuyNum`SellNum
colType = [SYMBOL, SYMBOL, TIMESTAMP, DOUBLE, INT, DOUBLE, INT, INT]
tradeOriginalStreamTemp = streamTable(20000000:0, colName, colType)
try{ enableTableShareAndPersistence(table=tradeOriginalStreamTemp, tableName="tradeOriginalStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
undef("tradeOriginalStreamTemp")
//create stream table: capitalFlow
colName = `SecurityID`TradeTime`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
colType = [SYMBOL, TIMESTAMP, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
capitalFlowStreamTemp = streamTable(20000000:0, colName, colType)
try{ enableTableShareAndPersistence(table=capitalFlowStreamTemp, tableName="capitalFlowStream", asynWrite=true, compress=true, cacheSize=20000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
undef("capitalFlowStreamTemp")
//create stream table: capitalFlowStream60min
colName = `TradeTime`SecurityID`TotalAmount`SellSmallAmount`SellMediumAmount`SellBigAmount`SellSmallCount`SellMediumCount`SellBigCount`BuySmallAmount`BuyMediumAmount`BuyBigAmount`BuySmallCount`BuyMediumCount`BuyBigCount
colType = [TIMESTAMP, SYMBOL, DOUBLE, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT, DOUBLE, DOUBLE, DOUBLE, INT, INT, INT]
capitalFlowStream60minTemp = streamTable(1000000:0, colName, colType)
try{ enableTableShareAndPersistence(table=capitalFlowStream60minTemp, tableName="capitalFlowStream60min", asynWrite=true, compress=true, cacheSize=1000000, retentionMinutes=1440, flushMode=0, preCache=10000) } catch(ex){ print(ex) }
undef("capitalFlowStreamTemp")
}
createStreamTableFunc()
go
setStreamTableFilterColumn(tradeOriginalStream, `SecurityID)
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )