Hadoop & Exemplos de Mapreduce: Criar o primeiro programa em Java

Índice:

Anonim

Neste tutorial, você aprenderá a usar o Hadoop com exemplos de MapReduce. Os dados de entrada usados ​​são SalesJan2009.csv. Ele contém informações relacionadas a vendas, como nome do produto, preço, forma de pagamento, cidade, país do cliente, etc. O objetivo é descobrir o número de produtos vendidos em cada país.

Neste tutorial, você aprenderá-

  • Primeiro programa Hadoop MapReduce
  • Explicação da classe SalesMapper
  • Explicação da classe SalesCountryReducer
  • Explicação da classe SalesCountryDriver

Primeiro programa Hadoop MapReduce

Agora, neste tutorial do MapReduce, criaremos nosso primeiro programa Java MapReduce:

Dados de SalesJan2009

Certifique-se de ter o Hadoop instalado. Antes de iniciar o processo real, altere o usuário para 'hduser' (id usado durante a configuração do Hadoop, você pode alternar para o ID do usuário usado durante a configuração de programação do Hadoop).

su - hduser_

Passo 1)

Crie um novo diretório com o nome MapReduceTutorial como shwon no exemplo de MapReduce abaixo

sudo mkdir MapReduceTutorial

Dê permissões

sudo chmod -R 777 MapReduceTutorial

SalesMapper.java

package SalesCountry;import java.io.IOException;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesMapper extends MapReduceBase implements Mapper  {private final static IntWritable one = new IntWritable(1);public void map(LongWritable key, Text value, OutputCollector  output, Reporter reporter) throws IOException {String valueString = value.toString();String[] SingleCountryData = valueString.split(",");output.collect(new Text(SingleCountryData[7]), one);}}

SalesCountryReducer.java

package SalesCountry;import java.io.IOException;import java.util.*;import org.apache.hadoop.io.IntWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapred.*;public class SalesCountryReducer extends MapReduceBase implements Reducer {public void reduce(Text t_key, Iterator values, OutputCollector output, Reporter reporter) throws IOException {Text key = t_key;int frequencyForCountry = 0;while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}output.collect(key, new IntWritable(frequencyForCountry));}}

SalesCountryDriver.java

package SalesCountry;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.*;import org.apache.hadoop.mapred.*;public class SalesCountryDriver {public static void main(String[] args) {JobClient my_client = new JobClient();// Create a configuration object for the jobJobConf job_conf = new JobConf(SalesCountryDriver.class);// Set a name of the Jobjob_conf.setJobName("SalePerCountry");// Specify data type of output key and valuejob_conf.setOutputKeyClass(Text.class);job_conf.setOutputValueClass(IntWritable.class);// Specify names of Mapper and Reducer Classjob_conf.setMapperClass(SalesCountry.SalesMapper.class);job_conf.setReducerClass(SalesCountry.SalesCountryReducer.class);// Specify formats of the data type of Input and outputjob_conf.setInputFormat(TextInputFormat.class);job_conf.setOutputFormat(TextOutputFormat.class);// Set input and output directories using command line arguments,//arg[0] = name of input directory on HDFS, and arg[1] = name of output directory to be created to store the output file.FileInputFormat.setInputPaths(job_conf, new Path(args[0]));FileOutputFormat.setOutputPath(job_conf, new Path(args[1]));my_client.setConf(job_conf);try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}}}

Baixe os arquivos aqui

Verifique as permissões de todos esses arquivos

e se as permissões de 'leitura' estiverem faltando, conceda o mesmo

Passo 2)

Exportar classpath conforme mostrado no exemplo do Hadoop abaixo

export CLASSPATH="$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-core-2.2.0.jar:$HADOOP_HOME/share/hadoop/mapreduce/hadoop-mapreduce-client-common-2.2.0.jar:$HADOOP_HOME/share/hadoop/common/hadoop-common-2.2.0.jar:~/MapReduceTutorial/SalesCountry/*:$HADOOP_HOME/lib/*"

Etapa 3)

Compile os arquivos Java (esses arquivos estão presentes no diretório Final-MapReduceHandsOn ). Seus arquivos de classe serão colocados no diretório do pacote

javac -d . SalesMapper.java SalesCountryReducer.java SalesCountryDriver.java

Este aviso pode ser ignorado com segurança.

Esta compilação criará um diretório em um diretório atual denominado com o nome do pacote especificado no arquivo fonte java (ou seja, SalesCountry em nosso caso) e colocará todos os arquivos de classe compilados nele.

Passo 4)

Crie um novo arquivo Manifest.txt

sudo gedit Manifest.txt

adicione as seguintes linhas a ele,

Main-Class: SalesCountry.SalesCountryDriver

SalesCountry.SalesCountryDriver é o nome da classe principal. Observe que você deve pressionar a tecla Enter no final desta linha.

Etapa 5)

Crie um arquivo Jar

jar cfm ProductSalePerCountry.jar Manifest.txt SalesCountry/*.class

Verifique se o arquivo jar foi criado

Etapa 6)

Inicie o Hadoop

$HADOOP_HOME/sbin/start-dfs.sh
$HADOOP_HOME/sbin/start-yarn.sh

Etapa 7)

Copie o arquivo SalesJan2009.csv em ~ / inputMapReduce

Agora use o comando abaixo para copiar ~ / inputMapReduce para HDFS.

$HADOOP_HOME/bin/hdfs dfs -copyFromLocal ~/inputMapReduce /

Podemos ignorar esse aviso com segurança.

Verifique se um arquivo foi realmente copiado ou não.

$HADOOP_HOME/bin/hdfs dfs -ls /inputMapReduce

Etapa 8)

Executar trabalho MapReduce

$HADOOP_HOME/bin/hadoop jar ProductSalePerCountry.jar /inputMapReduce /mapreduce_output_sales

Isso criará um diretório de saída denominado mapreduce_output_sales no HDFS. O conteúdo deste diretório será um arquivo contendo vendas de produtos por país.

Etapa 9)

O resultado pode ser visto através da interface de comando como,

$HADOOP_HOME/bin/hdfs dfs -cat /mapreduce_output_sales/part-00000

Os resultados também podem ser vistos por meio de uma interface da web como-

Abra r em um navegador da web.

Agora selecione 'Navegar no sistema de arquivos' e navegue até / mapreduce_output_sales

Parte aberta -r-00000

Explicação da classe SalesMapper

Nesta seção, entenderemos a implementação da classe SalesMapper .

1. Começamos especificando um nome de pacote para nossa classe. SalesCountry é o nome do nosso pacote. Observe que a saída da compilação, SalesMapper.class , irá para um diretório denominado por este nome de pacote: SalesCountry .

Em seguida, importamos pacotes de bibliotecas.

O instantâneo abaixo mostra uma implementação da classe SalesMapper-

Exemplo de explicação do código:

1. Definição da classe SalesMapper-

public class SalesMapper extends MapReduceBase implementa Mapper {

Cada classe do mapeador deve ser estendida da classe MapReduceBase e deve implementar a interface do mapeador .

2. Definindo a função de 'mapa' -

public void map(LongWritable key,Text value,OutputCollector output,Reporter reporter) throws IOException

A parte principal da classe Mapper é um método 'map ()' que aceita quatro argumentos.

Em cada chamada para o método 'map ()' , um par de valores-chave ( 'chave' e 'valor' neste código) é passado.

O método 'map ()' começa dividindo o texto de entrada que é recebido como um argumento. Ele usa o tokenizer para dividir essas linhas em palavras.

String valueString = value.toString();String[] SingleCountryData = valueString.split(",");

Aqui, ',' é usado como um delimitador.

Depois disso, um par é formado usando um registro no 7º índice do array 'SingleCountryData' e um valor '1' .

output.collect (new Text (SingleCountryData [7]), um);

Estamos escolhendo o registro no 7º índice porque precisamos dos dados do país e ele está localizado no 7º índice na matriz 'SingleCountryData' .

Observe que nossos dados de entrada estão no formato abaixo (onde País está no índice, com 0 como índice inicial) -

Transaction_date, Product, Price, Payment_Type, Name, City, State, Country , Account_Created, Last_Login, Latitude, Longitude

Uma saída do mapeador é novamente um par de valores-chave que é gerado usando o método 'collect ()' de 'OutputCollector' .

Explicação da classe SalesCountryReducer

Nesta seção, entenderemos a implementação da classe SalesCountryReducer .

1. Começamos especificando um nome do pacote para nossa classe. SalesCountry é o nome de nosso pacote. Observe que a saída da compilação, SalesCountryReducer.class , irá para um diretório denominado por este nome de pacote: SalesCountry .

Em seguida, importamos pacotes de bibliotecas.

O instantâneo abaixo mostra uma implementação da classe SalesCountryReducer-

Explicação do código:

1. Definição da classe SalesCountryReducer-

public class SalesCountryReducer extends MapReduceBase implementa Reducer {

Aqui, os primeiros dois tipos de dados, 'Texto' e 'IntWritable' são tipos de dados de valor-chave de entrada para o redutor.

A saída do mapeador está no formato , . Esta saída do mapeador se torna uma entrada para o redutor. Portanto, para alinhar com seu tipo de dados, Text e IntWritable são usados ​​como tipo de dados aqui.

Os dois últimos tipos de dados, 'Texto' e 'IntWritable', são tipos de dados de saída gerados pelo redutor na forma de par de valores-chave.

Cada classe do redutor deve ser estendida da classe MapReduceBase e deve implementar a interface do Redutor .

2. Definindo a função 'reduzir'

public void reduce( Text t_key,Iterator values,OutputCollector output,Reporter reporter) throws IOException {

Uma entrada para o método reduce () é uma chave com uma lista de vários valores.

Por exemplo, em nosso caso, será-

, , , , , .

Isso é dado ao redutor como

Portanto, para aceitar argumentos desta forma, os primeiros dois tipos de dados são usados, viz., Texto e Iterator . Texto é um tipo de dados de chave e Iterator é um tipo de dados para lista de valores dessa chave.

O próximo argumento é do tipo OutputCollector que coleta a saída da fase do redutor.

O método reduz () começa copiando o valor-chave e inicializando a contagem de frequência para 0.

Chave de texto = t_key; frequência interna para país = 0;

Em seguida, usando o loop ' while ' , iteramos através da lista de valores associados à chave e calculamos a frequência final somando todos os valores.

 while (values.hasNext()) {// replace type of value with the actual type of our valueIntWritable value = (IntWritable) values.next();frequencyForCountry += value.get();}

Agora, enviamos o resultado para o coletor de saída na forma de chave e contagem de frequência obtida .

O código abaixo faz isso-

output.collect(key, new IntWritable(frequencyForCountry));

Explicação da classe SalesCountryDriver

Nesta seção, entenderemos a implementação da classe SalesCountryDriver

1. Começamos especificando um nome de pacote para nossa classe. SalesCountry é o nome de nosso pacote. Observe que a saída da compilação, SalesCountryDriver.class , irá para o diretório com este nome de pacote: SalesCountry .

Aqui está uma linha especificando o nome do pacote seguido pelo código para importar pacotes de biblioteca.

2. Defina uma classe de driver que criará um novo trabalho de cliente, objeto de configuração e anunciará as classes Mapper e Reducer.

A classe do driver é responsável por definir nosso trabalho MapReduce para ser executado no Hadoop. Nesta classe, especificamos o nome do trabalho, o tipo de dados de entrada / saída e os nomes das classes de mapeador e redutor .

3. No trecho de código a seguir, definimos os diretórios de entrada e saída que são usados ​​para consumir o conjunto de dados de entrada e produzir a saída, respectivamente.

arg [0] e arg [1] são os argumentos de linha de comando passados ​​com um comando fornecido no MapReduce prático, ou seja,

$ HADOOP_HOME / bin / hadoop jar ProductSalePerCountry.jar / inputMapReduce / mapreduce_output_sales

4. Acione nosso trabalho

Abaixo do código, inicie a execução do trabalho MapReduce-

try {// Run the jobJobClient.runJob(job_conf);} catch (Exception e) {e.printStackTrace();}