No tienes acceso a esta clase

隆Contin煤a aprendiendo! 脷nete y comienza a potenciar tu carrera

Inferencia de tipos de datos

14/25
Recursos

Aportes 18

Preguntas 5

Ordenar por:

驴Quieres ver m谩s aportes, preguntas y respuestas de la comunidad?

o inicia sesi贸n.

Aqui esta la solucion del reto: Crear DF con los archivos faltantes

evento_schema = StructType([
    StructField("evento_id", IntegerType(), False),
    StructField("evento", StringType(), False), 
    StructField("deporte_id", IntegerType(), False)
])

eventoDF = sqlContext.read.format("csv").\
        option("header", True).\
        schema(evento_schema).\
        load(path+"evento.csv")

eventoDF.show(4)

+---------+--------------------+----------+
|evento_id|              evento|deporte_id|
+---------+--------------------+----------+
|        1|Basketball Men's ...|         1|
|        2|Judo Men's Extra-...|         2|
|        3|Football Men's Fo...|         3|
|        4|Tug-Of-War Men's ...|         4|
+---------+--------------------+----------+
only showing top 4 rows

paises_schema = StructType([
    StructField("paises_id", IntegerType(), False),
    StructField("equipo", StringType(), False), 
    StructField("sigla", StringType(), False)
])

paisesDF = sqlContext.read.format("csv").\
        option("header", True).\
        schema(paises_schema).\
        load(path+"paises.csv")

paisesDF.show(4)

+---------+--------------------+-----+
|paises_id|              equipo|sigla|
+---------+--------------------+-----+
|        1|         30. Februar|  AUT|
|        2|A North American ...|  MEX|
|        3|           Acipactli|  MEX|
|        4|             Acturus|  ARG|
+---------+--------------------+-----+
only showing top 4 rows

resultados_schema = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False)
])

resultadosDF = sqlContext.read.format("csv").\
            option("header", True).\
            schema(resultados_schema).\
            load(path+"resultados.csv")

resultadosDF.show(4)

+------------+-------+-------------+--------+---------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
+------------+-------+-------------+--------+---------+
|           1|     NA|            1|      39|        1|
|           2|     NA|            2|      49|        2|
|           3|     NA|            3|       7|        3|
|           4|   Gold|            4|       2|        4|
+------------+-------+-------------+--------+---------+
only showing top 4 rows


de verdad pienso que deberia explicar de manera mas detalladas

Funci贸n alternativa para eliminar el encabezado del RDD

def without_header(rdd):
    header = rdd.first()
    rdd = rdd.filter( lambda l: l != header)
    return rdd

Definitivamente hay conceptos que no son claros. La funci贸n mapPartitionsWithIndex() va m谩s all谩 de la vaga explicaci贸n que da el profesor. mapPartitionsWithIndex() se relaciona con el concepto de 鈥減artici贸n鈥 驴qu茅 significa una partici贸n de un RDD?驴c贸mo se trabajan las particiones?

La verdad no estuvo padre que hicieras la transici贸n de la 煤ltima clase a esta sin una apropiada da introducci贸n de las dependencias y en especial de sqlContext.

La clase muestra el uso de SQLContext para Spark 2.4, pero a la fecha que esribo esto es 08-noviembre-2021 y ya se encuntra depreacado SQLContext en la vesi贸n 3.2.

Me sali贸 un mensaje de error porque no puede transformar los RDD鈥檚 que habiamos trabajado en clases anteriores.

Lo solucion茅 de la siguiente manera:

  1. Borrar los headers de cafa RDD con la funci贸n creada en la clase.
  2. Transformar cada RDD a dataframe con el m茅todo propio del RDD llamdo .toDF().
## Ejemplo
deportistaDF = deportistaOlimpicoRDD.toDF([lista con el nombre de cada columna])
  1. Con el m茅todo anterior, transforma todo el esquema a tipo de dato string. Lo puedes berificar con:
deportistaDF.printSchema()
  1. Para poder transformar el esquema de cada RDD, cree una funci贸n que pocas palabras itera sobre el diccionario y sobreescribe el dataframe.
from typing import Dict, Any
def change_types_df(dict_types: Dict[str, Any], df: Any) -> Any:
    """
    Change data types.

    This function cast data types of a *Spark* dataframe.

    Parameters:
    - **dict_types: Dict[str, Any]**: A Python dictionary with columns names and data types to cast in the dataframe.
    - **df: DataFrame**: A *Spark* dataframe.

    Returns:
    - **df: Any**: The dataframe with data types casted of every column.

    """
    for column, data_type in dict_types.items():
        df = df.withColumn(
            column,
        df[column].cast(data_type)
    )
    return df
  1. Cree una lista de diccionarios con los tipos de datos para cada esquema (los nombres de headers son las llaves y nombrarlas como quieras):
from typing import List
casting: List[Dict[str, Any]] = [
    {
        'athetle_id': IntegerType(),
        'athetle_name': StringType(),
        'athetle_gender': IntegerType(),
        'athetle_age': IntegerType(),
        'athetle_height': FloatType(),
        'athetle_weight': FloatType(),
        'team_id': IntegerType()
    },
    {
        'country_id': IntegerType(),
        'team': StringType(),
        'country': StringType()
    },
    {
        'result_id': IntegerType(),
        'medal': StringType(),
        'athetle_id': IntegerType(),
        'game_id': IntegerType(),
        'event_id': IntegerType()
    }
]
  1. Ejecutar la funci贸n para realizar el casting de los tipos de datos para cada dataframe.
# Ejemplo
deportistaDF = change_types_df(casting[0], deportistaDF)
  1. Para verificar el esquema de cada dataframe:
deportistaDF.printSchema()

Creo que m谩s f谩cil resubir los data del archivo csv directamente a un dataframe porque ya est谩n semiestrucurados.

PROFUNDIZAR ESTE CONCEPTO PARA OPTIMIZAR LOS RECURSOS

Los headers del archivo leido tambi茅n se pueden remover de la siguiente forma:

header = olympicAthletesRDD.first()
txt = olympicAthletesRDD.filter(lambda line: line != header)
txt.take(5)

Sin embargo, filter() al igual que map() aplican la funci贸n sobre cada elemento del RDD. mapPartitions() y mapPartitionsWithIndex() ejecutan la funci贸n por partici贸n (si no distribuiste manualmente los recursos, Spark lo hace por defecto y para una laptop regularmente divide la carga de trabajo en el n煤mero de cores) lo cual hace mucho mas r谩pida la operaci贸n.

Ac谩 mi C贸digo del Reto:

Crear una instancia de SparkSession

spark = SparkSession.builder
.appName(鈥淚nferencia de datos鈥)
.getOrCreate()
deportistaDF = spark.createDataFrame(deportistaOlimpicoRDD, schema)
Esto se us驴tiliza en lugar de sqlContext

El profesor sabe bastante sobre Big data. Seria bueno que reflexionara sobre los temas que imparte, su secuencia y la manera que explica. Da saltos cu谩nticos entre detalles vitales.

Comparto mi c贸digo en google colaboratory

Schemas

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

schemaDeportista = StructType([
                     StructField("deportista_id",IntegerType(),False),
                     StructField("nombre",StringType(),False),
                     StructField("genero",IntegerType(),False),
                     StructField("edad",IntegerType(),False),
                     StructField("altura",IntegerType(),False),
                     StructField("peso",FloatType(),False),
                     StructField("equipo_id",IntegerType(),False),
])

deporte_schema = StructType([
                             StructField("deporte_id",IntegerType(),False),
                             StructField("deporte", StringType(),False)
])

evento_schema = StructType([
                            StructField("evento_id",IntegerType(),False),
                            StructField("evento",StringType(),False),
                            StructField("deporte_id",IntegerType(),False)
])

paises_schema = StructType([
                            StructField("id",IntegerType(),False),
                            StructField("equipo",StringType(),False),
                            StructField("sigla",StringType(),False)
])

resultados_schema = StructType([
                            StructField("resultado_id",IntegerType(),False),
                            StructField("medalla",StringType(),False),
                            StructField("deportista_id",IntegerType(),False),
                            StructField("juego_id",IntegerType(),False),
                            StructField("evento_id",IntegerType(),False)
])

funci贸n de eliminaci贸n de encabezados

def eliminaEncabezado(indice, iterador):
  return iter(list(iterador)[1:])

tipos de datos al RDD

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4]),
    float(l[5]),
    int(l[6])
))

deporteRDD = deporteRDD.map(lambda l : (
    int(l[0]),
    l[1]
))

eventoRDD = eventoRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2])
))

paisesRDD = paisesRDD.map(lambda l : (
    int(l[0]),
    l[1],
    l[2]
))

resultadosRDD = resultadosRDD.map(lambda l : (
    int(l[0]),
    l[1],
    int(l[2]),
    int(l[3]),
    int(l[4])
))

creaci贸n de dataframes

deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD, schemaDeportista).show(5)
deporteDF = sqlContext.createDataFrame(deporteRDD, deporte_schema).show(5)
eventoDF = sqlContext.createDataFrame(eventoRDD, evento_schema).show(2)
paisesDF = sqlContext.createDataFrame(paisesRDD, paises_schema).show(5)
resultadosDF = sqlContext.createDataFrame(resultadosRDD, resultados_schema).show(5)

Mi soluci贸n fue crear una funcion que genere los los DataFrame a partir del CSV

def crearDF(schema, pathfilename):
    return sqlContext.read.schema(schema).option("header", "true").csv(pathfilename)

Y luego generar los schemas y los path y pasarlos a la funcion que me retorma un dataframe

Yo lo he realizado en Databricks y me ah quedado asi, la lectura de todos los archivos CSV creando los dataframe.

# File location and type
file_source = "/FileStore/shared_uploads/[email protected]/"
file_type = "csv"

# File options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","
list_files= ['deportista.csv','evento.csv','paises.csv','resultados.csv']

# The applied options are for CSV files.
for name in range(len(list_files)):
    string='raw_' + re.sub(".csv","",list_files[name]) 
    globals()[string] = spark.read.format(file_type ) \
                              .option("inferSchema", infer_schema) \
								.option("header", first_row_is_header) \
								.option("multiline", "true") \
                              .option("sep", delimiter).load(file_source + list_files[name])
    print(' DF_Name: ', string)

#Output:
 DF_Name:  raw_deportista
 DF_Name:  raw_evento
 DF_Name:  raw_paises
 DF_Name:  raw_resultados

Esta es mi soluci贸n al reto

Schemas

# Schemas

schemas = {
    "deporte": StructType([
        StructField("deporte_id", IntegerType(), False),
        StructField("deporte", StringType(), False)
    ]),
    "deportista": StructType([
        StructField("deportista_id", IntegerType(), False),
        StructField("nombre", StringType(), False),
        StructField("genero", StringType(), False),
        StructField("edad", IntegerType(), False),
        StructField("altura", IntegerType(), False),
        StructField("peso", FloatType(), False),
        StructField("equipo_id", IntegerType(), False)
    ]),
    "evento": StructType([
        StructField("evento_id", IntegerType(), False),
        StructField("evento", StringType(), False),
        StructField("deporte_id", IntegerType(), False)
    ]),
    "juegos": StructType([
        StructField("juego_id", IntegerType(), False),
        StructField("nombre_juego", StringType(), False),
        StructField("anio", IntegerType(), False),
        StructField("temporada", StringType(), False),
        StructField("ciudad", StringType(), False)
    ]),
    "paises": StructType([
        StructField("pais_id", IntegerType(), False),
        StructField("equipo", StringType(), False),
        StructField("sigla", StringType(), False)
    ]),
    "resultados": StructType([
        StructField("resultado_id", IntegerType(), False),
        StructField("medalla", StringType(), False),
        StructField("deportista_id", IntegerType(), False),
        StructField("juego_id", IntegerType(), False),
        StructField("evento_id", IntegerType(), False)
    ])
}

DataFrames

# Dataframes

dp1 = sql_context.read.schema(schemas['deportista']).option("header", "true").csv(f"{data_dir}/deportista.csv")
dp2 = sql_context.read.schema(schemas['deportista']).option("header", "false").csv(f"{data_dir}/deportista2.csv")

dataframes = {
    "deporte": sql_context.read.schema(schemas['deporte']).option("header", "true").csv(f"{data_dir}/deporte.csv"),
    "deportista": dp1.union(dp2),
    "evento": sql_context.read.schema(schemas['evento']).option("header", "true").csv(f"{data_dir}/evento.csv"),
    "juegos": sql_context.read.schema(schemas['juegos']).option("header", "true").csv(f"{data_dir}/juegos.csv"),
    "paises": sql_context.read.schema(schemas['paises']).option("header", "true").csv(f"{data_dir}/paises.csv"),
    "resultados": sql_context.read.schema(schemas['resultados']).option("header", "true").csv(f"{data_dir}/resultados.csv")
}
del(dp1, dp2)

Ejemplo show

dataframes['deporte'].show(5)

Comparto mi soluci贸n al reto planteado

#Creo el schema de mi juegoDF
juegoSchema = StructType([
    StructField("juego_id", IntegerType(), False),
    StructField("nombre_juego", StringType(), False),
    StructField("annio", IntegerType(), False),
    StructField("temporada", StringType(), False),
    StructField("ciudad", StringType(), False)
])

#Cargo juegoDF
juegoDF = sqlContext.read.schema(juegoSchema)\
    .option("header","true").csv('files/juegos.csv')

Para el caso de deportista deportistaDF1 y deportistaDF2 utilizo el mismo schema ya que contienen informaci贸nestructurada de la misma manera.

deportistaSchema = StructType([
    StructField("deportista_id", IntegerType(), False),
    StructField("nombre", StringType(), False),
    StructField("genero", IntegerType(), False),
    StructField("edad", IntegerType(), False),
    StructField("altura", IntegerType(), False),
    StructField("peso", FloatType(), False),
    StructField("equipo_id", IntegerType(), False)
])


deportistaDF1 = sqlContext.read.schema(deportistaSchema)\
    .option("header","true").csv('files/deportista.csv')

deportistaDF2 = sqlContext.read.schema(deportistaSchema)\
    .option("header","true").csv('files/deportista2.csv')

deportistaDF = deportistaDF1.union(deportistaDF2)
resultadoSchemaF = StructType([
    StructField("resultado_id", IntegerType(), False),
    StructField("medalla", StringType(), False),
    StructField("deportista_id", IntegerType(), False),
    StructField("juego_id", IntegerType(), False),
    StructField("evento_id", IntegerType(), False),
])
resultadoDF = sqlContext.read.schema(resultadoSchemaF)\
    .option("header","true").csv('files/resultados.csv')

Si alguien no entendi贸 muy bien como quit贸 el encabezado dejo mis notas:

deportistaOlimpicoRDD.take(5)

Tiene encabezado

[['deportista_id', 'nombre', 'genero', 'edad', 'altura', 'peso', 'equipo_id'],
 ['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278']]
<h3>Remove head from RDD</h3>
  1. create a funtion that make a iteration for each value since row number 1 with out head
def eliminarEncabezado(indice, iterador):
  return iter(list(iterador)[1:])
  1. Cut RDD in two partitions and take the second partition that was created with the las function
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminarEncabezado)
deportistaOlimpicoRDD.take(5)

Sin encabezado

[['1', 'A Dijiang', '1', '24', '180', '80', '199'],
 ['2', 'A Lamusi', '1', '23', '170', '60', '199'],
 ['3', 'Gunnar Nielsen Aaby', '1', '24', '0', '0', '273'],
 ['4', 'Edgar Lindenau Aabye', '1', '34', '0', '0', '278'],
 ['5', 'Christine Jacoba Aaftink', '2', '21', '185', '82', '705']]

A mi me est谩 saliendo todo bien, pero al momento de darle

.show(5)

dentro de:

deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema).show(5)

Me sale el siguiente error:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-20-49619bf78f62> in <module>
----> 1 deportistaDF.show(2)

~/Documents/platzi/curso_spark/spark-3.0.1-bin-hadoop2.7/python/pyspark/sql/dataframe.py in show(self, n, truncate, vertical)
    438         """
    439         if isinstance(truncate, bool) and truncate:
--> 440             print(self._jdf.showString(n, 20, vertical))
    441         else:
    442             print(self._jdf.showString(n, int(truncate), vertical))

~/Documents/platzi/curso_spark/spark-3.0.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name)
   1306 
   1307         for temp_arg in temp_args:

~/Documents/platzi/curso_spark/spark-3.0.1-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
    126     def deco(*a, **kw):
    127         try:
--> 128             return f(*a, **kw)
    129         except py4j.protocol.Py4JJavaError as e:
    130             converted = convert_exception(e.java_exception)

~/Documents/platzi/curso_spark/spark-3.0.1-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o85.showString.
: java.lang.IllegalStateException: SparkContext has been shutdown
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2091)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2120)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2139)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:467)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:420)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:47)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:2697)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:2904)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:300)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:337)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:64)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:564)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.base/java.lang.Thread.run(Thread.java:832)

Creo que est谩 todo bien porque todo funciona y si solo le pongo, por ejemplo:

deportistaDF

Me aparece lo siguiente:

DataFrame[deportista_id: int, nombre: string, genero: int, edad: int, altura: int, peso: float, equipo_id: int]

Seguir茅 con el curso, espero sea un error m铆nimo.

Por cierto, este es mi c贸digo, seg煤n yo es igual al del profesor, pero pues uno nunca sabe, a la mejor se me pas贸 algo, sobretodo en las primeras l铆neas porque el profe no muestra el inicio de su jupyter notebook:

from pyspark import SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, FloatType
from pyspark.sql.types import Row
sc = SparkContext(master="local", appName="RDD_to_df")
path = "repositorio/files/"

deportistaOlimpicoRDD = sc.textFile(path+"deportista.csv") \
    .map(lambda l : l.split(","))
deportistaOlimpicoRDD.take(5)
#Removemos el encabezado con la siguiente funci贸n.
#retorna una lista limpia que ya posee los valores que nosostros queremos.
#iter nos devuelve valor a valor de lo que nosotros procesemos.
#para asegurarnos que no tenemos ninguna falla de contenci贸n de errores vamos a transformar el par谩metro que nosotros
#recibamos como lista. DEvolvemos a partir del primer valor de RDD, es decir del 铆ndice 1 -> [1:]

def eliminaEncabezado(indice, interador):
    return iter(list(interador)[1:])
#mapPartitionWithIndex le pasa dos par谩metros a la funci贸n que nosotros le apliquemos.
#1. Toda la columna y 2. Un valor por 铆ndice
deportistaOlimpicoRDD = deportistaOlimpicoRDD.mapPartitionsWithIndex(eliminaEncabezado)
#ya no tiene encabezado
deportistaOlimpicoRDD.take(6)
#Antes de transformar el RDD, debemos transformar los valores del mismo.
#Sobre deportistaOlimpicoRDD haremos un mapeo y rasnformaremos los valores:

deportistaOlimpicoRDD = deportistaOlimpicoRDD.map(lambda x: (
int(x[0]),
x[1],
int(x[2]),
int(x[3]),
int(x[4]),
float(x[5]),
int(x[6])    
))
schema = StructType([
    StructField("deportista_id",IntegerType(),False),
    StructField("nombre",StringType(),False),
    StructField("genero",IntegerType(),False),
    StructField("edad",IntegerType(),False),
    StructField("altura",IntegerType(),False),
    StructField("peso",FloatType(),False),
    StructField("equipo_id",IntegerType(),False)
])

Y aqu铆 viene el error:

deportistaDF = sqlContext.createDataFrame(deportistaOlimpicoRDD,schema).show(5)
<h3>Reto</h3>
  1. Crear los schemas
# create Schemas

deporte_schema = StructType([
          StructField("deporte_id",IntegerType(),False),
          StructField("deporte", StringType(),False)
])

evento_schema = StructType([
          StructField("evento_id",IntegerType(),False),
          StructField("evento",StringType(),False),
          StructField("deporte_id",IntegerType(),False)
])

paises_schema = StructType([
          StructField("id",IntegerType(),False),
          StructField("equipo",StringType(),False),
          StructField("sigla",StringType(),False)
])

resultados_schema = StructType([
          StructField("resultado_id",IntegerType(),False),
          StructField("medalla",StringType(),False),
          StructField("deportista_id",IntegerType(),False),
          StructField("juego_id",IntegerType(),False),
          StructField("evento_id",IntegerType(),False)
])
  1. Crear dataframes
# create dataframes
deporteDF = sqlContext.read.schema(deporte_schema).option('header','true').csv(path+'deporte.csv')
eventoDF = sqlContext.read.schema(evento_schema).option('header','true').csv(path+'evento.csv')
paisesDF = sqlContext.read.schema(paises_schema).option('header','true').csv(path+'paises.csv')
resultadosDF = sqlContext.read.schema(resultados_schema).option('header','true').csv(path+'resultados.csv')
  1. Show results
juegoDF.show(5)
deporteDF.show(5)
eventoDF.show(5)
paisesDF.show(5)
resultadosDF.show(5)
<h4>Output</h4>

卤-------卤----------卤--------卤-----+
|juego_id| anio|temporada|ciudad|
卤-------卤----------卤--------卤-----+
| 1|1896 Verano| 1896|Verano|
| 2|1900 Verano| 1900|Verano|
| 3|1904 Verano| 1904|Verano|
| 4|1906 Verano| 1906|Verano|
| 5|1908 Verano| 1908|Verano|
卤-------卤----------卤--------卤-----+
only showing top 5 rows

卤---------卤------------+
|deporte_id| deporte|
卤---------卤------------+
| 1| Basketball|
| 2| Judo|
| 3| Football|
| 4| Tug-Of-War|
| 5|Speed Skating|
卤---------卤------------+
only showing top 5 rows

卤--------卤-------------------卤---------+
|evento_id| evento|deporte_id|
卤--------卤-------------------卤---------+
| 1|Basketball Men鈥檚 鈥 1|
| 2|Judo Men鈥檚 Extra-鈥 2|
| 3|Football Men鈥檚 Fo鈥 3|
| 4|Tug-Of-War Men鈥檚 鈥 4|
| 5|Speed Skating Wom鈥 5|
卤--------卤-------------------卤---------+
only showing top 5 rows

卤鈥撀-------------------卤----+
| id| equipo|sigla|
卤鈥撀-------------------卤----+
| 1| 30. Februar| AUT|
| 2|A North American 鈥 MEX|
| 3| Acipactli| MEX|
| 4| Acturus| ARG|
| 5| Afghanistan| AFG|
卤鈥撀-------------------卤----+
only showing top 5 rows

卤-----------卤------卤------------卤-------卤--------+
|resultado_id|medalla|deportista_id|juego_id|evento_id|
卤-----------卤------卤------------卤-------卤--------+
| 1| NA| 1| 39| 1|
| 2| NA| 2| 49| 2|
| 3| NA| 3| 7| 3|
| 4| Gold| 4| 2| 4|
| 5| NA| 5| 36| 5|
卤-----------卤------卤------------卤-------卤--------+
only showing top 5 rows