No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Inferencia de tipos de datos

14/25
Recursos

Aportes 18

Preguntas 5

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

Aqui esta la solucion del reto: Crear DF con los archivos faltantes

evento_schema = StructType([
    StructField("evento_id", IntegerType(), False),
    StructField("evento", StringType(), False), 
    StructField("deporte_id", IntegerType(), False)
])

eventoDF = sqlContext.read.format("csv").\
        option("header", True).\
        schema(evento_schema).\
        load(path+"evento.csv")

eventoDF.show(4)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
+---------+--------------------+----------+
only showing top 4 rows

paises_schema = StructType([
    StructField("paises_id", IntegerType(), False),
    StructField("equipo", StringType(), False), 
    StructField("sigla", StringType(), False)
])

paisesDF = sqlContext.read.format("csv").\
        option("header", True).\
        schema(paises_schema).\
        load(path+"paises.csv")

paisesDF.show(4)

+---------+--------------------+-----+
|paises_id|              equipo|sigla|
+---------+--------------------+-----+
|        1|         30. Februar|  AUT|
|        2|A North American ...|  MEX|
|        3|           Acipactli|  MEX|
|        4|             Acturus|  ARG|
+---------+--------------------+-----+
only showing top 4 rows

resultados_schema = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False)
])

resultadosDF = sqlContext.read.format("csv").\
            option("header", True).\
            schema(resultados_schema).\
            load(path+"resultados.csv")

resultadosDF.show(4)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
+------------+-------+-------------+--------+---------+
only showing top 4 rows


de verdad pienso que deberia explicar de manera mas detalladas

Función alternativa para eliminar el encabezado del RDD

def without_header(rdd):
    header = rdd.first()
    rdd = rdd.filter( lambda l: l != header)
    return rdd

La clase muestra el uso de SQLContext para Spark 2.4, pero a la fecha que esribo esto es 08-noviembre-2021 y ya se encuntra depreacado SQLContext en la vesión 3.2.

Me salió un mensaje de error porque no puede transformar los RDD’s que habiamos trabajado en clases anteriores.

Lo solucioné de la siguiente manera:

  1. Borrar los headers de cafa RDD con la función creada en la clase.
  2. Transformar cada RDD a dataframe con el método propio del RDD llamdo .toDF().
## Ejemplo
deportistaDF = deportistaOlimpicoRDD.toDF([lista con el nombre de cada columna])
  1. Con el método anterior, transforma todo el esquema a tipo de dato string. Lo puedes berificar con:
deportistaDF.printSchema()
  1. Para poder transformar el esquema de cada RDD, cree una función que pocas palabras itera sobre el diccionario y sobreescribe el dataframe.
from typing import Dict, Any
def change_types_df(dict_types: Dict[str, Any], df: Any) -> Any:
    """
    Change data types.

    This function cast data types of a *Spark* dataframe.

    Parameters:
    - **dict_types: Dict[str, Any]**: A Python dictionary with columns names and data types to cast in the dataframe.
    - **df: DataFrame**: A *Spark* dataframe.

    Returns:
    - **df: Any**: The dataframe with data types casted of every column.

    """
    for column, data_type in dict_types.items():
        df = df.withColumn(
            column,
        df[column].cast(data_type)
    )
    return df
  1. Cree una lista de diccionarios con los tipos de datos para cada esquema (los nombres de headers son las llaves y nombrarlas como quieras):
from typing import List
casting: List[Dict[str, Any]] = [
    {
        'athetle_id': IntegerType(),
        'athetle_name': StringType(),
        'athetle_gender': IntegerType(),
        'athetle_age': IntegerType(),
        'athetle_height': FloatType(),
        'athetle_weight': FloatType(),
        'team_id': IntegerType()
    },
    {
        'country_id': IntegerType(),
        'team': StringType(),
        'country': StringType()
    },
    {
        'result_id': IntegerType(),
        'medal': StringType(),
        'athetle_id': IntegerType(),
        'game_id': IntegerType(),
        'event_id': IntegerType()
    }
]
  1. Ejecutar la función para realizar el casting de los tipos de datos para cada dataframe.
# Ejemplo
deportistaDF = change_types_df(casting[0], deportistaDF)
  1. Para verificar el esquema de cada dataframe:
deportistaDF.printSchema()

Creo que más fácil resubir los data del archivo csv directamente a un dataframe porque ya están semiestrucurados.

Definitivamente hay conceptos que no son claros. La función mapPartitionsWithIndex() va más allá de la vaga explicación que da el profesor. mapPartitionsWithIndex() se relaciona con el concepto de “partición” ¿qué significa una partición de un RDD?¿cómo se trabajan las particiones?

La verdad no estuvo padre que hicieras la transición de la última clase a esta sin una apropiada da introducción de las dependencias y en especial de sqlContext.

El profesor sabe bastante sobre Big data. Seria bueno que reflexionara sobre los temas que imparte, su secuencia y la manera que explica. Da saltos cuánticos entre detalles vitales.

PROFUNDIZAR ESTE CONCEPTO PARA OPTIMIZAR LOS RECURSOS

Los headers del archivo leido también se pueden remover de la siguiente forma:

header = olympicAthletesRDD.first()
txt = olympicAthletesRDD.filter(lambda line: line != header)
txt.take(5)

Sin embargo, filter() al igual que map() aplican la función sobre cada elemento del RDD. mapPartitions() y mapPartitionsWithIndex() ejecutan la función por partición (si no distribuiste manualmente los recursos, Spark lo hace por defecto y para una laptop regularmente divide la carga de trabajo en el número de cores) lo cual hace mucho mas rápida la operación.

Acá mi Código del Reto:

Crear una instancia de SparkSession

spark = SparkSession.builder
.appName(“Inferencia de datos”)
.getOrCreate()
deportistaDF = spark.createDataFrame(deportistaOlimpicoRDD, schema)
Esto se us¿tiliza en lugar de sqlContext

Comparto mi código en google colaboratory

Schemas

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

schemaDeportista = StructType([
                     StructField("deportista_id",IntegerType(),False),
                     StructField("nombre",StringType(),False),
                     StructField("genero",IntegerType(),False),
                     StructField("edad",IntegerType(),False),
                     StructField("altura",IntegerType(),False),
                     StructField("peso",FloatType(),False),
                     StructField("equipo_id",IntegerType(),False),
])

deporte_schema = StructType([
                             StructField("deporte_id",IntegerType(),False),
                             StructField("deporte", StringType(),False)
])

evento_schema = StructType([
                            StructField("evento_id",IntegerType(),False),
                            StructField("evento",StringType(),False),
                            StructField("deporte_id",IntegerType(),False)
])

paises_schema = StructType([
                            StructField("id",IntegerType(),False),
                            StructField("equipo",StringType(),False),
                            StructField("sigla",StringType(),False)
])

resultados_schema = StructType([
                            StructField("resultado_id",IntegerType(),False),
                            StructField("medalla",StringType(),False),
                            StructField("deportista_id",IntegerType(),False),
                            StructField("juego_id",IntegerType(),False),
                            StructField("evento_id",IntegerType(),False)
])

función de eliminación de encabezados

def eliminaEncabezado(indice, iterador):
  return iter(list(iterador)[1:])

tipos de datos al RDD

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

deporteRDD = deporteRDD.map(lambda l : (
    int(l[0]),
    l[1]
))

eventoRDD = eventoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2])
))

paisesRDD = paisesRDD.map(lambda l : (
    int(l[0]),
    l[1],
    l[2]
))

resultadosRDD = resultadosRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4])
))

creación de dataframes

deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD, schemaDeportista).show(5)
deporteDF = sqlContext.createDataFrame(deporteRDD, deporte_schema).show(5)
eventoDF = sqlContext.createDataFrame(eventoRDD, evento_schema).show(2)
paisesDF = sqlContext.createDataFrame(paisesRDD, paises_schema).show(5)
resultadosDF = sqlContext.createDataFrame(resultadosRDD, resultados_schema).show(5)

Mi solución fue crear una funcion que genere los los DataFrame a partir del CSV

def crearDF(schema, pathfilename):
    return sqlContext.read.schema(schema).option("header", "true").csv(pathfilename)

Y luego generar los schemas y los path y pasarlos a la funcion que me retorma un dataframe

Yo lo he realizado en Databricks y me ah quedado asi, la lectura de todos los archivos CSV creando los dataframe.

# File location and type
file_source = "/FileStore/shared_uploads/[email protected]/"
file_type = "csv"

# File options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
list_files= ['deportista.csv','evento.csv','paises.csv','resultados.csv']

# The applied options are for CSV files.
for name in range(len(list_files)):
    string='raw_' + re.sub(".csv","",list_files[name]) 
    globals()[string] = spark.read.format(file_type ) \
                              .option("inferSchema", infer_schema) \
								.option("header", first_row_is_header) \
								.option("multiline", "true") \
                              .option("sep", delimiter).load(file_source + list_files[name])
    print(' DF_Name: ', string)

#Output:
 DF_Name:  raw_deportista
 DF_Name:  raw_evento
 DF_Name:  raw_paises
 DF_Name:  raw_resultados

Esta es mi solución al reto

Schemas

# Schemas

schemas = {
    "deporte": StructType([
        StructField("deporte_id", IntegerType(), False),
        StructField("deporte", StringType(), False)
    ]),
    "deportista": StructType([
        StructField("deportista_id", IntegerType(), False),
        StructField("nombre", StringType(), False),
        StructField("genero", StringType(), False),
        StructField("edad", IntegerType(), False),
        StructField("altura", IntegerType(), False),
        StructField("peso", FloatType(), False),
        StructField("equipo_id", IntegerType(), False)
    ]),
    "evento": StructType([
        StructField("evento_id", IntegerType(), False),
        StructField("evento", StringType(), False),
        StructField("deporte_id", IntegerType(), False)
    ]),
    "juegos": StructType([
        StructField("juego_id", IntegerType(), False),
        StructField("nombre_juego", StringType(), False),
        StructField("anio", IntegerType(), False),
        StructField("temporada", StringType(), False),
        StructField("ciudad", StringType(), False)
    ]),
    "paises": StructType([
        StructField("pais_id", IntegerType(), False),
        StructField("equipo", StringType(), False),
        StructField("sigla", StringType(), False)
    ]),
    "resultados": StructType([
        StructField("resultado_id", IntegerType(), False),
        StructField("medalla", StringType(), False),
        StructField("deportista_id", IntegerType(), False),
        StructField("juego_id", IntegerType(), False),
        StructField("evento_id", IntegerType(), False)
    ])
}

DataFrames

# Dataframes

dp1 = sql_context.read.schema(schemas['deportista']).option("header", "true").csv(f"{data_dir}/deportista.csv")
dp2 = sql_context.read.schema(schemas['deportista']).option("header", "false").csv(f"{data_dir}/deportista2.csv")

dataframes = {
    "deporte": sql_context.read.schema(schemas['deporte']).option("header", "true").csv(f"{data_dir}/deporte.csv"),
    "deportista": dp1.union(dp2),
    "evento": sql_context.read.schema(schemas['evento']).option("header", "true").csv(f"{data_dir}/evento.csv"),
    "juegos": sql_context.read.schema(schemas['juegos']).option("header", "true").csv(f"{data_dir}/juegos.csv"),
    "paises": sql_context.read.schema(schemas['paises']).option("header", "true").csv(f"{data_dir}/paises.csv"),
    "resultados": sql_context.read.schema(schemas['resultados']).option("header", "true").csv(f"{data_dir}/resultados.csv")
}
del(dp1, dp2)

Ejemplo show

dataframes['deporte'].show(5)

Comparto mi solución al reto planteado

#Creo el schema de mi juegoDF
juegoSchema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("nombre_juego", StringType(), False),
    StructField("annio", IntegerType(), False),
    StructField("temporada", StringType(), False),
    StructField("ciudad", StringType(), False)
])

#Cargo juegoDF
juegoDF = sqlContext.read.schema(juegoSchema)\
    .option("header","true").csv('files/juegos.csv')

Para el caso de deportista deportistaDF1 y deportistaDF2 utilizo el mismo schema ya que contienen informaciónestructurada de la misma manera.

deportistaSchema = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", IntegerType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo_id", IntegerType(), False)
])


deportistaDF1 = sqlContext.read.schema(deportistaSchema)\
    .option("header","true").csv('files/deportista.csv')

deportistaDF2 = sqlContext.read.schema(deportistaSchema)\
    .option("header","true").csv('files/deportista2.csv')

deportistaDF = deportistaDF1.union(deportistaDF2)
resultadoSchemaF = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False),
])
resultadoDF = sqlContext.read.schema(resultadoSchemaF)\
    .option("header","true").csv('files/resultados.csv')

Si alguien no entendió muy bien como quitó el encabezado dejo mis notas:

deportistaOlimpicoRDD.take(5)

Tiene encabezado

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]
<h3>Remove head from RDD</h3>
  1. create a funtion that make a iteration for each value since row number 1 with out head
def eliminarEncabezado(indice, iterador):
  return iter(list(iterador)[1:])
  1. Cut RDD in two partitions and take the second partition that was created with the las function
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminarEncabezado)
deportistaOlimpicoRDD.take(5)

Sin encabezado

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

A mi me está saliendo todo bien, pero al momento de darle

.show(5)

dentro de:

deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema).show(5)

Me sale el siguiente error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-20-49619bf78f62> in <module>
----> 1 deportistaDF.show(2)

~/Documents/platzi/curso_spark/spark-3.0.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

~/Documents/platzi/curso_spark/spark-3.0.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/Documents/platzi/curso_spark/spark-3.0.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

~/Documents/platzi/curso_spark/spark-3.0.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o85.showString.
: java.lang.IllegalStateException: SparkContext has been shutdown
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2091)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)

Creo que está todo bien porque todo funciona y si solo le pongo, por ejemplo:

deportistaDF

Me aparece lo siguiente:

DataFrame[deportista_id: int, nombre: string, genero: int, edad: int, altura: int, peso: float, equipo_id: int]

Seguiré con el curso, espero sea un error mínimo.

Por cierto, este es mi código, según yo es igual al del profesor, pero pues uno nunca sabe, a la mejor se me pasó algo, sobretodo en las primeras líneas porque el profe no muestra el inicio de su jupyter notebook:

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.types import Row
sc = SparkContext(master="local", appName="RDD_to_df")
path = "repositorio/files/"

deportistaOlimpicoRDD = sc.textFile(path+"deportista.csv") \
    .map(lambda l : l.split(","))
deportistaOlimpicoRDD.take(5)
#Removemos el encabezado con la siguiente función.
#retorna una lista limpia que ya posee los valores que nosostros queremos.
#iter nos devuelve valor a valor de lo que nosotros procesemos.
#para asegurarnos que no tenemos ninguna falla de contención de errores vamos a transformar el parámetro que nosotros
#recibamos como lista. DEvolvemos a partir del primer valor de RDD, es decir del índice 1 -> [1:]

def eliminaEncabezado(indice, interador):
    return iter(list(interador)[1:])
#mapPartitionWithIndex le pasa dos parámetros a la función que nosotros le apliquemos.
#1. Toda la columna y 2. Un valor por índice
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminaEncabezado)
#ya no tiene encabezado
deportistaOlimpicoRDD.take(6)
#Antes de transformar el RDD, debemos transformar los valores del mismo.
#Sobre deportistaOlimpicoRDD haremos un mapeo y rasnformaremos los valores:

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda x: (
int(x[0]),
x[1],
int(x[2]),
int(x[3]),
int(x[4]),
float(x[5]),
int(x[6])    
))
schema = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

Y aquí viene el error:

deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema).show(5)
<h3>Reto</h3>
  1. Crear los schemas
# create Schemas

deporte_schema = StructType([
          StructField("deporte_id",IntegerType(),False),
          StructField("deporte", StringType(),False)
])

evento_schema = StructType([
          StructField("evento_id",IntegerType(),False),
          StructField("evento",StringType(),False),
          StructField("deporte_id",IntegerType(),False)
])

paises_schema = StructType([
          StructField("id",IntegerType(),False),
          StructField("equipo",StringType(),False),
          StructField("sigla",StringType(),False)
])

resultados_schema = StructType([
          StructField("resultado_id",IntegerType(),False),
          StructField("medalla",StringType(),False),
          StructField("deportista_id",IntegerType(),False),
          StructField("juego_id",IntegerType(),False),
          StructField("evento_id",IntegerType(),False)
])
  1. Crear dataframes
# create dataframes
deporteDF = sqlContext.read.schema(deporte_schema).option('header','true').csv(path+'deporte.csv')
eventoDF = sqlContext.read.schema(evento_schema).option('header','true').csv(path+'evento.csv')
paisesDF = sqlContext.read.schema(paises_schema).option('header','true').csv(path+'paises.csv')
resultadosDF = sqlContext.read.schema(resultados_schema).option('header','true').csv(path+'resultados.csv')
  1. Show results
juegoDF.show(5)
deporteDF.show(5)
eventoDF.show(5)
paisesDF.show(5)
resultadosDF.show(5)
<h4>Output</h4>

±-------±----------±--------±-----+
|juego_id| anio|temporada|ciudad|
±-------±----------±--------±-----+
| 1|1896 Verano| 1896|Verano|
| 2|1900 Verano| 1900|Verano|
| 3|1904 Verano| 1904|Verano|
| 4|1906 Verano| 1906|Verano|
| 5|1908 Verano| 1908|Verano|
±-------±----------±--------±-----+
only showing top 5 rows

±---------±------------+
|deporte_id| deporte|
±---------±------------+
| 1| Basketball|
| 2| Judo|
| 3| Football|
| 4| Tug-Of-War|
| 5|Speed Skating|
±---------±------------+
only showing top 5 rows

±--------±-------------------±---------+
|evento_id| evento|deporte_id|
±--------±-------------------±---------+
| 1|Basketball Men’s …| 1|
| 2|Judo Men’s Extra-…| 2|
| 3|Football Men’s Fo…| 3|
| 4|Tug-Of-War Men’s …| 4|
| 5|Speed Skating Wom…| 5|
±--------±-------------------±---------+
only showing top 5 rows

±–±-------------------±----+
| id| equipo|sigla|
±–±-------------------±----+
| 1| 30. Februar| AUT|
| 2|A North American …| MEX|
| 3| Acipactli| MEX|
| 4| Acturus| ARG|
| 5| Afghanistan| AFG|
±–±-------------------±----+
only showing top 5 rows

±-----------±------±------------±-------±--------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
±-----------±------±------------±-------±--------+
| 1| NA| 1| 39| 1|
| 2| NA| 2| 49| 2|
| 3| NA| 3| 7| 3|
| 4| Gold| 4| 2| 4|
| 5| NA| 5| 36| 5|
±-----------±------±------------±-------±--------+
only showing top 5 rows