No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Curso de Hadoop

Curso de Hadoop

Benjamín Casazza

Benjamín Casazza

Comprender el funcionamiento de MapReduce

12/22
Recursos

Aportes 5

Preguntas 0

Ordenar por:

Los aportes, preguntas y respuestas son vitales para aprender en comunidad. Regístrate o inicia sesión para participar.

Un poco densa esta clase.

MapReduce con Python

Hola, en lo personal prefiero ver tanto el mapper como el reducer en código de python, creo que es mucho más intuitivo, les dejo un ejemplo:

Una vez que tengamos nuestro dataset de entrada en HDFS, por ejemplo uno de películas con las siguientes columnas:

color,director_name,num_critic_for_reviews,.......

Supongamos que queremos hacer un job que nos diga cuantas películas estan a color y en blanco y negro.

Mapper

Con el mapper podemos prepara la data para que tengamos las variables necesarias, que simplemente sería el color que tiene, para probarlo podemos hacer un cat o head sobre el dataset:

head dataset.csv | python3 mapper.py

Reducer

Una vez confirma la correcta separacion de los datos podemos ordenar la salida con sort y pasar la por el reducer, que será el encargado de contar cuantas veces salió cada palabra, o en este caso cuantas peliculas hay en blanco y negro, a color o alguna otra.

cat dataset.csv | python3 mapper.py | sort | python3 reducer.py

JOBS

Con lo anteior pruebas el pipeline completo antes de ejecutar el JOB directo en hadoop, que es posible hacerlo con hadoop streaming, el comando que se necesita es el siguiente:

NOTA recuerda que para probarlo en el cluster el dataset debe de estar importado en hdfs como lo ha mostrado Benjamín,

hadoop jar /usr/lib/hadoop/hadoop-streaming-3.2.2.jar -file ./mapper.py ./reducer.py -mapper "python mapper.py" -reducer "python reducer.py" -input storesdata/data.csv -output testout/

Gracias por llegar al final! Espero alguien lo encuentre de ayuda!! 😄

Si tienen problemas por que la rama del repositorio de GitHub no tiene el archivo start-container.sh, lo único que deben hacer es tomar el mismo archivo de alguna de las otras ramas.

En caso de que no tenga permiso de ejecución, deben ejecutar en linux:

chmod +x start-container.sh

Ya con eso funciona para seguir el curso

Code


cd 3_MapReduce

./start-container.sh

./start-hadoop.sh

hadoop version

javac -version

sudo update

apt install vin

touch WordCount.java

vim WordCount.java

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {

  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);
      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {
      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }
      result.set(sum);
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

esc
:wq

mkdir input_file

echo "hoal mundo desde mapreduce" > input_file/input.txt

hdfs dfs -mkdir /WordCountTutorial

hdfs dfs -mkdir /WordCountTutorial/Input

hdfs dfs -put input_file/input.txt /WordCountTutorial/Input

hdfs dfs -ls -al /WordCountTutorial/Input

mkdir tutorial_clases

export JAVA__HOME=/usr/java/default

export PATH=${JAVA_HOME}/bin:${PATH}

export HADOOP_CLASSPATH=/usr/lib/jvm/java-7-openjdk-amd64/lib/tools.jar

hadoop com.sun.tools.javac.Main WordCount.java

ls -al

jar cf wc.jar WordCount*.class

ls -al

hadoop jar $PWD/wc.jar WordCount /WordCountTutorial/Input /WordCountTutorial/Output

hdfs dfs -cat /WordCountTutorial/Output/part-r-00000