1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/allwefantasy-spark-binlog

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
В этом репозитории не указан файл с открытой лицензией (LICENSE). При использовании обратитесь к конкретному описанию проекта и его зависимостям в коде.
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

Spark Binlog Library

Библиотека для запроса Binlog с использованием Apache Spark Streaming, для Spark SQL, DataFrames и MLSQL.

  1. jianshu: How spark-binlog works.
  2. medium: How spark-binlog works.

Требования

Для этой библиотеки требуется Spark 2.4+ (протестировано). Некоторые более старые версии Spark могут работать, но они официально не поддерживаются.

Связывание

Вы можете связать эту библиотеку в своей программе по следующим координатам:

Scala 2.11

Это последние стабильные версии.

MySQL Binlog:

groupId: tech.mlsql
artifactId: mysql-binlog_2.11
version: 1.0.4

HBase WAL:

groupId: tech.mlsql
artifactId: hbase-wal_2.11
version: 1.0.4

Ограничения

  1. mysql-binlog поддерживает только события вставки/обновления/удаления. Остальные события будут игнорироваться.
  2. hbase-wal поддерживает только события Put/Delete. Остальные события будут игнорироваться.

Использование MySQL Binlog

Пример должен работать с delta-plus.

Код MLSQL:

set streamName="binlog";

load binlog.`` where 
host="127.0.0.1"
and port="3306"
and userName="xxxxx"
and password="xxxxx"
and databaseNamePattern="mlsql_console"
and tableNamePattern="script_file"
as table1;

save append table1  
as rate.`mysql_{db}.{table}` 
options mode="Append"
and idCols="id"
and duration="5"
and syncType="binlog"
and checkpointLocation="/tmp/cpl-binlog2";

Код DataFrame:

val spark = SparkSession.builder()
      .master("local[*]")
      .appName("Binlog2DeltaTest")
      .getOrCreate()

val df = spark.readStream.
  format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
  option("host","127.0.0.1").
  option("port","3306").
  option("userName","root").
  option("password","123456").
  option("databaseNamePattern","test").
  option("tableNamePattern","mlsql_binlog").
  load()

val query = df.writeStream.
  format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
  option("__path__","/tmp/datahouse/{db}/{table}").
  option("path","{db}/{table}").
  option("mode","Append").
  option("idCols","id").
  option("duration","3").
  option("syncType","binlog").
  option("checkpointLocation", "/tmp/cpl-binlog2").
  outputMode("append")
  .trigger(Trigger.ProcessingTime("3 seconds"))
  .start()

query.awaitTermination()

Перед запуском потокового приложения убедитесь, что вы полностью синхронизировали таблицу.

Код MLSQL:

connect jdbc where
 url="jdbc:mysql://127.0.0.1:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false"
 and driver="com.mysql.jdbc.Driver"
 and user="xxxxx"
 and password="xxxx"
 as db_cool;
 
load jdbc.`db_cool.script_file`  as script_file;

run script_file as TableRepartition.`` where partitionNum="2" and partitionType="range" and partitionCols="id"
as rep_script_file;

save overwrite rep_script_file as delta.`mysql_mlsql_console.script_file`;

load delta.`mysql_mlsql_console.script_file`  as output;

Код DataFrame:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("wow")
  .getOrCreate()

val mysqlConf = Map(
  "url" -> "jdbc:mysql://localhost:3306/mlsql_console?characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&tinyInt1isBit=false",
  "driver" -> "com.mysql.jdbc.Driver",
  "user" -> "xxxxx",
  "password" -> "xxxx",
  "dbtable" -> "script_file"
)

import org.apache.spark.sql.functions.col
var df = spark.read.format("jdbc").options(mysqlConf).load()
df = df.repartitionByRange(2, col("id") )
df.write
  .format("org.apache.spark.sql.delta.sources.MLSQLDeltaDataSource").
  mode("overwrite").
  save("/tmp/datahouse/mlsql_console/script_file")
spark.close()

Использование HBase WAL

Код DataFrame:

val spark = SparkSession.builder()
      .master("local[*]")
      .appName("HBase WAL Sync")
      .getOrCreate()

    val df = spark.readStream. ```
format("org.apache.spark.sql.mlsql.sources.hbase.MLSQLHBaseWALDataSource").
      option("walLogPath", "/Users/allwefantasy/Softwares/hbase-2.1.8/WALs").
      option("oldWALLogPath", "/Users/allwefantasy/Softwares/hbase-2.1.8/oldWALs").
      option("startTime", "1").
      option("databaseNamePattern", "test").
      option("tableNamePattern", "mlsql_binlog").
      load()

    val query = df.writeStream.
      format("console").
      option("mode", "Append").
      option("truncate", "false").
      option("numRows", "100000").
      option("checkpointLocation", "/tmp/cpl-binlog25").
      outputMode("append")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .start()

    query.awaitTermination()

RoadMap

We hope we can support more DBs including traditional DB e.g Oracle and NoSQL e.g. HBase(WAL),ES,Cassandra in future.

How to get the initial offset

You can mannually set binlog offset, For example:

bingLogNamePrefix="mysql-bin"
binlogIndex="4"
binlogFileOffset="4"

Try using command like following to get the offset you want:

mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File             | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000014 | 34913156 |              |                  |                   |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.04 sec)

In this example, we knows that:

bingLogNamePrefix      binlogFileOffset   binlogFileOffset
mysql-bin        .     000014             34913156

this means you should configure parameters like this:

bingLogNamePrefix="mysql-bin"
binlogIndex="14"
binlogFileOffset="34913156"

Or you can use mysqlbinlog command.

mysqlbinlog \ 
--start-datetime="2019-06-19 01:00:00" \ 
--stop-datetime="2019-06-20 23:00:00" \ 
--base64-output=decode-rows \
-vv  master-bin.000004

Questions

Q1

People may meet some log like following:

Trying to restore lost connectioin to .....
Connected to ....

Please check the server_id is configured in my.cnf of your MySQL Server.

Q2

When you have started your stream to consume the binlog, but it seem nothong happen or just print :

Batch: N
-------------------------------------------
+-----+
|value|
+-----+
+-----+

Please check spark log:

20/06/18 11:57:00 INFO MicroBatchExecution: Streaming query made progress: {
  "id" : "e999af90-8d0a-48e2-b9fc-fcf1e140f622",
  "runId" : "547ce891-468a-43c5-bb62-614b38f60c39",
  "name" : null,
  "timestamp" : "2020-06-18T03:57:00.002Z",
  "batchId" : 1,
  "numInputRows" : 1,
  "inputRowsPerSecond" : 0.4458314757021846,
  "processedRowsPerSecond" : 2.9673590504451037,
  "durationMs" : {
    "addBatch" : 207,
    "getBatch" : 3,
    "getOffset" : 15,
    "queryPlanning" : 10,
    "triggerExecution" : 337,
    "walCommit" : 63
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "MLSQLBinLogSource(ExecutorBinlogServer(192.168.111.14,52612),....",
    "startOffset" : 160000000004104,
    "endOffset" : 170000000000154,
    "numInputRows" : 0,
    "inputRowsPerSecond" : 0,
    "processedRowsPerSecond" : 0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSinkProvider@4f82b82f"
  }
}

As we can see, the startOffset/f is changing but the numInputRows is not chagned. Please try a table with a simple schema to make sure the binlog connection works fine.

If the simple schema table works fine, this is may caused by some special sql type. Please address an issue and paste spark log and your target table schema.

You can use code like this to test in your local machine:

package tech.mlsql.test.binlogserver

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger
import org.scalatest.FunSuite


object Main{
  def main(args: Array[String]): ```
Unit = {
    val spark = SparkSession.builder()
          .master("local[*]")
          .appName("MySQL B Sync")
          .getOrCreate()

        val df = spark.readStream.
          format("org.apache.spark.sql.mlsql.sources.MLSQLBinLogDataSource").
          option("host", "127.0.0.1").
          option("port", "3306").
          option("userName", "xxxx").
          option("password", "xxxx").
          option("databaseNamePattern", "wow").
          option("tableNamePattern", "users").
          option("bingLogNamePrefix", "mysql-bin").
          option("binlogIndex", "16").
          option("binlogFileOffset", "3869").
          option("binlog.field.decode.first_name", "UTF-8").
          load()

        // print the binlog(json format)
        val query = df.writeStream.
              format("console").
              option("mode", "Append").
              option("truncate", "false").
              option("numRows", "100000").
              option("checkpointLocation", "/tmp/cpl-mysql6").
              outputMode("append")
              .trigger(Trigger.ProcessingTime("10 seconds"))
              .start()

        query.awaitTermination()
  }
}

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

Описание недоступно Развернуть Свернуть
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/allwefantasy-spark-binlog.git
git@api.gitlife.ru:oschina-mirror/allwefantasy-spark-binlog.git
oschina-mirror
allwefantasy-spark-binlog
allwefantasy-spark-binlog
master