Расширяемость
Благодаря тому, что очередь сообщений отделяет процесс обработки от процесса отправки, легко увеличить частоту отправки и обработки сообщений. Для этого не нужно изменять код или настраивать параметры. Расширение функциональности похоже на простое нажатие кнопки включения.
Приложение продолжает работать даже при резком увеличении трафика, но постоянное использование ресурсов для обработки таких всплесков трафика было бы расточительством. Использование очереди сообщений позволяет ключевым компонентам выдерживать внезапные скачки нагрузки без полного сбоя из-за чрезмерной нагрузки.
Если часть системы выходит из строя, это не влияет на работу всей системы. Очередь сообщений снижает зависимость между процессами, поэтому даже если процесс, обрабатывающий сообщения, даёт сбой, сообщения в очереди могут быть обработаны после восстановления системы.
В большинстве сценариев порядок обработки данных важен. Большинство очередей сообщений упорядочены, и они гарантируют обработку данных в определённом порядке. Kafka обеспечивает упорядоченность сообщений внутри раздела.
Любая важная система имеет элементы, требующие разной обработки. Например, загрузка изображения занимает меньше времени, чем применение фильтра. Очередь сообщений помогает задачам выполняться максимально эффективно — обработка сообщений происходит как можно быстрее. Буфер помогает контролировать и оптимизировать скорость потока данных через систему.
Часто пользователям не требуется немедленная обработка сообщений. Очередь сообщений предоставляет механизм асинхронной обработки, позволяя пользователям помещать сообщения в очередь, не обрабатывая их сразу. Можно отправлять любое количество сообщений, а затем обрабатывать их по мере необходимости.
Kafka-сервер, отвечающий за хранение и пересылку сообщений; один брокер представляет собой один узел Kafka.
Kafka использует концепцию тем, аналогичную таблицам в базах данных. Тема похожа на таблицу, а раздел — на регион. Несколько разделов могут хранить данные одной темы на разных серверах. Разделы представляют собой каталоги на сервере. Основная информация о разделах хранится в файле .log. Подобно базам данных, разделы обеспечивают высокую производительность.
Важно отметить:
Тема может содержать несколько разделов, и каждый раздел хранит часть данных темы. Все данные из всех разделов составляют одну тему. На одном брокере может быть создано несколько разделов, независимо от количества брокеров. Каждый раздел имеет уникальный номер, начиная с 0. Благодаря наличию нескольких разделов Kafka обеспечивает параллельную обработку, что является одним из факторов высокой пропускной способности.
Сообщения в разделе упорядочены, но глобальный порядок не гарантируется.
Раздел — это каталог на сервере, который содержит данные одной или нескольких тем. Данные распределяются по разным серверам или узлам. Разделы похожи на регионы в HBase. Они обеспечивают основу для обработки больших объёмов данных.
Разделы имеют следующие особенности:
Данные в разделах упорядочены. Это одна из причин высокой производительности Kafka. Несколько разделов позволяют использовать несколько потоков для параллельной обработки, что значительно ускоряет работу.
Аналогично таблицам и регионам в базах данных, тема — это логическая концепция, а раздел — физическая единица хранения данных. Эта структура обеспечивает надёжную основу для обработки огромных объёмов информации.
Обратите внимание:
Каждый раздел состоит из нескольких сегментов, которые представляют собой файлы одинакового размера. Чтение и запись данных происходят последовательно. Каждый сегмент имеет расширение .log и содержит наименьший offset в этом сегменте. При поиске сообщения по его offset используется бинарный поиск для определения сегмента, в котором находится сообщение.
Смещение — это позиция сообщения в журнале раздела, которая также служит уникальным идентификатором сообщения. Одновременно смещение является информацией, необходимой для синхронизации между ведущим и ведомым узлами.
Производитель — это компонент Kafka, ответственный за создание сообщений. Сообщения классифицируются по темам и сохраняются в брокере Kafka. Производители отправляют данные в систему обмена сообщениями.
Потребитель — это компонент Kafka, предназначенный для чтения данных из Kafka. Потребители должны принадлежать к определённой группе потребителей. Без указания группы все потребители принадлежат к группе по умолчанию. Каждая группа потребителей имеет уникальный идентификатор, называемый group ID. Все потребители в группе координируют потребление данных из одной или нескольких подписок на темы. Однако каждый раздел может использоваться только одним потребителем из группы. Разные группы могут потреблять данные из одного раздела. Количество разделов определяет максимальное количество одновременных потребителей в каждой группе.
Например, если есть два раздела, то даже если в группе четыре потребителя, два из них будут бездействовать. Если есть четыре раздела, каждый потребитель будет потреблять из отдельного раздела, обеспечивая максимальную параллельность в четыре потребителя.
Каждая группа потребителей может состоять из одного или нескольких потребителей. Если группа не указана, все потребители будут принадлежать к группе по умолчанию. У каждой группы есть уникальный идентификатор — group ID. Потребители в одной группе совместно используют данные из одной подписки на тему. Но каждый раздел может использовать только один потребитель из группы. Различные группы могут использовать данные из одного и того же раздела. Число разделов определяет максимальное число одновременных потребителей для каждой группы.
Пример:
На левом рисунке, если есть только два раздела, даже если в группе четыре потребителя, двое из них останутся без работы. На правом рисунке, где есть четыре раздела, каждый потребитель потребляет из отдельного раздела, достигая максимальной параллельности четырёх потребителей.
Рассмотрим другой пример: разные группы потребителей могут получать данные из одной и той же темы, разделённой на четыре раздела и распределённой между двумя узлами. Слева группа потребителей 1 имеет двух потребителей, каждому из которых приходится потреблять данные из двух разделов, чтобы полностью обработать сообщения. Справа группа потребителей 2 имеет четырёх потребителей, каждый из которых потребляет данные из своего раздела.
Количество разделов должно быть больше или равно количеству потребителей в группе. Рекомендуется, чтобы количество разделов было кратно количеству потребителей.
Таким образом, большее количество разделов позволяет большему числу потребителей одновременно обрабатывать данные, ускоряя процесс потребления.
Резюме:
Группа потребителей состоит из одного или более потребителей, и каждый потребитель в группе обрабатывает сообщения только один раз.
Для каждой темы количество разделов определяет максимальное количество потребителей в группе, которое должно быть меньше или равно количеству разделов.
Например, для темы с четырьмя разделами максимальное количество потребителей должно быть четыре, предпочтительно кратное четырём.
Один раздел — один потребитель.
Вывод: увеличение количества разделов позволяет увеличить количество параллельных потребителей и ускорить обработку данных.
Группа потребителей включает одного или нескольких потребителей, принадлежащих к этой группе. Если группа не определена, потребители относятся к группе по умолчанию. Каждая группа имеет уникальный ID — group ID. Группа потребителей координирует потребление данных из одной или нескольких подписок на темы. Каждый раздел может быть использован только одним потребителем в группе. Разные группы могут использовать один и тот же раздел. Количество разделов определяет максимальное количество параллельных потребителей в каждой группе. Оптимизация и способы её достижения
Благодаря оптимизации мы можем обнаружить, что CPU только дважды переключился между контекстами и трижды скопировал данные. В Linux-системах системный вызов «sendfile()» позволяет напрямую копировать данные из буфера ядра в буфер сокета, минуя копирование в пространство пользователя.
Разделение на разделы и сегменты
Как уже упоминалось, Kafka использует разделение на разделы, каждый из которых соответствует отдельному физическому сегменту. Это обеспечивает быстрый поиск при помощи бинарного поиска. Такой подход не только повышает эффективность запросов при чтении данных, но и предоставляет возможность параллельной работы.
Сжатие данных
Kafka поддерживает различные протоколы сжатия, такие как Gzip и Snappy, для уменьшения объёма передаваемых данных и снижения нагрузки на пропускную способность сети.
Вопрос 2: Как происходит разделение журнала на сегменты?
В Kafka каждый раздел имеет максимальный размер файла журнала в 1 ГБ. Это сделано для удобства загрузки файлов журнала в память для операций. Файлы журнала имеют следующие имена:
Цифры после имени файла, например, 9936472, представляют начальное смещение в этом файле журнала. Это означает, что в данном сегменте раздела было записано около 1 миллиона записей.
У брокера Kafka есть параметр log.segment.bytes, который ограничивает размер каждого сегмента журнала до 1 ГБ. Когда файл журнала достигает своего предела, автоматически создаётся новый сегмент для продолжения записи. Этот процесс называется «log rolling». Активным сегментом журнала называется тот, в который в данный момент записываются данные.
Если вы знакомы с предыдущими статьями о HDFS, то знаете, что NameNode также имеет ограничения на редактирование журналов. Таким образом, эти фреймворки учитывают подобные проблемы.
Вопрос 3: Как устроена сеть в Kafka?
Архитектура сети Kafka и её оптимизация тесно связаны. Именно благодаря этому Kafka может поддерживать высокую степень параллелизма.
Все запросы от клиентов сначала направляются к Acceptor. Внутри брокера работают три потока (по умолчанию), называемые процессорами. Acceptor не обрабатывает запросы, а просто упаковывает их в виде socketChannel и отправляет процессорам. Отправка происходит по очереди, начиная с первого процессора и далее по кругу.
Потребители получают запросы через очередь. Эти запросы содержат данные. Процессоры извлекают ответные данные из response и возвращают их клиентам.
Внутри пула потоков по умолчанию имеется восемь потоков, которые обрабатывают запросы. Если требуется дополнительная оптимизация, можно увеличить количество процессоров и потоков в пуле. Запрос и ответ образуют своего рода буфер, чтобы процессоры могли обрабатывать запросы быстрее, чем они поступают. Это похоже на усиленную версию модели реактора.
Шаг 1: Установите JDK.
Шаг 2: Установите Zookeeper.
Шаг 3: Скачайте Kafka с сайта Apache.
Шаг 4: Установите Kafka.
tar -xzvf kafka_2.12-2.0.0.tgz
Шаг 5: Настройте переменные среды.
export ZK=/usr/local/src/apache-zookeeper-3.7.0-bin
export PATH=$PATH:$ZK/bin
export KAFKA=/usr/local/src/kafka
export PATH=$PATH:$KAFKA/bin
Шаг 6: Запустите Kafka.
nohup kafka-server-start.sh 自己的配置文件路径/server.properties &
Шаг 1: Добавьте зависимость.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
Шаг 2: Настройте файл application.yml.
kafka:
bootstrap:
servers: localhost:9092
topic:
user: topic-user
group:
id: group-user
Шаг 3: Создайте класс KafkaProducer.
/**
* Kafka消息生产类
*/
@Log
@Component
public class KafkaProducer {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.topic.user}")
private String topicUser;// topic名称
/**
* 发送用户消息
* @param user 用户信息
*/
public void sendUserMessage(User user) {
GsonBuilder builder = new GsonBuilder();
builder.setPrettyPrinting();
builder.setDateFormat("yyyy-MM-dd HH:mm:ss");
String message = builder.create().toJson(user);
kafkaTemplate.send(topicUser, message);
log.info("\n生产消息至Kafka\n" + message);
}
}
Шаг 4: Создайте класс KafkaConsumer и вызовите его через контроллер.
public class KafkaConsumerDemo {
@Value("${kafka.topic.user}")
private String topicUser;// topic名称
public void consume() {
Properties props = new Properties();
// 必须设置的属性
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "group-user");
// 自动提交offset,每1s提交一次(提交后的消息不再消费,避免重复消费问题)
props.put("enable.auto.commit", "true");// 自动提交offset:true【PS:只有当消息提交后,此消息才不会被再次接受到】
``` **Третий шаг: внедрение HBaseTemplate в класс service**
```java
@Service
@Slf4j
public class HBaseService {
@Autowired
private HbaseTemplate hbaseTemplate;
public List<Result> getRowKeyAndColumn(String tableName, String startRowkey, String stopRowkey,
String column, String qualifier) {
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
if (StringUtils.isNotBlank(column)) {
log.debug("{}", column);
filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(column))));
}
if (StringUtils.isNotBlank(qualifier)) {
log.debug("{}", qualifier);
filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(qualifier))));
}
Scan scan = new Scan();
if (filterList.getFilters().size() > 0) {
scan.setFilter(filterList);
}
scan.setStartRow(Bytes.toBytes(startRowkey));
scan.setStopRow(Bytes.toBytes(stopRowkey));
return hbaseTemplate.find(tableName, scan, (rowMapper, rowNum) -> rowMapper);
}
public List<Result> getListRowkeyData(String tableName, List<String> rowKeys, String familyColumn, String column) {
return rowKeys.stream().map(rk -> {
if (StringUtils.isNotBlank(familyColumn)) {
if (StringUtils.isNotBlank(column)) {
return hbaseTemplate.get(tableName, rk, familyColumn, column, (rowMapper, rowNum) -> rowMapper);
} else {
return hbaseTemplate.get(tableName, rk, familyColumn, (rowMapper, rowNum) -> rowMapper);
}
}
return hbaseTemplate.get(tableName, rk, (rowMapper, rowNum) -> rowMapper);
}).collect(Collectors.toList());
}
}
С увеличением объёма данных одна операционная система не может вместить все данные, поэтому необходимо распределять их по большему количеству управляемых дисков. Однако это неудобно для управления и обслуживания, и требуется система для управления файлами на нескольких машинах. Именно эту задачу решает распределённая файловая система. HDFS — это один из видов распределённых файловых систем.
HDFS (Hadoop Distributed File System) — это файловая система, предназначенная для хранения файлов через древовидную структуру каталогов. Кроме того, она является распределённой системой, объединяющей множество серверов для выполнения своих функций. В кластере сервера имеют разные роли.
Сценарии использования: HDFS подходит для сценариев, где данные записываются один раз и считываются многократно, но не поддерживают модификацию файлов. Она хорошо подходит для анализа данных, но не для использования в качестве сетевого диска.
Преимущества и недостатки HDFS
Преимущества:
Недостатки:


Размер блоков файлов в HDFS
В HDFS файлы физически хранятся в виде блоков (Block), размер которых можно настроить с помощью параметра dfs.blocksize. По умолчанию размер блока составляет 128 МБ в версии Hadoop 2.x и 64 МБ в старых версиях.
Если время поиска целевого блока составляет 100 мс, то время поиска составляет 100 мс. Соотношение времени поиска и времени передачи должно составлять 100:1 для оптимального состояния, поэтому время передачи составляет 1 мс. Текущая скорость передачи жёсткого диска составляет примерно 100 МБ/с, что приблизительно равно 128 МБ.
Операции клиента
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.URI;
public class HdfsClient {
FileSystem fileSystem = null;
@Before
public void init() {
try {
fileSystem = FileSystem.get(URI.create("hdfs://hadoop102:9000"), new Configuration(), "djm");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* Загрузка файла
*/
@Test
public void put() {
try {
fileSystem.copyFromLocalFile(new Path("C:\\Users\\Administrator\\Desktop\\Hadoop 入门.md"), new Path("/"));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* Скачивание файла
*/
@Test
public void download() {
try {
// useRawLocalFileSystem указывает, следует ли включать проверку файлов
fileSystem.copyToLocalFile(false, new Path("/Hadoop 入门.md"),
new Path("C:\\Users\\Administrator\\Desktop\\Hadoop 入门1.md"), true);
} catch (IOException e) {}
}
}
``` ```
e.printStackTrace();
}
/**
* 删除文件
*/
@Test
public void delete() {
try {
// recursive表示是否递归删除
fileSystem.delete(new Path("/Hadoop 入门.md"), true);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 文件重命名
*/
@Test
public void rename() {
try {
fileSystem.rename(new Path("/tmp"), new Path("/temp"));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 查看文件信息
*/
@Test
public void ls() {
try {
RemoteIterator<locatedfilestatus> listFiles = fileSystem.listFiles(new Path("/etc"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
if (fileStatus.isFile()) {
// 仅输出文件信息
System.out.print(fileStatus.getPath().getName() + " " +
fileStatus.getLen() + " " + fileStatus.getPermission() + " " + fileStatus.getGroup() + " ");
// 获取文件块信息
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation blockLocation : blockLocations) {
// 获取节点信息
String[] hosts = blockLocation.getHosts();
for (String host : hosts) {
System.out.print(host + " ");
}
}
System.out.println();
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
@After
public void exit() {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
``` **Данные анализа**
**Apache Hive**
На рисунке представлена архитектура Apache Hive.
**Apache Storm**
В Apache Storm необходимо сначала разработать структуру для вычислений в реальном времени, которую мы называем топологией. Затем эта топологическая структура отправляется в кластер, где главный узел отвечает за распределение кода рабочим узлам, которые выполняют код. В топологической структуре есть два типа ролей: spout и bolt. Данные передаются между spouts, которые отправляют данные потока в виде кортежей; болты отвечают за преобразование данных потока.
**Apache Spark**
Spark Streaming — это расширение основного API Spark, которое не обрабатывает один поток данных за раз, как Storm. Вместо этого он разбивает поток данных на сегменты перед обработкой. Мы называем абстракцию непрерывного потока данных DStream (дискретизированный поток). DStream — это RDD (распределённый набор данных) с небольшими пакетами, который можно преобразовать с помощью произвольных функций и скользящих оконных вычислений для выполнения параллельных операций.
**Apache Flink**
Apache Flink — это фреймворк и распределённый механизм обработки, предназначенный для вычислений с отслеживанием состояния в потоках данных без ограничений и с ограничениями. Flink разработан для работы во всех распространённых кластерных средах с высокой скоростью в памяти и способностью выполнять вычисления любого масштаба.
Flink представляет собой вычислительную среду для потоков данных и пакетной обработки. Потоки данных рассматриваются как особый случай пакетной обработки с низкой задержкой (в миллисекундах) и гарантированной доставкой сообщений без потери или дублирования.
Flink творчески объединяет потоковую и пакетную обработку, рассматривая входные потоки данных как неограниченные, а пакетную обработку — как особый вид потоковой обработки с ограниченными входными данными. Программы Flink состоят из двух основных блоков: Stream и Transformation, где Stream представляет промежуточный результат данных, а Transformation — операцию, которая выполняет вычисления над одним или несколькими входными потоками данных и выдаёт один или несколько выходных потоков.
*Обработка неограниченных и ограниченных данных*
Данные могут обрабатываться как неограниченный поток или ограниченный поток:
— Неограниченные потоки имеют начало, но не имеют определённого конца. Они никогда не заканчиваются и постоянно предоставляют данные. Неограниченные потоки должны обрабатываться непрерывно, то есть события должны обрабатываться сразу после их поступления. Невозможно дождаться, пока все входные данные поступят, поскольку входные данные неограниченны и никогда не завершатся. Обработка неограниченных данных обычно требует упорядоченного приёма событий, чтобы можно было сделать выводы о целостности результатов.
— Ограниченные потоки имеют определённое начало и конец. Перед выполнением любых вычислений ограниченные данные можно собрать, обработав их. Обработка ограниченных потоков не требует упорядоченного сбора, так как ограниченные наборы данных всегда можно отсортировать. Обработка ограниченных данных также называется пакетной обработкой.
Apache Flink хорошо справляется с обработкой неограниченных и ограниченных наборов данных. Контроль над временем и состоянием позволяет среде выполнения Flink работать с любым типом приложений на неограниченных потоках. Ограниченные данные обрабатываются специализированными алгоритмами и структурами данных, предназначенными для фиксированных размеров данных, что приводит к выдающимся результатам производительности.
*Слоистый API*
Flink предоставляет три уровня API. Каждый уровень обеспечивает баланс между краткостью и выразительностью и подходит для разных сценариев использования.
*Сценарии использования*
Apache Fink является лучшим выбором для разработки и запуска широкого спектра различных типов приложений благодаря своим богатым возможностям. Возможности Flink включают поддержку потоковой и пакетной обработки, сложное управление состоянием, семантику обработки событий и обеспечение согласованности состояния. Кроме того, Flink можно развернуть на различных платформах ресурсов, таких как YARN, Apache Mesos и Kubernetes, или как независимый кластер на голом железе. Настроенный для обеспечения высокой доступности, Flink не имеет единой точки отказа. Flink доказал свою способность масштабироваться до тысяч ядер и терабайт данных приложений, обеспечивая высокую пропускную способность и низкую задержку, а также поддерживая некоторые из самых требовательных приложений потоковой обработки в мире.
Вот наиболее распространённые типы приложений, поддерживаемых Flink:
1. Приложения, управляемые событиями.
2. Приложения для анализа данных.
3. Приложения для конвейеров данных.
*Приложения, управляемые событиями*
Приложения, управляемые событиями, представляют собой приложения с отслеживанием состояния, которые получают события из одного или нескольких потоков событий, реагируют на входящие события путём запуска вычислений, обновления состояния или внешних действий.
Приложения, управляемые событиями, основаны на приложениях с отслеживанием состояния для потоковой обработки. В этой архитектуре данные и вычисления объединены, что позволяет осуществлять локальный доступ к данным (в памяти или на диске). Проверки могут быть периодически записаны в удалённое постоянное хранилище для обеспечения отказоустойчивости. Более того, совместное использование базы данных несколькими приложениями является обычным явлением в многоуровневых архитектурах. Поэтому любые изменения в базе данных требуют координации, поскольку каждое приложение, управляемое событиями, отвечает за свои собственные данные, что снижает необходимость в координации для изменений представления данных или расширения приложения.
Для приложений, управляемых событиями, отличительной особенностью Flink является savepoint. Savepoint — это согласованное состояние, которое можно использовать в качестве отправной точки для совместимых приложений. С заданной точкой сохранения можно обновлять или масштабировать приложение или запускать несколько версий приложения для A/B-тестирования.
Типичные приложения, управляемые событиями:
— Обнаружение мошенничества.
— Обнаружение аномалий.
— Уведомления на основе правил.
— Мониторинг бизнес-процессов.
— Веб-приложения (социальные сети).
*Приложения для анализа данных*
Традиционно анализ выполнялся в виде пакетных запросов или приложений к ограниченному набору данных записанных событий. Чтобы объединить последние данные с результатами анализа, необходимо добавить их в аналитический набор данных, а затем повторно запустить запрос или приложение, результаты которого записываются в систему хранения или публикуются в виде отчётов.
Благодаря сложной потоковой обработке движок анализ также может выполняться в режиме реального времени. Потоковые запросы или приложения не считывают ограниченный набор данных, а вместо этого получают поток событий в реальном времени и генерируют и обновляют результаты при использовании событий. Результаты либо записываются во внешнюю базу данных, либо сохраняются как внутреннее состояние. Приложения панели мониторинга могут считывать последние результаты из внешней базы данных или напрямую запрашивать внутреннее состояние приложения.
Apache Flink поддерживает потоковые и пакетные аналитические приложения:
Типичными приложениями для анализа данных являются:
— Мониторинг качества телекоммуникационных сетей.
— Анализ обновлений продуктов и оценка мобильных приложений.
— Специальный анализ больших объёмов данных в потребительских технологиях.
— Крупномасштабный графический анализ.
*Конвейерные приложения*
Извлечение-преобразование-загрузка (ETL) — это распространённый метод преобразования и перемещения данных между системами хранения. Обычно ETL-задания запускаются периодически для копирования данных из транзакционных систем баз данных в аналитические базы данных или хранилища данных.
Функции конвейера аналогичны ETL-задачам. Они преобразуют и обогащают данные и могут перемещать данные из одной системы хранения в другую. Однако они работают в непрерывном потоковом режиме, а не запускаются периодически. Таким образом, они могут считывать записи из источников, которые постоянно производят данные, и передавать их с низкими задержками в место назначения. Например, конвейер может отслеживать появление новых файлов в каталоге файловой системы и записывать данные в журнал событий. Другое приложение может материализовать поток событий в базу данных или постепенно создавать и улучшать поисковый индекс.
Разница между периодическими ETL-заданиями и непрерывными конвейерами данных заключается в следующем:
Преимущество непрерывных конвейеров данных по сравнению с периодическими ETL-заданиями заключается в сокращении времени ожидания для перемещения данных в место назначения. Кроме того, конвейеры данных более универсальны и применимы к большему количеству сценариев, поскольку они способны непрерывно потреблять и производить данные.
Типичными конвейерными приложениями являются:
— Создание и поддержание поисковых индексов в реальном времени для электронной коммерции.
— Непрерывное ETL для электронной коммерции. **Товарная система реального времени**
Система основана на Flink и предлагает рекомендации товаров в реальном времени. Flink используется для анализа данных о товарах, их популярности и истории просмотров пользователями. Данные хранятся в Redis, HBase и используются для формирования рекомендаций.
**Архитектура системы**
В системе есть несколько модулей:
* **Модуль сбора данных**: собирает данные о действиях пользователей с товарами (просмотр, покупка, добавление в корзину и т. д.) и сохраняет их в HBase.
* **Обработка данных**: включает в себя несколько задач Flink, которые обрабатывают данные и формируют рекомендации. Задачи включают в себя:
* Сбор истории просмотров товаров пользователями и использование этих данных для рекомендаций на основе совместной фильтрации.
* Анализ интересов пользователей к товарам и формирование рекомендаций на основе контекста.
* Расчёт изображений пользователей на основе их предпочтений по цвету, стране происхождения и стилю.
* Запись изображений товаров на основе возраста и пола покупателей.
* Формирование списка популярных товаров на основе текущей популярности и сохранение его в Redis.
* Импорт данных из Kafka в HBase для сохранения полной информации о действиях пользователей.
* **Механизм рекомендаций**: использует данные из модулей обработки данных для формирования рекомендаций товаров. Рекомендации формируются на основе популярности товаров, изображений товаров и пользователей, а также совместной фильтрации.
**Рекомендации на основе популярности**
Рекомендации формируются на основе текущего списка популярных товаров. Список формируется с использованием механизма окон Flink. Затем список фильтруется на основе характеристик пользователя, таких как возраст, пол и интересы. Для каждого товара в списке рекомендуются несколько похожих товаров.
**Рекомендации на основе изображений товаров**
Для формирования рекомендаций используются изображения товаров и характеристики пользователей. Изображения товаров содержат информацию о возрасте покупателей, поле и предпочтениях. Характеристики пользователей включают их предпочтения по цвету, стране и стилю. На основе этих данных рассчитываются оценки сходства между товарами и формируются рекомендации.
**Рекомендации на основе совместной фильтрации**
Этот метод основан на анализе истории покупок и действий пользователей с товарами. На основе этой информации рассчитываются оценки схожести между товарами, и формируются рекомендации.
**Интерфейс рекомендаций**
Интерфейс рекомендаций состоит из трёх колонок:
* Популярные товары.
* Рекомендации на основе совместной фильтрации.
* Рекомендации на основе изображений. **TypeExtractor.createTypeInfo(UserBehavior.class);**
Поскольку Java отражение извлекает поля в неопределённом порядке, необходимо явно указать порядок полей в файле.
String[] fieldOrder = new String[]{"userId", "itemId", "categoryId", "behavior", "timestamp"};
**Create PojoCsvInputFormat**
PojoCsvInputFormat<UserBehavior> csvInput = new PojoCsvInputFormat<>(filePath, pojoType, fieldOrder);
Далее мы используем PojoCsvInputFormat для создания источника ввода.
DataStream<UserBehavior> dataSource = env.createInput(csvInput, pojoType);
Это создаёт DataStream типа UserBehavior.
#### EventTime и Watermark
Когда мы говорим «подсчитать количество кликов за последний час», что означает «час» во Flink? Это может быть либо ProcessingTime, либо EventTime, в зависимости от пользователя.
- **ProcessingTime**: время обработки события. То есть время, определяемое машиной.
- **EventTime**: время возникновения события. Обычно это время, которое несёт сама информация.
В этом примере нам нужно подсчитать количество кликов на основе бизнес-времени, поэтому нам нужно сделать две вещи:
1. Скажите Flink, что мы хотим обрабатывать данные в режиме EventTime. По умолчанию Flink использует ProcessingTime, поэтому мы должны явно установить его.
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
2. Укажите, как получить бизнес-время и создать Watermark. Watermark используется для отслеживания событий бизнеса и может рассматриваться как часы в мире EventTime. Поскольку наши исходные данные уже были организованы и не имеют беспорядка, то есть временные метки событий монотонно увеличиваются, мы можем использовать временную метку каждой записи в качестве Watermark. Здесь мы используем AscendingTimestampExtractor для извлечения временных меток и создания Watermark.
Обратите внимание: в реальных бизнес-сценариях обычно существует беспорядок, поэтому обычно используется BoundedOutOfOrdernessTimestampExtractor.
DataStream<UserBehavior> timedData = dataSource
.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<UserBehavior>() {
@Override
public long extractAscendingTimestamp(UserBehavior userBehavior) {
// Исходные данные представлены в секундах, преобразуем их в миллисекунды
return userBehavior.timestamp * 1000;
}
});
Таким образом, мы получаем поток данных с временными метками, после чего можем выполнять некоторые операции с окнами.
#### Фильтрация кликов
Прежде чем начать работу с окнами, давайте вспомним требования: «каждые 5 минут выводить количество кликов за последний час для первых N товаров». Поскольку в исходных данных есть данные о различных действиях, таких как клики, покупки, покупки и сбор, но нам нужно только подсчитать клики, мы сначала используем FilterFunction для фильтрации данных о кликах.
DataStream<UserBehavior> pvData = timedData
.filter(new FilterFunction<UserBehavior>() {
@Override
public boolean filter(UserBehavior userBehavior) throws Exception {
// Фильтруем только данные о кликах
return userBehavior.behavior.equals("pv");
}
});
#### Подсчёт количества кликов с помощью окон
Поскольку мы хотим подсчитывать количество кликов каждые 5 минут в течение последнего часа, размер окна составляет один час, а окно перемещается каждые 5 минут. То есть мы хотим подсчитать количество кликов для каждого товара в следующих окнах: [09:00, 10:00), [09:05, 10:05), [09:10, 10:10) и так далее. Это распространённый запрос скользящего окна (Sliding Window).
DataStream<ItemViewCount> windowedData = pvData
// Группировка товаров
.keyBy("itemId")
// Скользящее окно для каждого товара (окно 1 час, перемещение каждые 5 минут)
.timeWindow(Time.minutes(60), Time.minutes(5))
// Выполняем инкрементную агрегацию, которая может использовать AggregateFunction для предварительной агрегации данных и уменьшения нагрузки на состояние
.aggregate(new CountAgg(), new WindowResultFunction());
CountAgg
Здесь CountAgg реализует интерфейс AggregateFunction, который подсчитывает количество записей в окне, то есть увеличивается на единицу при появлении новой записи.
/**
* COUNT статистическая функция агрегирования, каждый раз, когда появляется запись, она увеличивается на 1
**/
public static class CountAgg implements AggregateFunction<UserBehavior, Long, Long> {
@Override
public Long createAccumulator() {
return 0L;
}
@Override
public Long add(UserBehavior userBehavior, Long acc) {
return acc + 1;
}
@Override
public Long getResult(Long acc) {
return acc;
}
@Override
public Long merge(Long acc1, Long acc2) {
return acc1 + acc2;
}
}
WindowFunction
.aggregate(AggregateFunction af, WindowFunction wf) Второй параметр WindowFunction выводит результат агрегации каждого ключа в каждом окне с другой информацией. Здесь WindowResultFunction упаковывает основной ключ товара ID, окно и количество кликов в ItemViewCount для вывода.
/**
* Используется для вывода результатов окна
**/
public static class WindowResultFunction implements WindowFunction<Long, ItemViewCount, Tuple, TimeWindow> {
@Override
public void apply(
Tuple key, // Окно основного ключа, т. е. itemId
TimeWindow window, // окно
Iterable<Long> aggregateResult, // Результат функции агрегации, т.е. значение count
Collector<ItemViewCount> collector // Тип вывода — ItemViewCount
) throws Exception {
Long itemId = ((Tuple1<Long>) key).f0;
Long count = aggregateResult.iterator().next();
collector.collect(ItemViewCount.of(itemId, window.getEnd(), count));
}
}
/**
* Количество кликов на товар (тип вывода операции окна)
**/
public static class ItemViewCount {
public long itemId; // Идентификатор товара
public
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )