HDFS — это наиболее часто используемая распределенная файловая система в Hadoop. Объектом операций с распределенной файловой системой являются данные. В данной статье будут рассмотрены основные концепции и знания о потоках данных в Hadoop, а также рассмотрено, как именно данные передаются между клиентом, HDFS, NameNode и DataNode.### 1. Чтение файла Клиентское приложение открывает файл для чтения, вызывая метод
open()
объектаFileSystem
(шаг 1). В случае HDFS, этот объект является экземпляром распределенной файловой системы (шаг 2).DistributedFileSystem
использует RPC для вызоваnamenode
, чтобы определить местоположение начального блока файла (шаг 3). Для каждого блокаnamenode
возвращает адресdatanode
, где хранится копия этого блока. Далее, этиdatanode
сортируются по расстоянию до клиента (в соответствии с топологией сети кластера). Если клиент сам являетсяdatanode
(например, в задаче MapReduce), и хранит копию соответствующего блока данных, то чтение данных происходит напрямую с локальногоdatanode
.
Рисунок 2
Класс DistributedFileSystem
возвращает объект FSDataInputStream
(входной поток, поддерживающий позиционирование в файле) клиенту для чтения данных. Класс FSDataInputStream
, в свою очередь, обертывает объект DFSInputStream
, который управляет вводом-выводом с datanode
и namenode
.
Затем клиент вызывает метод read()
для этого входного потока (шаг 3). DFSInputStream
связывается с ближайшим datanode
, адрес которого хранится в DFSInputStream
для первых нескольких блоков файла. Повторное вызов метода read()
позволяет передать данные от datanode
клиенту (шаг 4). Когда конец блока достигнут, DFSInputStream
закрывает соединение с этим datanode
и ищет следующий оптимальный datanode
(шаг 5).Клиенту требуется только чтение последовательного потока, и это происходит для него прозрачно.
Когда клиент читает данные из потока, блоки читаются в том же порядке, в котором они были открыты в DFSInputStream и datanode. Он также может запросить у namenode адрес следующего блока данных, если это необходимо. Как только клиент завершает чтение, он вызывает метод close() для FSDataInputStream (шаг 6).
При чтении данных, если DFSInputStream сталкивается с ошибкой при коммуникации с datanode, он пытается прочитать данные с другого ближайшего datanode для этого блока. Он также помнит этот неисправный datanode, чтобы избежать повторного чтения последующих блоков на этом узле. DFSInputStream также проверяет целостность данных, полученных от datanode, с помощью контрольной суммы. Если обнаружены поврежденные блоки, он уведомляет namenode перед попыткой чтения его копии с другого datanode.
Основная идея этого дизайна заключается в том, что namenode сообщает клиенту оптимальный datanode для каждого блока, и позволяет клиенту напрямую подключаться к этому datanode для получения данных. Поскольку поток данных распределен по всем datanode в кластере, это позволяет HDFS масштабироваться до большого количества параллельных клиентов.В то же время, namenode требуется только отвечать на запросы о местоположении блоков (эта информация хранится в оперативной памяти, поэтому она очень эффективна), а не на запросы данных, что позволяет избежать развития namenode как точки узкого места по мере увеличения количества клиентов.
Клиент создает новый файл, вызывая функцию create() у объекта DistributedFileSystem (шаг 1 на рисунке 3). Объект DistributedFileSystem создает RPC-запрос к namenode для создания нового файла в пространстве имен файловой системы, при этом соответствующие блоки данных еще не созданы (шаг 2). Namenode выполняет различные проверки, чтобы убедиться, что файл еще не существует и у клиента есть права на создание этого файла. Если все проверки пройдены, namenode регистрирует запись о создании нового файла; в противном случае создание файла завершается ошибкой, и клиент получает исключение IOException. Объект DistributedFileSystem возвращает клиенту объект FSDataOutputStream, который позволяет начать запись данных. Как и при чтении, объект FSDataOutputStream также упаковывает объект DFSOutputStream, который отвечает за обработку коммуникации между datanode и namenode.
Рисунок 3
При записи данных клиентом (шаг 3), объект DFSOutputStream разбивает данные на пакеты и помещает их в внутреннюю очередь, называемую "очередью данных" (data queue).Объект DataStreamer обрабатывает очередь данных, его задача — запросить у namenode выделение новых блоков для хранения копий данных на основе списка datanode. Эта группа datanode образует трубу — предположим, что количество копий равно 3, поэтому в трубе три узла. Объект DataStreamer потоково передает пакеты данных в первый datanode в трубе, который сохраняет пакет и передает его второму datanode в трубе. Второй datanode сохраняет пакет и передает его третьему (и последнему) datanode в трубе (шаг 4).
Объект DFSOutputStream также поддерживает внутреннюю очередь пакетов данных, ожидающих подтверждения от datanode, называемую "очередью подтверждений" (ack queue). Только после получения подтверждений от всех datanode в трубе пакет удаляется из очереди подтверждений (шаг 5).
Если datanode отказывает во время записи данных, выполняются следующие действия (для клиента, записывающего данные, это прозрачно). Сначала закрывается труба, все пакеты из очереди подтверждений добавляются в начало очереди данных, чтобы гарантировать, что ни один пакет не будет утерян у последнего datanode в трубе. Для текущего блока данных, хранящегося на другом нормальном datanode, назначается новый идентификатор, который передается namenode для удаления части блока данных, хранящегося на неисправном datanode после его восстановления. Неисправный datanode удаляется из трубы, а оставшиеся блоки данных записываются на два других нормальных datanode в трубе.namenode обнаруживает недостаток копий блока и создает новую копию на другом узле. Дальнейшие блоки данных продолжают обрабатываться в нормальном режиме.
Во время записи блока могут одновременно выйти из строя несколько узлов datanode, но это случается крайне редко. Как только будет создано количество копий, равное значению dfs.replication.min (по умолчанию 1), запись считается успешной, и блок будет асинхронно копироваться в кластере до тех пор, пока количество копий не достигнет целевого значения (по умолчанию dfs.replication равно 3).
После завершения записи данных клиентом вызывается метод close() для завершения потока данных. Этот метод записывает все оставшиеся пакеты данных в конвейер и ожидает подтверждения от namenode о завершении записи файла (шаг 7). Namenode уже знает, какие блоки данных составляют файл (через запрос DataStreamer на распределение блоков данных), поэтому ему нужно только дождаться минимального количества копий блока перед возвратом успешного ответа.### 3. Модель согласованности
Модель согласованности файловой системы (coherency model) описывает видимость данных при чтении/записи файлов. HDFS для повышения производительности отказывается от некоторых требований POSIX, поэтому некоторые операции могут вести себя немного иначе, чем ожидалось. После создания нового файла он становится сразу доступен в пространстве имен файловой системы, как показано ниже:
Path path = new Path("p");
Fs.create(p);
assertThat(fs.exists(p), is(true));
Однако содержимое файла не гарантируется к моментальному появлению, даже если данные были обновлены и сохранены. Поэтому длина файла отображается как 0:
Path path = new Path("p");
OutputStream out = fs.create(path);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(path).getLen(), is(0L));
Когда записанные данные превышают размер одного блока, первый блок данных становится видимым для нового читателя. Это правило распространяется и на последующие блоки. В целом, текущий блок данных, который находится в процессе записи, не видим для других читателей. HDFS предоставляет метод для принудительного синхронизации всех кэшей и узлов данных, вызывая метод sync() для FSDataOutputStream. После успешного выполнения sync() метода, HDFS гарантирует, что все данные, записанные до текущего момента, будут доступны для всех новых читателей и переданы во все узлы данных:
Path path = new Path("p");
FSDataOutputStream out = fs.create(path);
out.write("content".getBytes("UTF-8"));
out.flush();
out.hflush();
assertThat(fs.getFileStatus(path).getLen(), is((long)"content".length()));
Эта операция аналогична системному вызову fsync в POSIX, который подтверждает отправку буферизованных данных для файлового дескриптора.
Эта модель согласованности тесно связана с проектированием приложений. Если метод hsync() не вызывается, следует быть готовым к потере блоков данных при сбоях на стороне клиента или системы. Это неприемлемо для многих приложений, поэтому необходимо вызывать метод hsync() в подходящих местах, например, после записи определенного количества записей или байтов.Несмотря на то, что hsync() был спроектирован для минимизации нагрузки на HDFS, он всё равно имеет значительные дополнительные затраты по производительности, поэтому необходимо находить баланс между целостностью данных и пропускной способностью.
Вы можете оставить комментарий после Вход в систему
Неприемлемый контент может быть отображен здесь и не будет показан на странице. Вы можете проверить и изменить его с помощью соответствующей функции редактирования.
Если вы подтверждаете, что содержание не содержит непристойной лексики/перенаправления на рекламу/насилия/вульгарной порнографии/нарушений/пиратства/ложного/незначительного или незаконного контента, связанного с национальными законами и предписаниями, вы можете нажать «Отправить» для подачи апелляции, и мы обработаем ее как можно скорее.
Опубликовать ( 0 )