1 В избранное 0 Ответвления 0

OSCHINA-MIRROR/wxzz-CSharpFlink

Присоединиться к Gitlife
Откройте для себя и примите участие в публичных проектах с открытым исходным кодом с участием более 10 миллионов разработчиков. Приватные репозитории также полностью бесплатны :)
Присоединиться бесплатно
Клонировать/Скачать
Внести вклад в разработку кода
Синхронизировать код
Отмена
Подсказка: Поскольку Git не поддерживает пустые директории, создание директории приведёт к созданию пустого файла .keep.
Loading...
README.md

CSharpFlink: вычислительная среда в реальном времени

Официальный сайт: http://www.ineuos.net/. Технический блог: https://www.cnblogs.com/lsjwq/. Адрес GitHub: https://github.com/wxzz/CSharpFlink. Адрес Gitee: https://gitee.com/wxzz/CSharpFlink. QQ автора: 504547114. Техническая группа QQ: 54256083.

1. Проект

Мы имеем общенациональную промышленную публичную облачную платформу, которая передаёт данные через выделенные линии или каналы 4G в режиме реального времени. Платформа обрабатывает около 1 миллиарда записей данных каждый день и предоставляет пользователям онлайн-сервисы в реальном времени и услуги анализа данных в автономном режиме. Платформа стабильно работает уже почти 3 года. Также мы предоставляем услуги по созданию частных облаков для промышленных предприятий.

Мы планируем использовать Flink в качестве части вычислительной среды бэкэнда нашей облачной платформы. Мы хотим реализовать основные функции, такие как агрегация точек данных, вычисление выражений и т. д., а также обеспечить поддержку машинного обучения и сложных пользовательских алгоритмов.

После почти года исследований и разработок мы уже реализовали основные функции, такие как агрегирование и логические операции. Однако мы считаем, что Flink довольно тяжёлый, и его использование и обслуживание требуют высокого уровня квалификации.

Исходя из этого, мы самостоятельно разработали набор компонентов для вычислений в реальном времени CSharpFlink, который поддерживает настраиваемые источники данных, вычисления и хранение.

2. Применение

Основные области применения включают в себя:

  • Агрегацию точек данных в пределах временных окон в реальном времени, таких как максимальное значение, минимальное значение, среднее значение, сумма, медиана, дисперсия, стандартное отклонение и т.д., с возможностью настройки вторичной разработки.
  • Повторное вычисление данных после обновления или дополнения в течение определённого периода времени в прошлом.
  • Вычисление выражений с использованием настраиваемых скриптов на C#, которые могут использоваться для оповещения в реальном времени или обработки данных.
  • Распределённое развёртывание с основной и подчинённой структурой, где основная нода отвечает за распределение задач, а подчинённые ноды выполняют задачи и сохраняют результаты.

3. Особенности фреймворка

Особенности фреймворка включают:

  • Использование новейшей версии NET 5.0 для обеспечения кроссплатформенности.
  • Возможность повторного вычисления данных после обновлений или дополнений в течение заданного периода времени в прошлом, например, для данных за последние 5 секунд.
  • Поддержка вычислений выражений в реальном времени по расписанию или при изменении значений данных для выполнения вычислений.
  • Вторичная разработка на языке C# для интеграции с различными источниками данных, создания собственных операторов и выбора способов хранения данных.
  • Возможность одиночного или распределённого развёртывания.

4. Структура фреймворка

Базовая структура компонентов фреймворка представлена на схеме ниже: CSharpFlink框架图示意图

5. Каталог кода

Разработка проекта осуществляется с помощью Visual Studio 2019. Решение проекта представлено файлом CSharpFlink.sln. Каталог кода включает следующие компоненты:

  • Cache — управление локальным кешем задач для основной и подчинённых нод.
  • Calculate — операции ввода, процесса и вывода задач, а также их управление.
  • Channel — операции IO для распределённой структуры основной и подчинённых нод.
  • Common — общая библиотека классов.
  • Config — глобальные операции с конфигурационными файлами.
  • Execution — среда выполнения всего проекта.
  • Expression — операции вычисления выражений.
  • Log — операции и управление журналами.
  • Model — метаданные точек данных.
  • Node — управление основными и подчинёнными нодами.
  • Protocol — протокол взаимодействия между основными и подчинёнными нодами в распределённой структуре.
  • Sink — интерфейс для сохранения результатов вычислений.
  • Source — интерфейсы для подключения к различным источникам данных, таким как MQTT, Kafka, RabbitMQ и базы данных.
  • Task — интерфейс задач оконных и табличных вычислений, управление задачами на основных и подчинённых нодах.
  • Window — операции оконных задач.
  • Worker — интерфейс подчинённых нод.

6. Конфигурационные файлы

Конфигурационный файл по умолчанию называется cfg\global.cfg. Можно создать собственный конфигурационный файл, следуя инструкциям командной строки. Описание конфигурационного файла включает:

  • MaxDegreeOfParallelism — степень параллелизма задач, от которой зависит генерация задач основной нодой и выполнение задач подчинёнными нодами.
  • MasterListenPort — порт прослушивания основной ноды для соединения подчинённых нод.
  • MasterIp — IP-адрес основной ноды, используемый подчинёнными нодами для соединения.
  • NodeType — режим работы ноды, включая Master, Slave и Both.
  • RemoteInvokeInterval — интервал времени для удалённого вызова подчинённых нод, измеряется в миллисекундах.
  • RepeatRemoteInvokeInterval — интервал времени после неудачной попытки вызова подчинённой ноды, измеряется в миллисекундах.
  • SlaveExcuteCalculateInterval — интервал времени выполнения вычислений подчинёнными нодами, измеряется в миллисекундах.
  • MaxFrameLength — максимальный размер данных при передаче между основной и подчинёнными нодами, измеряется в байтах.
  • WorkerPower — коэффициент мощности подчинённых нод; если он больше 1, то ноды будут отправлять несколько задач одновременно.

7. Развёртывание задач

Для вторичной разработки обратитесь к разделу «Вторичная разработка». После успешного тестирования программы, созданной в рамках проекта, её можно развернуть в каталоге «tasks». Для запуска «CSharpFlink» основной программы автоматически загрузятся и вызовутся все задачи.

Можно настроить собственные задачи, следуя инструкциям командной строки.

8. Инструкции командной строки

Чтобы запустить «CSharpFlink», можно указать конфигурационный файл или список задач с помощью командной строки. Команды включают: -h — показать помощь по командной строке. -c — загрузить указанный конфигурационный файл. Например: CSharpFlink -c c:/my.cfg -t — загрузить список задач. Например: CSharpFlink -t c:/mytask.dll Пример использования:

dotnet CSharpFlink.dll -c c:/master.cfg -t c:/mytask.dll

9. Развёртывание

В каталоге «release» находится скомпилированная программа. Её можно скопировать в разные папки, изменить конфигурационный файл «cfg\global.cfg» для каждой ноды на Master или Slave, а затем запустить «dotnet CSharpFline.dll» в каждой папке.

Исходный код «TestTask.dll» можно найти в разделе «Вторичная разработка».

10. Вторичная разработка

Вторичная разработка фокусируется на источниках данных, процессах вычислений и сохранении результатов. Процесс включает:

  • Подключение к источникам данных, таким как mqtt, kafka, rabbitmq и базы данных, путём наследования интерфейса SourceFunction. Пример: RandomSourceFunction.cs.
  • Создание процессов вычислений, таких как обработка и преобразование данных, путём наследования интерфейса Calculate.Calculate. Пример: Avg.cs для агрегации и ExpressionCalculate.cs для вычислений выражений. Используйте AddWindowTask или AddExpressionTask для создания экземпляров. Хранение результатов вычислений данных: возможность настройки хранения на любом носителе

Для этого необходимо реализовать интерфейс SinkFunction. Подробнее см.: класс SinkFunction.cs.

Пример применения для вычисления 10 000 данных в секунду на одном компьютере (демонстрация случая)

  • Обычное вычисление на персональном компьютере с 10 000 данными в секунду, процессор: 4 ядра I5-7400 2,7 ГГц, память: 16 ГБ. Случайное окно времени для точек данных и вычислительных операторов. Использование центрального процессора и памяти на главном узле: 15–35%, 1500 МБ–2048 МБ. Использование центрального процессора и памяти рабочими узлами: 0,1–2,5%, 18 МБ–30 МБ. На этом компьютере развёрнут один главный узел и 10 вычислительных узлов. Главный узел генерирует задачу вычисления для 10 000 точек данных, каждая точка данных создаёт новый результат каждую секунду. Максимальное значение, минимальное значение, среднее значение или сумма значений в окне вычислений.*

Структура развёртывания:

Развёртывание CSharpFlink

Отображение процесса:

Процесс CSharpFlink

Эффект выполнения:

Демонстрация случая CSharpFlink

iNeuOS промышленный интернет-общедоступный номер:

iNeuOS промышленный интернет-общедоступный номер

Комментарии ( 0 )

Вы можете оставить комментарий после Вход в систему

Введение

вычислительная среда в реальном времени Развернуть Свернуть
C#
Apache-2.0
Отмена

Обновления

Пока нет обновлений

Участники

все

Недавние действия

Загрузить больше
Больше нет результатов для загрузки
1
https://api.gitlife.ru/oschina-mirror/wxzz-CSharpFlink.git
git@api.gitlife.ru:oschina-mirror/wxzz-CSharpFlink.git
oschina-mirror
wxzz-CSharpFlink
wxzz-CSharpFlink
main