Kinesis, EMR y Glue: Procesamiento de Datos a Gran Escala en AWS

Nexia Bank observaba un aumento significativo en el uso de su plataforma digital durante eventos clave como lanzamientos de productos o cambios en tasas de interés. Durante estos picos, más de 2 millones de usuarios interactuaban simultáneamente con su app y portal web, generando millones de eventos relacionados con transferencias, consultas de saldo, y solicitudes de productos.

Para garantizar una experiencia fluida y detectar comportamientos sospechosos en tiempo real, implementaron Amazon Kinesis Data Streams para capturar cada evento generado por los usuarios. Estos eventos eran procesados de inmediato usando AWS Lambda, lo que permitía tomar decisiones como enviar alertas antifraude o ajustar dinámicamente los recursos de la aplicación. Los datos también eran enviados a Amazon S3 a través de Kinesis Firehose, donde servicios como AWS Glue y Amazon EMR permitían análisis históricos sobre comportamiento transaccional, carga del sistema y patrones de uso.

Gracias a esta arquitectura en tiempo real, Nexia Bank redujo las interrupciones del servicio en un 70%, detectó actividades inusuales de forma proactiva, y mejoró la personalización de su experiencia digital, ofreciendo productos relevantes basados en el comportamiento actual del cliente.

Amazon Kinesis: Ingesta y Procesamiento de Datos en Tiempo Real

Amazon Kinesis es una plataforma para procesar datos de streaming en tiempo real a escala. Permite capturar, procesar y analizar flujos de datos en tiempo real, desde registros de aplicaciones hasta métricas de infraestructura y datos de sensores IoT.

Screenshot (41).png

Kinesis Data Streams

Kinesis Data Streams es un servicio de streaming de datos en tiempo real que puede capturar continuamente gigabytes de datos por segundo desde cientos de miles de fuentes.

Características principales:

  • Retención de datos configurable (24 horas a 365 días)
  • Capacidad basada en shards (1MB/s de entrada, 2MB/s de salida por shard)
  • Múltiples consumidores para el mismo stream
  • Procesamiento en orden dentro de cada shard
  • Replicación automática en 3 AZs
  • Integración nativa con Lambda, KCL, Flink
// Ejemplo de productor de Kinesis en Java PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName("MiDataStream"); List<PutRecordsRequestEntry> recordsList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry record = new PutRecordsRequestEntry(); record.setData(ByteBuffer.wrap(String.format("Evento %d", i).getBytes())); record.setPartitionKey(String.format("partitionKey-%d", i)); recordsList.add(record); } putRecordsRequest.setRecords(recordsList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest);

Kinesis Data Firehose

Kinesis Data Firehose es un servicio completamente gestionado para entregar datos de streaming en tiempo real a destinos como S3, Redshift, Elasticsearch y Splunk.

Características principales:

  • Sin administración de capacidad (escalado automático)
  • Transformación de datos con Lambda
  • Compresión y conversión de formato (Parquet, ORC)
  • Entrega por lotes configurable
  • Reintentos automáticos
  • Cifrado en tránsito y en reposo
# Ejemplo de configuración de Firehose en CloudFormation Resources: AnalyticsFirehose: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamName: AnalyticsEvents DeliveryStreamType: DirectPut S3DestinationConfiguration: BucketARN: !GetAtt AnalyticsBucket.Arn BufferingHints: IntervalInSeconds: 60 SizeInMBs: 5 CompressionFormat: GZIP Prefix: "year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/" ErrorOutputPrefix: "errors/!{firehose:error-output-type}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/" RoleARN: !GetAtt FirehoseRole.Arn

Data Streams vs. Firehose: Comparativa

CaracterísticaKinesis Data StreamsKinesis Data Firehose
Caso de usoProcesamiento personalizado en tiempo realEntrega a destinos específicos
LatenciaMilisegundosMínimo 60 segundos (por lotes)
DesarrolloRequiere código de consumidorSin código (configuración)
EscaladoManual (añadir/eliminar shards)Automático
Retención24 horas - 365 díasSin retención (entrega inmediata)
ConsumidoresMúltiples aplicacionesDestinos predefinidos
ReintentoGestionado por la aplicaciónAutomático

Amazon EMR: Procesamiento de Big Data

Amazon EMR (Elastic MapReduce) es una plataforma de clúster gestionada que simplifica la ejecución de frameworks de big data como Apache Hadoop, Apache Spark, HBase, Presto y Flink para procesar y analizar grandes cantidades de datos.

Screenshot (40).png

Arquitectura de EMR

EMR utiliza una arquitectura de clúster con tres tipos de nodos:

  • Nodo maestro: Coordina la distribución de datos y tareas
  • Nodos core: Ejecutan tareas y almacenan datos en HDFS
  • Nodos task: Solo ejecutan tareas, sin almacenamiento
┌─────────────────┐ │ │ Nodo Maestro│ │ └────────┬────────┘ ┌────────┴────────┐ │ │ Nodos Core│ │ └────────┬────────┘ ┌────────┴────────┐ │ │ Nodos Task│ │ └─────────────────┘

Casos de Uso de EMR

  • Procesamiento de logs a gran escala
  • Análisis de clickstream
  • Aprendizaje automático
  • Transformaciones ETL
  • Indexación y búsqueda
  • Análisis genómico
  • Procesamiento de datos financieros

Configuración y Despliegue

EMR ofrece múltiples opciones de despliegue:

  • Clústeres transitorios: Se crean para un trabajo específico y se terminan al finalizar
  • Clústeres de larga duración: Permanecen activos para múltiples trabajos
  • EMR Serverless: Entorno sin servidor para ejecutar aplicaciones Spark y Hive
# Ejemplo de creación de clúster EMR con AWS CLI aws emr create-cluster \\ --name "Cluster de Análisis" \\ --release-label emr-6.5.0 \\ --applications Name=Spark Name=Hive Name=Pig \\ --ec2-attributes KeyName=mi-clave \\ --instance-type m5.xlarge \\ --instance-count 3 \\ --use-default-roles

Integración con Servicios AWS

EMR se integra con varios servicios AWS:

  • S3: Almacenamiento duradero para datos de entrada/salida
  • DynamoDB: Acceso a datos NoSQL
  • RDS: Integración con bases de datos relacionales
  • Lake Formation: Control de acceso a datos
  • CloudWatch: Monitoreo y alertas
  • IAM: Control de acceso y seguridad

Ejemplo de Trabajo Spark en EMR

# Ejemplo de trabajo Spark para análisis de datos from pyspark.sql import SparkSession spark = SparkSession.builder \\ .appName("Análisis de Eventos") \\ .getOrCreate() # Leer datos de S3 eventos = spark.read.json("s3://mi-bucket/eventos/") # Transformar y analizar resultados = eventos.filter(eventos.tipo == "compra") \\ .groupBy("producto_id") \\ .agg({"precio": "sum", "cantidad": "sum"}) \\ .orderBy("sum(precio)", ascending=False) # Escribir resultados en S3 resultados.write.parquet("s3://mi-bucket/resultados/")

AWS Glue: ETL Serverless y Catálogo de Datos

AWS Glue es un servicio de ETL (Extract, Transform, Load) completamente gestionado que facilita la preparación y carga de datos para análisis. Screenshot (39).png

Componentes Principales de Glue

  1. Glue Data Catalog: Repositorio centralizado de metadatos
  2. Glue Crawlers: Descubren y catalogan automáticamente fuentes de datos
  3. Glue ETL: Genera código para transformar, limpiar y enriquecer datos
  4. Glue DataBrew: Preparación visual de datos sin código
  5. Glue Studio: Interfaz visual para crear, ejecutar y monitorear trabajos ETL

Glue Data Catalog

El Data Catalog es un repositorio centralizado que almacena metadatos sobre fuentes de datos, esquemas y transformaciones.

# Ejemplo de definición de base de datos y tabla en Glue Resources: AnalyticsDatabase: Type: AWS::Glue::Database Properties: CatalogId: !Ref AWS::AccountId DatabaseInput: Name: analytics_db Description: Base de datos para análisis de eventos EventsTable: Type: AWS::Glue::Table Properties: CatalogId: !Ref AWS::AccountId DatabaseName: !Ref AnalyticsDatabase TableInput: Name: eventos Description: Tabla de eventos de usuario TableType: EXTERNAL_TABLE Parameters: { "classification": "parquet", "compressionType": "none" } StorageDescriptor: Location: s3://mi-bucket/eventos/ InputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat OutputFormat: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat SerdeInfo: SerializationLibrary: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe Columns: - Name: evento_id Type: string - Name: usuario_id Type: string - Name: timestamp Type: timestamp - Name: tipo_evento Type: string - Name: datos Type: struct<producto_id:string,categoria:string,precio:double>

Trabajos ETL en Glue

Los trabajos ETL en Glue pueden escribirse en Python o Scala, y utilizan Apache Spark en segundo plano.

# Ejemplo de trabajo ETL en Glue import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job # Inicializar contexto de Glue args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) # Extraer datos eventos_dynamicframe = glueContext.create_dynamic_frame.from_catalog( database = "analytics_db", table_name = "eventos_raw" ) # Transformar datos eventos_filtrados = Filter.apply( frame = eventos_dynamicframe, f = lambda x: x["tipo_evento"] in ["compra", "vista_detalle"] ) # Aplicar mapping para transformar estructura eventos_mapeados = ApplyMapping.apply( frame = eventos_filtrados, mappings = [ ("evento_id", "string", "id", "string"), ("usuario_id", "string", "usuario", "string"), ("timestamp", "timestamp", "fecha", "timestamp"), ("tipo_evento", "string", "accion", "string"), ("datos.producto_id", "string", "producto", "string"), ("datos.precio", "double", "valor", "decimal(10,2)") ] ) # Cargar datos transformados glueContext.write_dynamic_frame.from_options( frame = eventos_mapeados, connection_type = "s3", connection_options = { "path": "s3://mi-bucket/eventos-procesados/" }, format = "parquet" ) job.commit()

Glue vs. EMR: Cuándo Usar Cada Uno

AspectoAWS GlueAmazon EMR
Modelo de servicioServerlessBasado en clústeres
Caso de usoETL programado, catálogo de datosProcesamiento big data, análisis avanzado
AdministraciónTotalmente gestionadoRequiere configuración de clúster
EscalabilidadAutomáticaManual o auto-scaling
FrameworksSpark, Python, ScalaHadoop, Spark, Hive, Presto, etc.
CostoPago por uso (tiempo de ejecución)Pago por hora de instancia
PersistenciaSin estadoPuede mantener estado en HDFS

Patrones de Análisis en Tiempo Real

Arquitectura de Referencia: Análisis Near Real-Time

┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │ │ │ │ │ │ Fuentes │────►│ Kinesis │────►│ Lambda │────►│ S3│ de Datos │ │ Data │ │ │ │ │ │ │ │ Streams │ │ │ │ │ └───────────┘ └─────┬─────┘ └───────────┘ └─────┬─────┘ │ │ ▼ ▼ ┌───────────┐ ┌───────────┐ │ │ │ │ Kinesis │ │ GlueAnalytics │ │ Crawler │ │ │ │ └───────────┘ └─────┬─────┘ ┌───────────┐ │ │ Athena │ │ └─────┬─────┘ ┌───────────┐ │ │ QuickSight│ │ │ └───────────┘

Patrón 1: Procesamiento en Tiempo Real con Lambda

Este patrón utiliza Kinesis Data Streams y Lambda para procesar datos en tiempo real:

  1. Los datos se ingieren en Kinesis Data Streams
  2. Lambda consume los registros y realiza procesamiento inmediato
  3. Los resultados se almacenan en DynamoDB para acceso de baja latencia
  4. Paralelamente, los datos se envían a S3 vía Firehose para análisis histórico
// Ejemplo de función Lambda para procesar eventos de Kinesis exports.handler = async (event) => { const processedRecords = []; for (const record of event.Records) { // Decodificar datos de Base64 const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf-8'); const data = JSON.parse(payload); // Procesar datos (ejemplo: enriquecimiento, filtrado) if (data.eventType === 'purchase') { // Calcular métricas data.totalValue = data.quantity * data.price; data.processingTime = new Date().toISOString(); // Almacenar en DynamoDB para acceso en tiempo real await storeInDynamoDB(data); // Añadir a resultados procesados processedRecords.push(data); } } // Opcional: Enviar datos agregados a otro stream if (processedRecords.length > 0) { await sendToAggregationStream(processedRecords); } return { recordsProcessed: event.Records.length, validRecords: processedRecords.length }; };

Patrón 2: Análisis con Kinesis Data Analytics

Kinesis Data Analytics permite realizar análisis SQL en tiempo real sobre streams de datos:

  1. Datos ingresados en Kinesis Data Streams
  2. Kinesis Data Analytics ejecuta consultas SQL continuas
  3. Resultados enviados a otro stream o directamente a Firehose
  4. Firehose entrega los datos a S3, Redshift o Elasticsearch
-- Ejemplo de consulta SQL en Kinesis Data Analytics CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( event_time TIMESTAMP, category VARCHAR(16), product_id VARCHAR(24), total_sales DOUBLE, transaction_count INTEGER ); -- Ventana deslizante de 1 minuto con actualización cada 10 segundos CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE) AS event_time, "category", "product_id", SUM("price") AS total_sales, COUNT(*) AS transaction_count FROM "SOURCE_SQL_STREAM_001" WHERE "event_type" = 'purchase' GROUP BY FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE), "category", "product_id" WINDOW TUMBLING (SIZE 1 MINUTE);

Patrón 3: Análisis Batch con Athena

Este patrón utiliza Athena para consultar datos almacenados en S3:

  1. Datos enviados a S3 mediante Firehose
  2. Glue Crawler cataloga automáticamente los datos
  3. Athena ejecuta consultas SQL sobre los datos en S3
  4. QuickSight visualiza los resultados
-- Ejemplo de consulta Athena sobre datos particionados SELECT date_trunc('hour', event_time) AS hour, category, COUNT(*) AS event_count, SUM(CASE WHEN event_type = 'purchase' THEN 1 ELSE 0 END) AS purchases, SUM(CASE WHEN event_type = 'purchase' THEN price ELSE 0 END) AS revenue FROM analytics_db.eventos WHERE year = '2023' AND month = '06' AND day BETWEEN '01' AND '07' GROUP BY date_trunc('hour', event_time), category ORDER BY hour, revenue DESC;

Optimización de Costos y Escalado

Estrategias de Optimización para Kinesis

  1. Dimensionamiento adecuado de shards:
    • Monitorear métricas de utilización (GetRecords.IteratorAgeMilliseconds)
    • Utilizar on-demand para cargas variables
    • Implementar auto-scaling con CloudWatch y Lambda
  2. Batch de registros:
    • Agrupar múltiples eventos en un solo registro
    • Utilizar compresión para reducir tamaño de datos
  3. Enhanced fan-out:
    • Usar solo cuando se necesite baja latencia con múltiples consumidores
    • Considerar el costo adicional vs. beneficio

Optimización de EMR

  1. Tipos de instancias:
    • Usar instancias spot para nodos task (hasta 90% de descuento)
    • Instancias optimizadas para memoria/computación según workload
  2. Ciclo de vida del clúster:
    • Clústeres transitorios para trabajos programados
    • Auto-termination para evitar costos innecesarios
  3. Almacenamiento:
    • Usar S3 en lugar de HDFS cuando sea posible
    • Compresión de datos (Snappy, GZIP)
    • Formatos columnar (Parquet, ORC)
# Ejemplo de configuración de EMR con instancias spot Resources: EMRCluster: Type: AWS::EMR::Cluster Properties: Name: OptimizedAnalyticsCluster ReleaseLabel: emr-6.5.0 Applications: - Name: Spark - Name: Hive Instances: MasterInstanceGroup: InstanceCount: 1 InstanceType: m5.xlarge Market: ON_DEMAND CoreInstanceGroup: InstanceCount: 2 InstanceType: r5.2xlarge Market: ON_DEMAND TaskInstanceGroups: - Name: SpotTasks InstanceCount: 10 InstanceType: c5.2xlarge Market: SPOT BidPrice: "0.5" AutoTerminationPolicy: IdleTimeout: 3600

Optimización de Glue

  1. Configuración de trabajos:
    • Ajustar número de DPUs según volumen de datos
    • Utilizar marcadores de trabajo
# Ejemplo de configuración de trabajo Glue con optimizaciones glue_job = glue.CfnJob( self, "OptimizedETLJob", name="datos_optimizados_etl", role=glue_role.role_arn, command=glue.CfnJob.JobCommandProperty( name="glueetl", python_version="3", script_location=f"s3://{script_bucket.bucket_name}/scripts/optimized_etl.py" ), default_arguments={ "--enable-metrics": "true", "--enable-continuous-cloudwatch-log": "true", "--job-bookmark-option": "job-bookmark-enable", "--enable-spark-ui": "true", "--spark-event-logs-path": f"s3://{logging_bucket.bucket_name}/spark-logs/", "--enable-job-insights": "true", "--enable-glue-datacatalog": "true", "--conf": "spark.sql.adaptive.enabled=true" }, execution_property=glue.CfnJob.ExecutionPropertyProperty( max_concurrent_runs=5 ), glue_version="3.0", max_retries=2, timeout=120, worker_type="G.1X", number_of_workers=10 )

Patrones de Escalado Eficiente

  1. Escalado predictivo:
    • Analizar patrones históricos
    • Escalar proactivamente antes de picos conocidos
  2. Procesamiento por lotes:
    • Acumular datos para procesamiento eficiente
    • Balancear latencia vs. eficiencia
  3. Arquitectura de capas:
    • Capa de velocidad para análisis inmediato
    • Capa de servicio para consultas frecuentes
    • Capa por lotes para análisis profundo

El procesamiento de datos a gran escala en AWS requiere una combinación estratégica de servicios como Kinesis, EMR y Glue. Cada uno tiene sus fortalezas y casos de uso óptimos. Kinesis brilla en la ingesta y procesamiento en tiempo real, EMR ofrece potencia y flexibilidad para cargas de trabajo de big data complejas, mientras que Glue simplifica las tareas ETL con un enfoque serverless.

La clave para implementar soluciones exitosas está en entender los patrones de datos, los requisitos de latencia y las consideraciones de costo. Una arquitectura bien diseñada combinará estos servicios de manera que aproveche sus fortalezas individuales mientras optimiza el rendimiento y el costo general.

Al adoptar las prácticas recomendadas para el dimensionamiento, la configuración y la optimización de estos servicios, podemos construir pipelines de datos robustos y eficientes que escalan con las necesidades del negocio y proporcionan insights valiosos en el momento oportuno.