2010 г.
Интеграция Hadoop и параллельной СУБД
Ю Ксу, Пекка Костамаа, Лайк Гао
Перевод: Сергей Кузнецов
Назад Содержание Вперёд
3. Выборка данных EDW в программах MapReduce
В этом разделе мы обсуждаем подход TeradataInputFormat, позволяющий программам MapReduce напрямую читать данные Teradata EDW через драйверы JDBC без потребности в каких-либо внешних шагах экспортирования (из Teradata EDW) и загрузки данных в Hadoop. Прямолинейный подход, обеспечивающий программам MapReduce доступ к реляционным данным, состоит в том, что сначала используется утилита СУБД для экспорта результатов требуемых SQL-запросов в локальный файл, а затем этот локальный файл загружается в Hadoop (или результаты запросов используются в потоковом стиле без потребности в промежуточном файле). Однако программисты MapReduce часто считают более удобным и продуктивным прямой доступ к реляционным данным из своих программ MapReduce без привлечения внешних шагов экспортирования данных из СУБД (для чего требуется знание скриптового языка экпорта данных) и их загрузки в Hadoop.
Признавая потребность интеграции реляционных данных в программах Hadoop MapReduce, компания-стартап Cloudera [6], которая специализируется на коммерциализации продуктов и сервисов, связанных с Hadoop, обеспечивает несколько Java-классов (в основном, DBInputFormat [7]), входящих теперь в основной дстрибутив Hadoop и позволяющих программам MapReduce посылать SQL-запросы через стандартный интерфейс JDBC для параллельного доступа реляционных данных. Поскольку наш подход TeradataInputFormat инспирирован подходом DBInputFormat (но не основывается на нем), мы сначала кратко опишем, как работает подход DBInputFormat, а затем обсудим подход TeradataInputFormat.
3.1. DBInputFormat
Основная идея состоит в том, что программист MapReduce через класс DBInputFormat представляет SQL-запрос. Последующее выполнение производится реализацией DBInputFormat и является прозрачным для программиста MapReduce. Класс DBInputFormat ассоциирует некоторый модифицированный SQL-запрос с каждым Mapper'ом, запущенным Hadoop. Затем каждый Mapper посылает запрос в СУБД через стандартный драйвер JDBC, получает в ответ часть результатов запроса и в параллель с другими Mapper'ами обрабатывает результаты. Подход DBInputFormat является корректным, поскольку объединение всех запросов, посылаемых всеми Mapper'ами, эквивалентно исходному SQL-запросу.
В подходе DBInputFormat обеспечиваются два интерфейса для обеспечения прямого доступа программам MapReduce к данным из СУБД. Мы посмотрели на исходный код реализации подхода DBInputFormat. Основная реализация является одной и той же для обоих интерфейсов. Эту реализацию можно резюмировать следующим образом. В первом интерфейсе программа MapReduce обеспечивает имя пользователя, пароль и URL СУБД, а также имя таблицы T, список P имен столбцов, которые следует выбрать, необязательные фильтрующие условия C и список имен столбцов O для использования в разделе ORDER BY
. Реализация DBInputFormat сначала генерирует запрос "SELECT count(*) from T where C"
и посылает его в СУБД для получения числа строк (R) в таблице T. Во время выполнения реализация DBInputFormat знает число Mapper'ов (M), запущенных Hadoop (это число либо обеспечивается пользователем в командной строке, либо берется из конфигурационного файла Hadoop) и ассоциирует с каждым Mapper'ом следующий запрос Q. Каждый Mapper подключается к СУБД, посылает Q через JDBC-подключение и получает результаты.
SELECT P FROM T WHERE C ORDER BY O
LIMIT L
OFFSET X (Q)
При получении запроса Q СУБД реально выполняет запрос SELECT P FROM T WHERE C ORDER BY O
, но возвращаются только L строк результата со смещением X. M запросов, посылаемых в СУБД M Mapper'ами, являются почти идентичными, за исключением того, что значения L и X в них различны. Для i-го Mapper'а (где 1 ≤ i ≤ M - 1), который не является последним Mapper'ом, L = ⌊R / M⌋, и X = (i - 1) × ⌊R / M⌋. Для последнего Mapper'а
L = R - (M - 1) × ⌊R / M⌋, и X = (M - 1) × ⌊R / M⌋.
Во втором интерфейсе класса DBInputFormat программа MapReduce может предоставить произвольный SQL-запрос SQ на выборку данных, результаты которого являются входными данными для Mapper'ов. В этом случае программа MapReduce должна предоставить и запрос со счетчиком (count query) QC, который должен возвращать целочисленное значение, являющееся числом строк в результате запроса SQ. Класс DBInputFormat посылает в СУБД запрос QC, чтобы получить число строк (R), а дальнейшая обработка – та же самая, что и в первом интерфейсе.
Хотя понятно, что подход DBInputFormat, обеспечиваемый компанией Claudera, упрощает процесс доступа к реляционным данным, он не обеспечивает должного роста
производительности при увеличении числа Mapper'ов. С подходом DBInputFormat связано несколько проблем производительности. В обоих интерфейсах каждый Mapper для получения своего поднабора реляционных данных посылает в СУБД, по существу, один и тот же запрос, только с разными значениями в разделах LIMIT
и OFFSET
. Требуются и указываются программой MapReduce столбцы упорядочивания, которое используется для корректного разделения результатов запроса между всеми Mapper'ами, даже если самой программе MapReduce не нужны отсортированные входные данные. За счет этого достигается параллельность обработки реляционных данных Mapper'ами. СУБД приходится выполнять столько запросов, сколько Mapper'ов имеется в системе Hadoop, и, конечно, это не эффективно, особенно, если число Mapper'ов велико.
Отмеченные проблемы производительности особенно серьезны для параллельной СУБД, в которой, как правило, имеются много одновременно выполняемых запросов и крупные наборы данных. Кроме того, требуемое упорядочивание/сортировка – это дорогостоящая операция в параллельных СУБД, поскольку строки таблицы не сохраняются в каком-либо одном узле, и для сортировки требуется перераспределение строк по узлам.
3.2. TeradataInputFormat
Основная идея нашего подхода заключается в том, что коннектор Teradata для Hadoop (TeradataInputFormat) посылает в Teradata EDW SQL-запрос
Q, обеспечиваемый программой MapReduce, только один раз, и результаты сохраняются в некоторой
PPI-таблице
T (PPI –
Partitioned Primary Index, разделенный первичный индекс). После этого каждый Mapper системы Hadoop посылает новый запрос
Qi, в котором всего лишь в каждом AMP запрашивается
i-ый раздел.
Обсудим теперь нашу реализацию более подробно. Прежде всего, класс TeradataInputFormat посылает в Teradata EDW следующий запрос P, основанный на запросе Q, который предоставляется программой MapReduce.
CREATE TABLE T AS (Q) WITH DATA
PRIMARY INDEX ( c1 )
PARTITION BY (c2 MOD M) + 1 (P)
В этом запросе требуется, чтобы в Teradata EDW был выполнен запрос Q, и чтобы результаты были сохранены в новой PPI-таблице T. Хэш-значение столбца первичного индекса c1 каждой строки результата запроса определяет, в каком AMP должна храниться эта строка. После этого значение выражения, указанного в разделе Partition By
, определяет физический раздел (местоположение) каждой строки в конкретном AMP. В одном AMP все строки с одним и тем же значением выражения Partition By
физически хранятся совместно и могут быть напрямую и эффективно найдены Teradata EDW. Мы опустим детали того, каким образом мы автоматически выбираем столбец первичного индекса и выражение Partition By
. После выполнения запроса Q и создания таблицы T в каждом AMP имеется M разделов с номерами от 1 до M (M – число Mapper'ов, запущенных в Hadoop). В качестве одного из дополнительных вариантов мы думаем разрешить опытным программистам самим задавать выражение Partition By
через интерфейс TeradataInputFormat, чтобы получить более тонкое программное управление над тем, как следует разделять результаты запросов (конечно, это возможно, только если программистам хорошо известна демография данных).
Затем каждый Mapper посылает в Teradata EDW следующий запрос Qi (1 ≤ i ≤ M):
SELECT * FROM T WHERE PARTITION = i (Qi)
Teradata EDW напрямую параллельно определит местоположение всех строк i-го раздела каждого AMP и вернет эти строки i-му Mapper'у. Эта операция выполняется параллельно для всех Mapper'ов. После того как все Mapper'ы получат свои данные, таблица T удаляется.
Заметим, что если в исходном SQL-запросе всего лишь выбираются данные из базовой таблицы, которая является PPI-таблицей, то мы не создаем еще одну PPI-таблицу (T), поскольку можем непосредственно использовать существующие разделы для разделения данных, которые должен получить каждый Mapper.
В настоящее время у PPI-таблицы в Teradata EDW должен иметься столбец первичного индекса. Поэтому при вычислении запроса P системе Teradata EDW требуется разделить результаты запроса между всеми AMP в соответствии со значениями столбца первичного индекса. Одной из возможных в будущем оптимизаций является параллельное построение разделов результатов запроса в каждом AMP без перемещения результатов SQL-запроса Q между AMP. Еще одна возможная оптимизация состоит в том, что для построения M разделов нам в действительности не требуется сортировать строки в каком-либо AMP на основе значений выражения Partition By
. Для наших целей мы может использовать здесь "номера псевдоразделов": первой 1/M-части строк результата запроса в любом AMP можно назначить номер раздела 1, ..., последней 1/M-части строк результата запроса в любом AMP можно назначить номер раздела M.
Заметим, что данные, выбираемые программой MapReduce через интерфейс TeradataInputFormat, не сохраняются в Hadoop после завершения программы MapReduce (если только их не сохранит сама программа MapReduce). Поэтому, если какие-то данные Teradata EDW часто используются многими программами MapReduce, более эффективно будет скопировать эти данные и материализовать их в Hadoop в виде файлов Hadoop DFS.
В зависимости от числа Mapper'ов, сложности SQL-запроса, предоставляемого программой MapReduce, и объема данных, затрагиваемых этим SQL-запросом, производительность подхода TeradataInputFormat, очевидным образом, может на порядки величин превышать производительность подхода DBInputFormat, что подтверждается предварительными результатами тестирования.
Подход TeradataInputFormat, описанный в этом подразделе, можно назвать подходом, основанным на горизонтальном разделении, в том смысле, что каждый Mapper выбирает часть результатов запроса из каждого AMP (узла). В настоящее время мы исследуем подход, основанный на вертикальном разделении, когда несколько Mapper'ов выбирают данные только из одного AMP при M > A (M – число Mapper'ов, запущенных Hadoop, и A – число AMP в Teradata EDW), или когда каждый Mapper выбирает данные из некоторого подмножества AMP при M < A, или когда каждый Mapper выбирает данные из одного и только одного AMP при M = A. Для реализации подхода, основанного на вертикальном разделении, в текущем варианте Teradata EDW требуется больше изменений, чем для реализации подхода, основанного на горизонтальном разделении. Мы предполагаем, что производительность любого из этих подходов не всегда будет превосходить производительность другого подхода.
4. Доступ к данным Hadoop из SQL с использованием табличной UDF
В этом разделе мы опишем, как можно обеспечить прямой доступ к данным Hadoop через SQL-запросы и использовать эти данные совместно с реляционными данными Teradata EDW для выполнения интегрированного анализа данных. Мы обеспечиваем табличную
UDF (User Defined Function – функцию, определяемую пользователями), называемую
HDFSUDF, которая "вытягивает" данные из Hadoop в Teradata EDW. Например, в следующем SQL-запросе вызывается
HDFSUDF для загрузки данных из файла Hadoop с именем
mydfsfile.txt
в таблицу
Tab1
в Teradata EDW:
INSERT INTO Tab1
SELECT * FROM TABLE(HDFSUDF (‘mydfsfile.txt’)) AS T1;
Заметим, что после создания табличной UDF HDFSUDF и предоставления ее пользователям она вызывается подобно любой другой UDF. Для пользователей этой табличной UDF несущественно, каким образом данные перемещаются из Hadoop в Teradata EDW. Обычно табличная UDF HDFSUDF пишется таким образом, чтобы при ее вызове из SQL-запроса она выполнялась в каждом AMP. Однако ее можно написать и таким образом, чтобы при вызове из SQL-запроса она выполнялась в каком-либо одном AMP или в какой-либо группе AMP. Каждый экземпляр HDFSUDF, выполняемый в некотором AMP, отвечает за извлечение некоторой части файла Hadoop. Табличная функция HDFSUDF может также производить фильтрацию и преобразование данных по мере того, как эта функция доставляет строки в процессор SQL. Примерный код HDFSUDF и другие подробности доступны на Web-сайте Teradata Developer Exchange [1]. Когда в некотором AMP запускается экземпляр UDF, этот экземпляр связывается с NameNode в Hadoop, который заведует метаданными относительно mydfsfile.txt
. Метаданные Hadoop NameNode включают информацию о том, какие блоки файла Hadoop сохраняются, и в каких узлах они реплицируются. В нашем примере каждый экземпляр UDF обращается к NameNode и обнаруживает общий размер S файла mydfsfile.txt
. Затем табличная UDF запрашивает у Teradata EDW номер своего собственного AMP и общее число AMP. На основе этих фактов каждый экземпляр UDF вычисляет смещение в файле mydfsfile.txt
, от которого он начнет читать данные из Hadoop.
Для любого запроса от экземпляров UDF к системе Hadoop NameNode устанавливает, какие DataNode в Hadoop отвечают на возврат требуемых данных. Экземпляр табличной UDF, выполяемый на некотором AMP, получает данные непосредственно от тех DataNode, которые сохраняют требуемые блоки данных. Заметим, что никакие данные из файла Hadoop никогда не маршрутизируются через NameNote. Все это делается напрямую от одного узла другому узлу. В нашей примерной реализации [1] мы просто вынуждаем N-ый AMP в системе загружать N-ую порцию файла Hadoop. В зависимости от потребностей приложений можно обеспечить другие типы отображений.
При принятии решения о том, какую часть файла следует загружать каждому AMD с использованием табличной UDF, нужно убедиться, что, в конечном счете, все экземпляры UDF прочитают все байты файла Hadoop, и каждый байт будет прочитан только один раз. Поскольку каждый AMP запрашивает данные из Hadoop, посылая в своем запросе смещение в байтах до позиции файла, с которого должно начаться чтение, нам требуется гаантировать, что последняя строка, прочитанная каждым AMP, является полной, а не частичной строкой (если экземпляры UDF обрабатывают входной файл в режиме "строка за строкой"). В нашей примерной реализации [1] у файла Hadoop, который требуется загрузить, строки имеют фиксированный размер; поэтому мы можем простым образом вычислить начальное и конечное смещение в байтах требуемой порции данных для любого AMP. В зависимости от формата входного файла и потребностей приложений назначению каждому AMP соответствующей порции файла может потребоваться уделять более серьезное внимание.
После загрузки данных Hadoop в Teradata мы можем анализировать набор данных Hadoop точно так же как любые другие данные, сохраняемые в EDW. Более интересно то, что мы можем выполнять интегрированный анализ данных над реляционными данными, хранимыми в Teradata EDW, и внешними данными, исходно сохранявшимися в Hadoop, без потребности в создании новой таблицы и загрузке в нее данных Hadoop. Это демонстрируется в следующем примере. Предположим, что у некоторой телекоммуникационной компании имеется файл Hadoop packets.txt
, в котором сохраняется информация о сетевых пакетах, и строки которого имеют формат <source-id,
dest-id, timestamp>
. Поля source-id
и dest-id
используются для обнаружения спамеров и хакеров. Их значения говорят нам, кто и куда послал запрос. Допустим теперь, что в Teradata EDW имеется таблица watch-list
("список отслеживания"), в которой сохраняется список source-id
, которые отслеживаются и используются для анализа тенденций изменения. В следующем SQL-запросе соединяются файл Hadoop packets.txt
и таблица watch-list
для нахождения списка source-id
в таблице watch-list
, из которых рассылались пакеты в более чем миллион уникальных dest-id
.
SELECT watchlist.source-id,
count(distinct(T.dest-id)) as Total
FROM watchlist, TABLE(HDFSUDF(’packets.txt’)) AS T
WHERE watchlist.source-id=T.source-id
GROUP BY watchlist.source-id
HAVING Total > 1000000
Приведенный пример показывает, что мы можем использовать подход табличной UDF для обеспечения простой возможности выполнения сложного анализа с применением процессора SQL над данными Hadoop и реляционными данными. В настоящее время мы работаем над более развитой версией HDFSUDF [1], позволяющей пользователям SQL объявлять отображение схем между файлами Hadoop и таблицами SQL, а также фильтровать и трансформировать данные с применением высокоуровневых конструкций SQL без потребности в написании кода на языке Java.
5. Родственные работы
MapReduce вызывает огромный интерес как в индустрии, так и в академических кругах. Одно из направлений исследований состоит в повышении мощности или выразительности модели программирования MapReduce. В [19] предлагается добавить к набору примитивов MapReduce новый примитив
MERGE
, чтобы облегчить выполнение соединений в среде MapReduce, поскольку в MapReduce реализация соединений затруднительна. Pig Latin [14, 9] – это новый язык, разработанный Yahoo! в качестве "золотой середины" между декларативным стилем SQL и низкоуровневым процедурным стилем MapReduce. В проекте Facebook Hive [17] – разрабатывается решение с открытыми исходными кодами для построения хранилищ данных на основе Hadoop. В Hive обеспечивается SQL-подобный декларативный язык HiveQL, который компилируется в задания MapReduce, выполняемые в Hadoop.
В то время как [14, 9, 17, 4] помогают интегрировать конструкции декларативных запросов из РСУБД в MapReduce-подобную среду программирования для поддержки автоматической оптимизации запросов, достижения более высокой продуктивности программирования и большей выразительности запросов, другое направление исследований состоит в том, что исследователи и производители программных продуктов управления базами данных пытаются внедрить лучшие черты MapReduce, включая дружественность по отношению к пользователям и отказоустойчивость, в реляционные базы данных. HadoopDB [3] – это гибридная система, целью разработчиков которой является объединение лучших черт Hadoop и РСУБД. Основная идея HadoopDB состоит в соединении нескольких одноузловых систем баз данных (PostgreSQL) с использованием Hadoop в качестве координатора задач и уровня сетевых коммуникаций. Greenplum и Aster Data позволяют пользователям писать функции в стиле MapReduce над данными, хранимыми под управлением их параллельных систем [12].
Родственной работой по отношению к описанному в разд. 3 подходу TeradataInputFormat является реализация VerticaInputFormat компании Vertica [18], обеспечивающая программам MapReduce прямой доступ к реляционным данным, хранимым в параллельной СУБД Vertica (эта реализация также инспирирована DBInputFormat [7], но не основана на реализации данного интерфейса). Однако в реализации Vertica (как и в реализации DBInputFormat) в СУБД посылается столько SQL-запросов (в каждом из которых, как и в подходе DBInputFormat, к SQL-запросу, предоставленному пользователем, добавляется один раздел LIMIT
и один раздел OFFSET
), сколько имеется Mapper'ов в Hadoop, хотя каждый Mapper случайным образом выбирает для подключения узел кластера Vertica.
В нашем подходе TeradataInputFormat каждый Mapper также случайным образом подключается к узлу Teradata EDW. Однако, по нашему опыту, это не приводит к существенному повышению производительности программ MapReduce, поскольку все запросы параллельно выполняются во всех узлах, независимо от того, в какой узел посылался конкретный запрос. Ключевым фактором высокой производительности подхода TeradataInputFormat является то, что специфицируемые пользователями запросы выполняются только один раз, а не столько раз, сколько имеется Mapper'ов, как это происходит в DBInputFormat и VerticaInputFormat.
Дополнительным (не всегда применимым) оптимизационным приемом в VerticaInputFormat является то, что когда пользователь задает параметризованный SQL-запрос типа “SELECT
* FROM T WHERE c=?”
, в VerticaInputFormat поддерживается список значений параметра для разных Mapper'ов, и значения параметра обеспечиваются пользователем во время выполнения. И опять же число SQL-запросов, посылаемых в кластер Vertica, совпадает с числом Mapper'ов.
6. Заключение
Исследования, связанные с MapReduce, продолжают активно развиваться и вызывают интерес как в индустрии, так и в академических кругах. Подход MapReduce особенно интересен для производителей параллельных СУБД, поскольку и в MapReduce, и в РСУБД используются кластеры узлов и масштабируемая технология анализа данных. Крупные заказчики Teradata все чаще сталкиваются с потребностью выполнения интегрированного анализа данных, хранимых и в среде Hadoop, и в Teradata EDW. Мы представили три исследовательские работы, направленные на достижение тесной интеграции Hadoop и Teradata EDW.
Наш подход DirectLoad обеспечивает быструю параллельную загрузку данных Hadoop в Teradata EDW. Наш подход TeradataInputFormat дает программам MapReduce возможность эффективного и прямого параллельного доступа к данным Teradata EDW без потребности во внешних шагах экспортирования и загрузки данных из Teradata EDW в Hadoop. Мы также продемонстрировали, каким образом пользователи SQL могут напрямую обращаться к данным Hadoop и соединять их с данными Teradata EDW с применением определяемых пользователями функций.
Хотя результаты работ, описанных в этой статье, могут удовлетворить потребности большого числа заказчиков Teradata, нуждающихся в совместном использовании данных Hadoop и Teradata EDW в своей среде корпоративного хранилища данных, имеется еще много проблем, над решением которых мы продолжаем работать. Одной из проблем, которые нас более всего интересуют, является возможность переноса большего объема вычислений из Hadoop в Teradata EDW и из Teradata EDW в Hadoop.
Назад Содержание Вперёд