Logo Море(!) аналитической информации!
IT-консалтинг Software Engineering Программирование СУБД Безопасность Internet Сети Операционные системы Hardware
Обучение от Mail.Ru Group.
Онлайн-университет
для программистов с
гарантией трудоустройства.
Набор открыт!
2010 г.

MapReduce: внутри, снаружи или сбоку от параллельных СУБД?

Сергей Кузнецов

Назад Содержание Вперёд

2. MapReduce: модель и реализации

Программная модель MapReduce была придумана несколько лет тому назад в компании Google [24], и там же была выполнена первая реализация этой модели на основе распределенной файловой системы той же компании GFS (Google File System) [32]. Эта реализация активно используется в программных продуктах самой Google, но является сугубо проприетарной и недоступна для использования вне Google.

Альтернативная, свободно доступная реализация Hadoop MapReduce [33] (с открытыми исходными текстами) была выполнена в проекте Hadoop [34] сообщества Apache [35]. Она основана на использовании распределенной файловой системы HDFS (Hadoop Distributed File System) [36], также разработанной в проекте Hadoop. Реальную популярность MapReduce принесла именно реализация Hadoop в силу своей доступности и открытости, а широкое использование Hadoop MapReduce в различных исследовательских и исследовательских проектах приносит несомненную пользу этой системе, стимулируя разработчиков к ее постоянному совершенствованию.

Однако реализация Hadoop MapReduce полностью основана на спецификациях Google, и поэтому каноническим описанием технологии была и остается статья [24]. Заметим, что при этом в документации Hadoop MapReduce [37] используется терминология, несколько отличная от [24]. В этом разделе из уважения к первенству Google я буду использовать термины из [24], а в следующих разделах там, где будет иметься конкретно реализация Hadoop MapReduce, будет использоваться терминология Hadoop (это не должно привести к путанице).

2.1. Общая модель программирования MapReduce
В этой модели вычисления производятся над множествами входных пар "ключ-значение", и в результате каждого вычисления также производится некоторое множество результирующих пар "ключ-значение". Для представления вычислений в среде MapReduce используются две основные функции: Map и Reduce. Обе функции явно кодируются разрабочиками приложений в среде MapReduce.

Функция Map в цикле обрабатывает каждую пару из множества входных пар и производит множество промежуточных пар "ключ-значение". Среда MapReduce групирует все промежуточные значения с одним и тем же ключом I и передает их функции Reduce.

Функция Reduce получает значение ключа I и множество значений, связанных с этим ключом. В типичных ситуациях каждая группа обрабатывается (в цикле) таким образом, что в результате одного вызова функции образуется не более одного результирующего значения.

2.2. Реализация в распределенной среде
Реализации MapReduce от Google и Hadoop ориентированы на использование в кластерной распределенной среде со следующими основными характеристиками:
  • узлы среды выполнения MR-приложений обычно представляют собой компьютеры общего назначения с операционной системой Linux;

  • используется стандартное сетевое оборудование с адаптерами, расчитанными на скорости передачи в 100 мегабит в секунду или 1 гигабит в секунду, но средняя пропускная способность существенно ниже;

  • кластер состоит из сотен или тысяч машин, так что вполне вероятны отказы отдельных узлов;

  • для хранения данных используются недорогие дисковые устройства, подключенные напрямую к отдельным машинам;

  • для управления данными, хранящимися на этих дисках, используется распределенная файловая система;

  • пользователи представляют свои задания в систему планирования; каждое задание состоит из некоторого набора задач, которые отображаются планировщиком на некоторый набор узлов кластера.

2.2.1. Выполнение MR-приложения
Вызовы Map распределяются по нескольким узлам кластера путем разделения входных данных на M непересекающихся групп (split). Входные группы могут параллельно обрабатываться на разных машинах. Вызовы Reduce распределяются путем разделения пространства ключей промежуточных ключей на R частей с использованием некоторой функции разделения (например, функции хэширования). Число разделов R и функция разделения задаются пользователем.

Выполнение MR-программы происходит следующим образом. Сначала среда MapReduce расщепляет входной файл на M частей, размер которых может задаваться пользователем. Затем сразу в нескольких узлах кластера запускается основная программа MapReduce. Один из экземпляров этой программы играет специальную роль и называется распорядителем (master). Остальные экземпляры являются исполнителями (worker), которым распорядитель назначает работу. Распорядитель должен назначить исполнителям для выполнения M задач Map и R задач Reduce.

Исполнитель, которому назначена задача Map, читает содержимое соответствующей группы, разбирает пары "ключ-значение" входных данных и передает каждую пару в определенную пользователем функцию Map. Промежуточные пары "ключ-значение", производимые функцией Map, буферизуются в основной памяти. Периодически буферизованные пары, разделяемые на R областей на основе функции разделения, записываются в локальную дисковую память исполнителя. Координаты этих сохраненных на диске буферизованных пар отсылаются распорядителю, который, в свою очередь, передает эти координаты исполнителям задачи Reduce. I-ый Reduce-исполнитель снабжается координатами всех i-ых областей буферизованных пар, произведенных всеми M Map-исполнителями.

После получения этих координат от распорядителя исполнитель задачи Reduce с использованием механизма удаленных вызовов процедур переписывает данные с локальных дисков исполнителей задачи Map в свою память или на локальный диск (в зависимости от объема данных). После переписи всех промежуточных данных выполняется их сортировка по значениям промежуточного ключа для образования групп с одинаковым значением ключа. Если объем промежуточных данных слишком велик для выполнения сортировки в основной памяти, используется внешняя сортировка.

Далее Reduce-исполнитель организует цикл по отсортированным промежуточным данным и для каждого уникального значения ключа вызывает пользовательскую функцию Reduce с передачей ей в качестве аргумента значения ключа и соответствущее множество значений. Результирующие пары функции Reduce добавляются в окончательный результирующий файл данного Reduce-исполнителя. После завершения всех задач Map и Reduce распорядитель активизирует программу пользователя, вызывавшую MapReduce.

После успешного завершения выполнения задания MapReduce результаты размещаются в R файлах распределенной файловой системы (имена этих результирующих файлов задаются пользователем). Обычно не требуется объединять их в один файл, потому что часто полученные файлы используются в качестве входных для запуска следующего MR-задания или в каком-либо другом распределенном приложении, которое может получать входные данные из нескольких файлов.

2.2.2. Отказоустойчивость
Поскольку технология MapReduce предназначена для обработки громадных объемов данных с использованием сотен и тысяч машин, в ней обязательна должна присутствовать устойчивость к отказам отдельных машин.

Отказ исполнителя

Распорядитель периодически посылает каждому исполнителю контрольные сообщения. Если некоторый исполнитель не отвечает на такое сообщение в течение некоторого установленного времени, распорядитель считает его вышедшим из строя. В этом случае все задачи Map, уже выполненные и еще выполнявшиеся этим исполнителем, переводятся в свое исходное состояние, и можно заново планировать их выполнение другими исполнителями. Аналогично распорядитель поступает со всеми задачами Reduce, выполнявшимися отказавшим исполнителем к моменту отказа.

Завершившиеся задачи Map выполняются повторно по той причине, что их результирующие пары сохранялись на локальном диске отказавшего исполнителя и поэтому недоступны в других узлах. Завершившиеся задачи Reduce повторно выполнять не требуется, поскольку их результирующие пары сохраняются в глобальной распределенной файловой системе. Если некоторая задача Map выполнялась исполнителем A, а потом выполняется исполнителем B, то об этом факте оповещаются все исполнители, выполняющие задачи Reduce. Любая задача Reduce, которая не успела прочитать данные, произведенные исполнителем A, после этого будет читать данные от исполнителя B.

Отказ распорядителя

В реализациях MapReduce от Google и Hadoop какая-либо репликация распорядителя не производится. Считается, что поскольку распорядитель выполняется только в одном узле кластера, его отказ маловероятен, и если он случается, то аварийно завершается все выполнение MapReduce. Однако в [24] отмечается, что несложно организовать периодический сброс в распределенную файловую систему всего состояния распорядителя, чтобы в случае отказа можно было запустить его новый экземпляр в другом узле с данной контрольной точки.

Семантика при наличии отказов

Если обеспечиваемые пользователями функции Map и Reduce являются детерминированными (т.е. всегда выдают одни и те же результаты при одинаковых входных данных), то при их выполнении в среде распределенной реализации MapReduce при любых условиях обеспечивает тот же результат, как при последовательном выполнении всей программы при отсутствии каких-либо сбоев.

Это свойство обеспечивается за счет атомарности фиксации результатов задач Map и Reduce. Каждая выполняемая задача записывает свои результаты в частные временные файлы. Задача Reduce производит один такой файл, а задача MapR файлов, по одной на каждую задачу Reduce. По завершении задачи Map исполнитель посылает распорядителю сообщение, в котором указываются имена R временных файлов. При получении такого сообщения распорядитель запоминает эти имена файлов в своих структурах данных. Повторные сообщения о завершении одной и той же задачи Map игнорируются.

При завершении задачи Reduce ее исполнитель атомарным образом переименовывает временный файл результатов в окончательный файл. Если одна и та же задача Reduce выполняется несколькими исполнителями, то для одного и того же окончательного файла будет выполнено несколько операций переименования. Если в используемой распределенной файловой системе операция переименования является атомарной, то в результате в файловой системе соохранятся результаты только какого-либо одного выполнения задачи Reduce.

2.2.3. Резервные задачи
Чаще всего к увеличению общего времени выполнения задания MapReduce приводит наличие "отстающих" ("straggler") – узлов кластера, в которых выполнение одной из последних задач Map или Reduce занимает необычно долгое время (например, из-за некритичной неисправности дискового устройства).

Для смягчения проблемы "остающих" в MapReduce применяется следующий общий механизм. Когда задание близится к завершению, для всех еще не завершившихся задач назначаются дополнительные, резервные исполнители. Задача считается выполненной, когда завершается ее первичное или резервное выполнение. Этот мезанизм настраивается таким образом, чтобы потребление вычислительных ресурсов возрастало не более чем на несколько процентов. В результате удается существенно сократить время выполнения крупных MR-заданий.

2.3. Расширенные средства
В большинстве приложений MapReduce оказывается достаточно просто написать свои функции Map и Reduce. Однако в ряде случаев оказываются полезными некоторые расширения базовых функциональных возможностей.
2.3.1. Функция разделения
Пользователи MapReduce явно указывают требуемое число задач Reduce R (и соответствующих результирующих файлов). Данные распределяются между этими задачами с использованием некоторой функции разделения от значений промежуточного ключа. По умолчанию используется функция хэширования (например, "hash(key) mod R"). Однако пользователи MapReduce могут специфицировать и собственные функции разделения.
2.3.2. Гарантии упорядоченности
Внутри каждого раздела, передаваемого задаче Reduce, данные упорядочены по возрастанию значений промежуточного ключа. Это позволяет задачам Reduce производить отсортированные результирующие файлы.
2.3.3. Функция-комбинатор
В некоторых случаях в результатах задачи Map содержится значительное число повторящихся значений промежуточного ключа, а определенная пользователем задача Reduce является коммутативной и ассоциативной. В таких случаях пользователь может определить дополнительную функцию-комбинатор (Combiner), выполняющую частичную агрегацию таких данных до их передачи по сети. Функция Combiner выполняется на той же машине, что и задача Map. Обычно для ее реализации используется тот же самый код, что и для реализации функции Reduce. Елинственное различие между функциями Combiner и Reduce состоит в способе работы с их результирующими данными. Результаты функции Reduce записываются в окончательный файл результатов. Результаты же функции Combiner помещаются в промежуточные файлы, которые впоследствии пересылаются в задачи Reduce.
2.3.4. Форматы входных и результирующих данных
В библиотеке MapReduce поддерживается возможность чтения входных данных в нескольких разных форматах. Например, в режиме "text" каждая строка трактуется как пара "ключ-значение", где ключ – это смещение до данной строки от начала файла, а значение – содержимое строки. В другом распространенном формате входные данные представляются в виде пар "ключ-значение", отсортированных по значениям ключа. В каждой реализации формата входных данных известно, каким образом следует расшеплять данные на осмысленные части, которые обрабатываются отдельными задачами Map (например, данные формата "text" расщепляются только по границами строк).

Пользователи могут добавить к реализации собственные форматы входных данных, обеспечив новую реализацию интерфейса reader (в реализации Hadoop – RecordReader). Reader не обязательно должен читать данные из файла, можно легко определить reader, читающий данные из базы данных или из некоторой структуры в виртуальной памяти.

Аналогичным образом, поддерживаются возможности генерации данных в разных форматах, и имеется простая возможность определения новых форматов результирующих данных.

Думаю, что для общего ознакомления с технологией MapReduce и для понимания следующих разделов статьи этой информации достаточно. Кроме того, она позволяет понять, какие особенности модели и реализаций MapReduce обеспечивают масштабируемость технологии до десятков тысяч узлов, ее отказоустойчивость, дешевизну загрузки данных и возможность использования явно написанного кода, который хорошо распараллеливается.

Назад Содержание Вперёд

Новости мира IT:

Архив новостей

Последние комментарии:

Вышло обновление Firefox 57.0.1 (1)
Среда 06.12, 09:14

IT-консалтинг Software Engineering Программирование СУБД Безопасность Internet Сети Операционные системы Hardware

Информация для рекламодателей PR-акции, размещение рекламы — adv@citforum.ru,
тел. +7 985 1945361
Пресс-релизы — pr@citforum.ru
Обратная связь
Информация для авторов
Rambler's Top100 TopList liveinternet.ru: показано число просмотров за 24 часа, посетителей за 24 часа и за сегодня This Web server launched on February 24, 1997
Copyright © 1997-2000 CIT, © 2001-2015 CIT Forum
Внимание! Любой из материалов, опубликованных на этом сервере, не может быть воспроизведен в какой бы то ни было форме и какими бы то ни было средствами без письменного разрешения владельцев авторских прав. Подробнее...