华锐 AMD 高速行情传输平台是华锐技术推出的行情分发平台,行情消费者可以通过华锐高速行情转码 API (AMA) 对接 AMD,接收转码行情。为了对接该平台,DolphinDB 开发了调用 AMA 的 AMD 插件。目前已经支持的数据源包括:
最新支持的数据源列表以 AMD 插件手册 为准。
DolphinDB 仅提供对接 AMA 的 AMD 插件,数据源和接入服务需咨询数据服务商或证券公司。
AMD 插件目前支持的 server 版本:
本教程基于 AMD Plugin release200.10 开发,故须要使用 DolphinDB 2.00.10 版本 server。若为其它版本 server,请切换至相应插件分支下载插件包。本教程部署形态为 Linux 单节点,若集群使用,可参考配置到任一数据节点。
用户可以根据 DolphinDB server 版本下载对应的已经编译好的插件文件,如 DolphinDB 2.00.10 server 对应的 AMD Plugin release200.10 的官方下载链接。
注意到 bin 目录下为 AMA 版本号文件夹,当前支持 3.9.8 和 4.0.1 版本,如果需要其他版本的 AMA 对应的插件,需要自行编译或联系 DolphinDB 技术支持帮助编译。
本教程使用 4.0.1 版本 AMA 为例,下载仓库的 bin/4.0.1 目录下所有文件到 <DolphinDB 安装路径>/plugins/amdquote
,如果不存在 amdquote 文件夹需自行创建。
下载好插件文件后,需要添加动态库地址到环境变量中。注意插件的安装路径需要根据实际环境修改,本例中插件的安装路径为 /DolphinDB/server/plugins/amdquote
,启动 server 前需要执行如下命令:
export LD_LIBRARY_PATH="$LD_LIBRARY_PATH:/DolphinDB/server/plugins/amdquote"
可以考虑将该命令添加到 .bashrc 或启动节点的 shell 脚本以自动执行。
可以通过两种方法加载插件:
preloadModules
配置项loadPlugin
方法添加如下配置项到配置文件 dolphindb.cfg 即可实现节点启动时自动加载 AMD 插件。
preloadModules=plugins::amdquote
在脚本中通过如下方法加载 AMD 插件:
// 加载插件
loadPlugin("/DolphinDB/server/plugins/amdquote/PluginAmdQuote.txt")
插件不支持重复加载,重复加载会报错。故如果已经配置 preloadModules,或者已经配置 loadPlugin 到启动脚本,则无需重复调用 loadPlugin。可通过 try-catch 捕获该重复调用的错误,以避免中断脚本的执行。
传入 loadPlugin 方法的插件路径支持安装目录下的相对路径,上述脚本可写为 loadPlugin("plugins/amdquote/PluginAmdQuote.txt")
本教程示例脚本中,订阅了上海和深圳市场的快照、逐笔成交、逐笔委托数据到 3 张流表中,并对订阅数据做了一些实时转换,然后分别存储到 3 张分布式表里。具体数据与表的对应关系如下:
数据源 | 流表 | 分布式库表 |
---|---|---|
上海市场股票快照 深圳市场股票快照 上海市场可转债快照 深圳市场可转债快照 上海市场 ETF 基金快照 深圳市场 ETF 基金快照 |
snapshot | dfs://amd snapshot |
上海市场股票逐笔委托 深圳市场股票逐笔委托 |
order | dfs://amd order |
上海市场股票逐笔成交 深圳市场股票逐笔成交 |
execution | dfs://amd execution |
另外可以通过订阅逐笔数据到跨进程共享内存表(IPC表),然后通过 C++ API 订阅 IPC 表,从而实现低延时接收行情数据到 C++ 程序进行处理。
整个数据流转的过程如下图所示:
AMD 插件不支持订阅一个数据源同时输出到多张表,上图中逐笔成交订阅只能要么输出到流表 execution,要么输出到跨进程共享内存表 IPCExecution。本例将统计两种方式数据的全过程穿透时间作性能分析。
通过 amdQuote::connect(username, password, ips, ports, options) 方法建立与 AMD 行情服务器的连接,参考脚本如下:
// 加载插件
try{loadPlugin("plugins/amdquote/PluginAmdQuote.txt")} catch(ex){print(ex)}
// 连接行情服务器
option = dict(STRING, ANY)
option[`ReceivedTime] = true
option[`DailyIndex] = true
handle = amdQuote::connect(
"myUsername",
"myPassword",
["110.110.10.10"],
[7890],
option
)
其中 option 为可选参数,类型为字典,表示扩展参数。receivedTime 表示是否添加插件收到行情数据时的时间戳的数据列。dailyIndex 表示是否添加每天按 channel_no 分组递增的数据列。
为简化分布式库表和流数据表的创建脚本,可以在 amdQuote::connect(username, password, ips, ports, options) 方法建立连接后调用 amdQuote::getSchema(type) 方法获取行情数据各个表的表结构,参考脚本如下:
// 获取行情数据的表结构,以 snapshot 为例
snapshotTbName = "snapshot"
snapshotSchema = amdQuote::getSchema(`snapshot)
enableTableShareAndPersistence(table=streamTable(20000000:0, snapshotSchema.name, snapshotSchema.type), tableName=snapshotTbName, cacheSize=20000000)
本例教程中,希望对没有日期字段的表的开头增加一个日期字段 tradeDate,在自定义数据处理方法中更新其值为表的时间戳字段的日期部分,故修改上述脚本为:
// 获取行情数据的表结构并建流表,以 snapshot 为例
snapshotTbName = "snapshot"
snapshotSchema = amdQuote::getSchema(`snapshot)
colName = append!(["tradeDate"], snapshotSchema.name)
colType = append!(["DATE"], snapshotSchema.type)
enableTableShareAndPersistence(table=streamTable(20000000:0, colName, colType), tableName=snapshotTbName, cacheSize=20000000)
同理可以使用 amdQuote::getSchema(type) 方法返回的表结构建立分布式库表,参考脚本如下:
// 获取行情数据的表结构并建分布式库表,以 snapshot 为例
snapshotSchema = amdQuote::getSchema(`snapshot)
dbName = "dfs://amd"
snapshotTbName = "snapshot"
if (!existsDatabase(dbName)) {
dbDate = database("", VALUE, 2023.01.01..2023.12.31)
dbCode = database("", HASH, [SYMBOL, 10])
db = database(dbName, COMPO, [dbDate, dbCode], engine="TSDB")
} else {
db = database(dbName)
}
if (!existsTable(dbName, snapshotTbName)) {
colName = append!(["tradeDate"], snapshotSchema.name)
colType = append!(["DATE"], snapshotSchema.type)
tbSchema = table(1:0, colName, colType)
pt = db.createPartitionedTable(table=tbSchema, tableName=snapshotTbName, partitionColumns=`tradeDate`securityCode, sortColumns=`securityCode`origTime, keepDuplicates=ALL)
}
本例教程中,希望对原始的行情数据做如下处理:
可以通过订阅方法 amdQuote::subscribe(handle, type, streamTable, marketType, codeList, transform) 的 transform 参数实现自定义处理接收到的行情数据。transform 参数要求为一个一元函数,其参数是一个表,为接收到的原始行情数据。根据需求编写自定义处理方法,参考脚本如下:
// 订阅处理方法,以 snapshot 为例
def handleSnapshotSubs(mutable msg, reorderedColNames) {
// 增加一个日期字段 tradeDate,其值为对应的时间戳字段的日期部分
update msg set tradeDate = date(origTime)
// 股票代码字段增加后缀,上海市场的后缀为".SH",深圳市场的后缀为".SZ"
update msg set securityCode = securityCode + ".SZ" where marketType=102
update msg set securityCode = securityCode + ".SH" where marketType=101
// 所有价格字段值除以 100
update msg set lastPrice = lastPrice / 100
update msg set openPrice = openPrice / 100
update msg set highPrice = highPrice / 100
update msg set lowPrice = lowPrice / 100
update msg set preClosePrice = preClosePrice / 100
update msg set offerPrice1 = offerPrice1 / 100
update msg set offerPrice2 = offerPrice2 / 100
update msg set offerPrice3 = offerPrice3 / 100
update msg set offerPrice4 = offerPrice4 / 100
update msg set offerPrice5 = offerPrice5 / 100
update msg set offerPrice6 = offerPrice6 / 100
update msg set offerPrice7 = offerPrice7 / 100
update msg set offerPrice8 = offerPrice8 / 100
update msg set offerPrice9 = offerPrice9 / 100
update msg set offerPrice10 = offerPrice10 / 100
update msg set bidPrice1 = bidPrice1 / 100
update msg set bidPrice2 = bidPrice2 / 100
update msg set bidPrice3 = bidPrice3 / 100
update msg set bidPrice4 = bidPrice4 / 100
update msg set bidPrice5 = bidPrice5 / 100
update msg set bidPrice6 = bidPrice6 / 100
update msg set bidPrice7 = bidPrice7 / 100
update msg set bidPrice8 = bidPrice8 / 100
update msg set bidPrice9 = bidPrice9 / 100
update msg set bidPrice10 = bidPrice10 / 100
update msg set weightedAvgOfferPrice = weightedAvgOfferPrice / 100
update msg set weightedAvgBidPrice = weightedAvgBidPrice / 100
update msg set highLimited = highLimited / 100
update msg set lowLimited = lowLimited / 100
// 调整列顺序为与流表、分布式表一致
reorderColumns!(msg, reorderedColNames)
return msg
}
注意到该函数为二元函数,而 transform 参数要求为一元函数,可以通过部分应用来固定一个参数值使其变为一元函数,详见 3.6 AMD 订阅一节。
注意:传入的一元函数中不能存在对 DFS 表的操作,例如:读取或写入 DFS 表,获取 DFS 表的 schema 等。故上述脚本的分布式表的字段顺序 reorderedColNames 需要通过参数传入,而不能直接在 transform 处理方法中读取分布式表的字段顺序。
通过 subscribeTable 方法订阅建立的行情数据流表入库,参考脚本如下:
// 入库订阅,以 snapshot 为例
pt = loadTable(dbName, snapshotTbName)
subscribeTable(tableName=snapshotTbName, actionName="saveSnapshotToDFS", offset=-2, handler=pt, msgAsTable=true, batchSize=200000, throttle=60)
创建完相关订阅后,可以通过执行如下脚本查看已创建的订阅信息:
getStreamingStat().pubTables
tableName | subscriber | msgOffset | actions |
---|---|---|---|
snapshot | localhost:8893 | 0 | saveSnapshotToDFS |
通过 amdQuote::subscribe(handle, type, streamTable, marketType, codeList, transform) 方法订阅 AMD 实时行情数据到指定流表,参考脚本如下:
// AMD 订阅,以 snapshot为例
reorderedColNames = loadTable(dbName, snapshotTbName).schema().colDefs.name
amdQuote::subscribe(handle, `snapshot, snapshot, 101, , handleSnapshotSubs{reorderedColNames=reorderedColNames})
amdQuote::subscribe(handle, `snapshot, snapshot, 102, , handleSnapshotSubs{reorderedColNames=reorderedColNames})
对部分重要参数值的解释:
创建完相关订阅后,可以通过执行如下脚本查看已创建的订阅信息:
amdQuote::getStatus(handle)
dataType | marketType |
---|---|
snapshot | 101 |
snapshot | 102 |
除了股票的快照行情数据,还希望订阅可转债和 ETF 基金的快照行情数据到 snapshot 流表里。但是可转债和 ETF 的代码表每天变化,故需要每天更新订阅的代码表。AMD 插件提供了 amdQuote::getCodeList() 方法获取所有证券的代码表,以及 amdQuote::getETFCodeList() 方法专门获取 ETF 的代码表,故可以通过配置每日定时作业获取新的代码表来重新订阅。对于可转债的代码表,由于其代码有规律可循,可以通过 where 条件对所有证券的代码表进行过滤得到。参考脚本如下:
amdQuote::getCodeList() 的结果包含 amdQuote::getETFCodeList() 的结果。
def subscribeConvertibleBond(handle, reorderedColNames) {
// 通过代码规律过滤得到可转债代码表
codeList = amdQuote::getCodeList()
convertibleBondCodeSh = exec securityCode from codeList where marketType = 101 and (securityCode like "110%" or securityCode like "111%" or securityCode like "113%" or securityCode like "118%" or securityCode like "1320%")
convertibleBondCodeSz = exec securityCode from codeList where marketType = 102 and (securityCode like "123%" or securityCode like "127%" or securityCode like "128%")
amdQuote::subscribe(handle, `bondSnapshot, objByName("snapshot"), 101, convertibleBondCodeSh, handleSnapshotSubs{reorderedColNames=reorderedColNames})
amdQuote::subscribe(handle, `bondSnapshot, objByName("snapshot"), 102, convertibleBondCodeSz, handleSnapshotSubs{reorderedColNames=reorderedColNames})
}
def subscribeEtfFund(handle, reorderedColNames) {
// 获取ETF代码表
codeList = exec securityCode from amdQuote::getETFCodeList()
amdQuote::subscribe(handle, `fundSnapshot, objByName("snapshot"), 101, codeList, handleSnapshotSubs{reorderedColNames=reorderedColNames})
amdQuote::subscribe(handle, `fundSnapshot, objByName("snapshot"), 102, codeList, handleSnapshotSubs{reorderedColNames=reorderedColNames})
}
def subscribeAmdDaily() {
option = dict(STRING, ANY)
option[`ReceivedTime] = true
option[`DailyIndex] = true
handle = amdQuote::connect(
"myUsername",
"myPassword",
["110.110.10.10"],
[7890],
option
)
reorderedColNames = loadTable("dfs://amd", "snapshot").schema().colDefs.name
subscribeConvertibleBond(handle, reorderedColNames)
subscribeEtfFund(handle, reorderedColNames)
}
scheduleJob("subscribeAmdDaily", "subscribeAmdDaily", subscribeAmdDaily, 09:11m, startDate=2022.12.13, endDate=2032.12.13, frequency='D')
重复调用 amdQuote::subscribe 会自动取消之前的相同 type 参数的订阅
对于合并类型行情 "bondExecution","orderExecution","fundOrderExecution",支持按 channel 订阅各个 channel 的数据到各自的流表,从而可以对不同 channel 的数据分别做相应的处理;也可以订阅指定 channel 的数据到同一张流表。以订阅 "orderExecution" 行情为例,分别订阅 channel 1 和 2 的数据到流表 orderExecution1 和 orderExecution2,参考订阅脚本如下:
// 以 orderExecution 为例
executionSchema = amdQuote::getSchema(`orderExecution)
enableTableShareAndPersistence(table=streamTable(20000000:0, executionSchema.name, executionSchema.type), tableName=`orderExecution1, cacheSize=20000000)
enableTableShareAndPersistence(table=streamTable(20000000:0, executionSchema.name, executionSchema.type), tableName=`orderExecution2, cacheSize=20000000)
d = dict(INT, ANY)
d[1] = orderExecution1
d[2] = orderExecution2
amdQuote::subscribe(handle, `orderExecution, d, 101)
amdQuote::subscribe(handle, type, streamTable, marketType, codeList, transform) 方法的 streamTable 参数支持跨进程共享内存表(IPC 表),可以直接将行情数据写入到 IPC 表中,相关脚本如下:
// 订阅到跨进程共享内存表
dbName = "dfs://amd"
executionTbName = "execution"
tbName = "IPCExecutionTb"
executionSchema = amdQuote::getSchema(`execution)
colName = append!(["tradeDate"], executionSchema.name)
colType = append!(["DATE"], executionSchema.type)
tmpColDefs = table(1:0, colName, colType).schema().colDefs
if (!existsTable(dbName, executionTbName)) {
colName = append!(["tradeDate"], executionSchema.name)
colType = append!(["DATE"], executionSchema.type)
tbSchema = table(1:0, colName, colType)
pt = db.createPartitionedTable(table=tbSchema, tableName=executionTbName, partitionColumns=`tradeDate`securityCode, sortColumns=`securityCode`execTime, keepDuplicates=ALL)
}
try { dropIPCInMemoryTable(tbName) } catch(ex) { print(ex) }
IPCExecutionTb = createIPCInMemoryTable(1000000, tbName, tmpColDefs.name, tmpColDefs.typeInt)
def handleExecutionSubs(mutable msg, reorderedColNames) {
// 增加一个日期字段 tradeDate,其值为对应的时间戳字段的日期部分
update msg set tradeDate = date(execTime)
update msg set securityCode = securityCode + ".SZ" where marketType = 102
update msg set securityCode = securityCode + ".SH" where marketType = 101
// 所有价格字段值除以 100
update msg set execPrice = execPrice / 100
// 调整列顺序为与流表、分布式表一致
reorderColumns!(msg, reorderedColNames)
return msg
}
reorderedColNames = loadTable(dbName, executionTbName).schema().colDefs.name
amdQuote::subscribe(handle, `execution, IPCExecutionTb, 101, , handleExecutionSubs{reorderedColNames=reorderedColNames})
amdQuote::subscribe(handle, `execution, IPCExecutionTb, 102, , handleExecutionSubs{reorderedColNames=reorderedColNames})
可以通过 C++ API 订阅 IPC 表,从而实现低延时接收行情数据到 C++ 程序进行处理。相较于通过网络传输订阅,通过 IPC 表订阅的方式时延更低,但要求发布端和接收端在同一物理机上。参考核心 C++ 代码如下:
IPCInMemoryStreamClient memTableClient;
// 创建一个存储数据的 table,要求和 createIPCInMemoryTable 中列的类型和名称一一对应
vector<string> colNames = {"marketType", "securityCode", "execTime", "channelNo", "applSeqNum", "execPrice", "execVolume", "valueTrade", "bidAppSeqNum", "offerApplSeqNum", "side", "execType", "mdStreamId", "bizIndex", "varietyCategory", "receivedTime", "dailyIndex", "perPenetrationTime"};
vector<DATA_TYPE> colTypes = {DT_INT, DT_SYMBOL, DT_TIMESTAMP, DT_INT, DT_LONG, DT_LONG, DT_LONG, DT_LONG, DT_LONG, DT_LONG, DT_CHAR, DT_CHAR, DT_STRING, DT_LONG, DT_CHAR, DT_NANOTIMESTAMP, DT_INT, DT_NANOTIME};
int rowNum = 0, indexCapacity=10000;
// 创建一个和共享内存表结构相同的表
TableSP outputTable = Util::createTable(colNames, colTypes, rowNum, indexCapacity);
// 是否覆盖前面旧的数据
bool overwrite = true;
ThreadSP thread = memTableClient.subscribe(tableName, dumpData, outputTable, overwrite);
thread->join();
完整参考代码见附件main.cpp。
可以通过配置用户启动脚本 startup.dos 实现节点启动时自动订阅 AMD 实时行情数据入库。单节点模式下配置在 dolphindb.cfg,集群模式下配置在 cluster.cfg。参考配置如下:
startup=/DolphinDB/server/startup.dos
在配置的 startup.dos 文件内配置订阅 AMD 相关业务脚本,即可实现节点启动时自动订阅 AMD 实时行情数据入库,参考脚本见附件 startup.dos(包含建库表)。
amdQuote::connect 方法的 option 参数支持配置 "OutputElapsed" 项,配置为 true 时行情数据将包含插件接收数据的时延列 perPenetrationTime。可以通过该列来统计 AMD 插件接收数据的性能,在相同时间区间内,取该列值的均值得到平均时延。
时延的定义:若配置了 transform 参数,则为 AMD 回调函数返回数据到开始 transform 处理前这段时间;否则为 AMD 回调函数返回数据到 append 到共享流表前这段时间。即网络时延和用户自定义 transform 方法耗时没有包含在内。
另外对于 IPC 表,将统计从收到行情数据到 C++ API 收到数据的全过程穿透时间,可以通过统计 C++ 端时间戳 - recivedTime 字段的平均值得到平均穿透时间,并和共享流表 + 网络连接订阅的方式做性能对比。
本教程通过模拟的回放数据来测试性能,数据量级说明如下表:
数据源 | 平均速率(条/s) |
---|---|
snapshot | 5000 |
execution | 50000 |
order | 50000 |
orderExecution | 50000 |
流表的平均时延如下表:
流表名 | 平均时延(μs) |
---|---|
snapshot | 175 |
execution | 475 |
order | 320 |
orderExecution1 | 253 |
orderExecution2 | 263 |
通过IPC表订阅和通过网络订阅的平均穿透时间如下表:
API 订阅方式 | 平均穿透时间(μs) |
---|---|
IPC 表订阅 | 681 |
网络订阅 | 19866 |
可见AMD插件接收数据的平均时延与通过IPC表订阅的平均穿透时间在百微秒级别,性能非常优秀,且通过 IPC 表订阅比通过网络传输订阅时延明显更低。
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )