FastNetMon

Sunday 7 April 2013

Разработка под Hadoop на Python без JRE/Jython


Итак, для начала развлечения нам нужен работающий Hadoop: http://www.stableit.ru/2013/04/apache-hadoop-112-ubuntu-1204-single.html

Отойдем от банальных примеров с подсчетом числа слов - будем считать число доменов у каждого из регистраторов .ru зоны.

Для начала стянем данные необходимые для анализа (список всех доменов зоны .ru)
wget "https://partner.r01.ru/ru_domains.gz" -O "/opt/ru_domains.gz"gunzip  /opt/ru_domains.gz

Так как в /opt/ru_domains содержится несколько болше данных, чем требуется нам, нужно его обработать - привести в вид ключ/значение. А именно, ключом у нас будет имя регистратора конкретного домена, а значением - 1.

Код mapper'а предельно прост:
vim /opt/mapper.py
#!/usr/bin/env python
import sys
for line in sys.stdin:
elements = line.split("\t")
print '%s\t%s' % (elements[1], 1)

Далее дело за resucer'ом, он должен просуммировать значения для идентичных ключей, то есть, вместо двух строк  TEST-REG-RIPN 1 мы получаем строку: TEST-REG-RIPN 2, то есть, мы как раз и выполнили операцию reduce. Само важное, на что нужно обратить внимание - это то, что данны на вход reducer поступают в отсортированном виде (это делает сам Hadoop). Так как Blogger гробит Питоновский код, вот исходник с нормальной разметкой: http://paste.org/63424

vim /opt/reducer.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys
current_word = None
current_count = 0
word = None
for line in sys.stdin:
line = line.strip()
(word, count) = line.split('\t', 1)
count = int(count)
# Так как Hadoop сортирует выдачу вывода mapper.py, то данные пиходят к нам в отсортированном порядке
if current_word == word:
current_count += count
else:
# Если слово поменялось, то выводим результаты предыдущего подсчета
if current_word:
print '%s\t%s' % (current_word, current_count)
current_word = word
current_count = count
# В конце цикла нужно вывести результаты подсчета
if current_word == word:
print '%s\t%s' % (current_word, current_count)

Далее необходимо поставить флаг исполняемости на наши скрипты:

chmod +x /opt/reducer.py
chmod +x /opt/mapper.py

Теперь попробуем запустить обработку (про sort я упомянул заранее):
time cat /opt/ru_domains | /opt/mapper.py | sort | /opt/reducer.py
У меня real было около 12 секунд. Задача оказалась не так сложна, как я предполагал или мой сервер слишком быстр :)

su hdfs

Копируем данные в файловую систему Hadoop:
hadoop dfs -mkdir /users/domains
hadoop dfs -copyFromLocal /opt/ru_domains  /users/domains/ru_domains
hadoop dfs -ls /users/domains


Запускаем Hadoop:
hadoop jar /usr/share/hadoop/contrib/streaming/hadoop-*streaming*.jar -file /opt/mapper.py -mapper /opt/mapper.py -file /opt/reducer.py -reducer /opt/reducer.py -input /users/domains/ru_domains -output /users/domains/ru_domains_registrar_count
Далее мы получим много информации:
3/04/08 10:10:31 INFO util.NativeCodeLoader: Loaded the native-hadoop library
13/04/08 10:10:31 WARN snappy.LoadSnappy: Snappy native library not loaded
13/04/08 10:10:31 INFO mapred.FileInputFormat: Total input paths to process : 1
13/04/08 10:10:31 INFO streaming.StreamJob: getLocalDirs(): [/var/lib/hadoop/mapred]
13/04/08 10:10:31 INFO streaming.StreamJob: Running job: job_201304071544_0001
13/04/08 10:10:31 INFO streaming.StreamJob: To kill this job, run:
13/04/08 10:10:31 INFO streaming.StreamJob: /usr/libexec/../bin/hadoop job  -Dmapred.job.tracker=localhost:9000 -kill job_201304071544_0001
13/04/08 10:10:31 INFO streaming.StreamJob: Tracking URL: http://localhost:50030/jobdetails.jsp?jobid=job_201304071544_0001
13/04/08 10:10:32 INFO streaming.StreamJob:  map 0%  reduce 0%
13/04/08 10:10:43 INFO streaming.StreamJob:  map 40%  reduce 0%
13/04/08 10:10:47 INFO streaming.StreamJob:  map 80%  reduce 0%
13/04/08 10:10:49 INFO streaming.StreamJob:  map 100%  reduce 0%
13/04/08 10:10:50 INFO streaming.StreamJob:  map 100%  reduce 33%
13/04/08 10:10:53 INFO streaming.StreamJob:  map 100%  reduce 75%
13/04/08 10:10:57 INFO streaming.StreamJob:  map 100%  reduce 100%
13/04/08 10:10:58 INFO streaming.StreamJob: Job complete: job_201304071544_0001
13/04/08 10:10:58 INFO streaming.StreamJob: Output: /users/domains/ru_domains_registrar_count
Теперь идем смотерть результаты работы mad reduce:
dfs -ls /users/domains/ru_domains_registrar_count
Found 2 items
-rw-------   3 hdfs supergroup          0 2013-04-08 10:28 /users/domains/ru_domains_registrar_count/_SUCCESS
-rw-------   3 hdfs supergroup        603 2013-04-08 10:28 /users/domains/ru_domains_registrar_count/part-00000

Вот как раз файл part-0000 нам и нужен!

hadoop dfs -cat /users/domains/ru_domains_registrar_count/part-00000
Вот такая веселая выдача у нас поулчилась:
101DOMAIN-REG-RIPN 4569
AGAVA-REG-RIPN 26120
BEELINE-REG-RIPN 5924
CC-REG-RIPN 13
CENTRALREG-REG-RIPN 5826
CT-REG-RIPN 1
DEMOS-REG-RIPN 1778
DOMENUS-REG-RIPN 30705
ELVIS-REG-RIPN 1321
NAUNET-REG-RIPN 220510
NETFOX-REG-RIPN 21489
R01-REG-RIPN 854982
REGFORMAT-REG-RIPN 399
REGGI-REG-RIPN 65716
REGISTR1-REG-RIPN 58
REGISTRANT-REG-RIPN 54794
REGISTRATOR-REG-RIPN 223443
REGRU-REG-RIPN 1190814
REGTIME-REG-RIPN 343050
RELCOM-REG-RIPN 151
RTCOMM-REG-RIPN 55
RU-CENTER-REG-RIPN 1284347
SALENAMES-REG-RIPN 142167
TCI-REG-RIPN 1
TEST-REG-RIPN 1
TESTMONITOR-REG-RIPN 2
UNINIC-REG-RIPN 1823
WEBNAMES-REG-RIPN 68
Каковы итоги? Hadoop MapReduce затратил 25 секунд на обработку данных (17 секунд map, 8 секунд reduce), в то время как сырая обработка посредством Python и sort заняла 12 секунд. Почему медленнее? В данном случае это объясняется тем, что обрабатывались весьма малые объемы данных и оверхед на управление MapReduce превысил преимущества полного распараллеливания.

Для того, чтобы повторно запускать нашу задачу, нужно каждый раз удалять папку с результатами работы:
hadoop dfs -rmr /users/domains/ru_domains_registrar_count

Как было указано, за счет полной параллелизации возможно добавить любое число reducer потоков, например, давайте создадим 4 потока:

hadoop jar /usr/share/hadoop/contrib/streaming/hadoop-*streaming*.jar  -D mapred.reduce.tasks=4 -file /opt/mapper.py -mapper /opt/mapper.py -file /opt/reducer.py -reducer /opt/reducer.py -input /users/domains/ru_domains -output /users/domains/ru_domains_registrar_count

Результатат почти такой же, 27 секунд. Почему? Да все по тому же - малый объем данных для обработки. 5 миллионов строк для Hadoop маловато :(

Но при использовании более 1 reducer потока у нас получается, что выходных файлов создается по числу потоков:

hadoop dfs -ls /users/domains/ru_domains_registrar_countFound 5 items
-rw-------   3 hdfs supergroup          0 2013-04-08 10:45 /users/domains/ru_domains_registrar_count/_SUCCESS
-rw-------   3 hdfs supergroup        100 2013-04-08 10:45 /users/domains/ru_domains_registrar_count/part-00000
-rw-------   3 hdfs supergroup        128 2013-04-08 10:45 /users/domains/ru_domains_registrar_count/part-00001
-rw-------   3 hdfs supergroup        208 2013-04-08 10:45 /users/domains/ru_domains_registrar_count/part-00002
-rw-------   3 hdfs supergroup        167 2013-04-08 10:45 /users/domains/ru_domains_registrar_count/part-00003
Что с ними делать? Да еще раз прогнать их через mapreduce (но уже на порядки быстрее) или тупо сцепить (при этом возможно дублирование записей) :)

Источник (вообще, люто рекомендую автора, блог не просто крутой, а МЕГА крутой!): http://www.michael-noll.com/tutorials/writing-an-hadoop-mapreduce-program-in-python/

No comments :

Post a Comment

Note: only a member of this blog may post a comment.