分布式时序数据库 DolphinDB 具有很好的扩展性。用户可以使用内置的脚本语言或者 C++ 插件来编写自定义函数(UDF)和自定义聚合函数(UDAF)。
本教程主要讲解:
DolphinDB 中使用关键词 def 定义一个函数,关键词 defg 定义一个聚合函数。
下面的例子是 DolphinDB 中几何平均数的实现方法。为了防止数据的溢出,这里使用对数的方法来求几何平均。
defg geometricMean(x){
return x.log().avg().exp()
}
> geometricMean(2 4 8);
4
自定义聚合函数和内置聚合函数一样,除了单独作为函数调用外,也可以用于 SQL 语句。
下例为一个内存表的分组聚合计算:
t = table(1 1 1 2 2 2 as id, 2.0 4.0 8.0 1.0 3.0 9.0 as value)
select geometricMean(value) as gval from t group by id
对于分布式表应用自定义的聚合函数进行 SQL 语句的分组聚合计算时,情况将变得复杂。
当分布式表的分区字段跟 group by 的字段一致时,只需要在每个分区上分别运行该 SQL 语句,然后合并各个分区的运行结果即可。
当分区字段和分组字段不一致时,则需要使用 map reduce 来重新实现聚合函数。
以上文定义 geometricMean 为例,基本思路如下:
map:在每个分区上计算对数均值以及非空数据的个数;
计算这些对数均值的加权平均,并用 exp 还原几何平均。
示例:
// 普通的聚合函数声明
defg geometricMean(x){
return x.log().avg().exp()
}
// 分布式实现
def logAvg(x) : log(x).avg()
defg geometricMeanReduce(logAvgs, counts) : wavg(logAvgs, counts).exp()
mapr geometricMean(x){logAvg(x), count(x) -> geometricMeanReduce}
使用 mapr 关键字将 map reduce 的实现与聚合函数关联。关键词 mapr 之后声明的是自定义聚合函数的 signature。map reduce 部分用一对花括号 {}
括起来。->
之前部分是 map 函数,之后部分是 reduce 函数。
在此例中,有两个 map 函数(logAvg 和 count),多个 map 函数之间用逗号分隔。注意,map 函数的参数必须在前面的聚合函数 signature 中定义过。
注意:把上面的代码粘贴到 DolphinDB 的初始化启动脚本 dolphindb.dos 中,重启 dolphindb server 后就可以在分布式 SQL 中使用这个聚合函数了。
下例用内存分区表来测试这个聚合函数。分区表用日期分区,SQL 按照股票 sym 分组,这样就能够触发聚合函数 geometricMean 的分布式实现了。
t = table(2020.09.01 + 0 0 1 1 3 3 as date, take(`IBM`MSFT, 6) as sym, 1.0 2.0 3.0 4.0 9.0 8.0 as value)
db = database("", RANGE, [2020.09.01, 2020.09.03, 2020.09.10])
stock = db.createPartitionedTable(t, "stock", "date").append!(t)
select geometricMean(value) as gval from stock group by sym
sym gval
---- ----
IBM 3
MSFT 4
结合此例,此处来深入分析一下 DolphinDB 的 map reduce 实现。
cgroup by 的典型应用场景是按时间维度将数据分成几个组,然后累计计算每个组的统计值。具体来说,先计算第一个组的统计值,然后计算前两个组的所有数据的统计值,接着计算前三个组的所有数据统计值,依此类推。若严格根据此定义计算,效率很低。
一个改进的办法是分别计算每个组的一些统计值,然后根据这些统计值计算 running aggregate。这个优化算法,与 map reduce 的思路非常类似。区别是在 map reduce 的场景下,只计算一个聚合值;在 cgroup by 场景下,需要计算多个组的 running aggregate。
继续以 geometricMean 为例,演示聚合函数的 cgroup by 实现。下例增加了自定义函数 geometricMeanRunning,并且扩展了 mapr 语句:
mapr 语句的第二部分 copy, copy->geometricMeanRunning
是 cgroup by 的实现。map reduce 和 cgroup by 的实现用分号分隔。cgroup by 的 map 实现跟 map reduce 完全一致,所以这里直接使用了 copy 函数,也就是将 map 的结果直接传给 geometricMeanRunning。
def geometricMeanRunning(logAvgs, counts) : cumwavg(logAvgs, counts).exp()
defg geometricMeanReduce(logAvgs, counts) : wavg(logAvgs, counts).exp()
mapr geometricMean(x){logAvg(x), count(x) -> geometricMeanReduce; copy, copy->geometricMeanRunning}
注意:上述变化更新到 dolphindb.dos,重启后可以在分布式 cgroup by 语句中调用聚合函数 geometricMean。
t = table(2020.09.01 + 0 0 1 1 3 3 as date, take(`IBM`MSFT, 6) as sym, 1.0 2.0 3.0 4.0 9.0 8.0 as value)
select geometricMean(value) as gval from t cgroup by date order by date
date gval
---------- -----------------
2020.09.01 1.414213562373095
2020.09.02 2.213363839400643
2020.09.04 3.464101615137754
几何平均的实现相对简单。下面我们给出一个更为复杂的聚合函数的完整实现。这个聚合函数用于计算失衡的 t-value,基本形式定义在函数 tval 中。
defg tval(x, w){
wavgX = wavg(x, w)
wavgX2 = wavg(x*x, w)
return wavgX/sqrt(wavgX2 - wavgX.square())
}
def wsum2(x,w): wsum(x*x, w)
defg tvalReduce(wsumX, wsumX2, w){
wavgX = wsumX.sum()\w.sum()
wavgX2 = wsumX2.sum\w.sum()
return wavgX/sqrt(wavgX2 - wavgX.square())
}
def tvalRunning(wsumX, wsumX2, w){
wavgX = wsumX\w
wavgX2 = wsumX2\w
return wavgX/sqrt(wavgX2 - wavgX.square())
}
mapr tval(x, w){wsum(x, w), wsum2(x, w), contextSum(w, x)->tvalReduce; cumsum, cumsum, cumsum->tvalRunning}
分布式的实现,要计算三个部分:x 的加权和 wsum(x, w),x 的平方的加权和 wsum2(x, w),以及权重和 conextSum(w, x)。注意计算权重和时没有使用 sum(w),而使用了 contextSum(w, x)。这是考虑到数据中可能有空值。构建这三个 map 函数后,使用 reduce 函数 tvalReduce 即可完成 tval 的分布式实现。cgroup by 的实现,需要对上面三个 map 函数的结果做一个 cumsum 的变换,然后用 tvalRunning 实现 running aggregate。
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )