Redshift, Athena, Lake formation

Clase 67 de 72Curso de AWS Certified Solutions Architect Associate

Redshift, Athena y Lake Formation: Análisis de Datos a Escala en AWS

Nexiabank enfrentaba el desafío de analizar más de 5 TB de datos transaccionales diarios. Los analistas necesitaban ejecutar consultas complejas sobre datos históricos mientras los equipos de marketing requerían acceso a datos recientes para campañas en tiempo real. Implementaron una arquitectura híbrida: Redshift para análisis estructurados de alto rendimiento, Athena para consultas ad-hoc sobre datos en S3, y Lake Formation para gestionar permisos y gobernanza. Esta combinación redujo sus costos de análisis en un 40% mientras aceleró el tiempo de obtención de insights de días a minutos.

Amazon Redshift: Data Warehouse Escalable

Amazon Redshift es un servicio de data warehouse completamente gestionado que permite analizar datos estructurados y semiestructurados a escala de petabytes utilizando SQL estándar y herramientas de Business Intelligence existentes.

Arquitectura de Redshift

Redshift utiliza una arquitectura de procesamiento paralelo masivo (MPP) que distribuye y paraleliza las consultas entre múltiples nodos:

┌───────────────────────────────────────────────────────────┐ Cliente SQL└───────────────────────────────────────────────────────────┘ ┌───────────────────────────────────────────────────────────┐ Nodo Leader│ │ │ • Planificación de consultas │ │ • Compilación de consultas │ │ • Coordinación de ejecución │ │ • Agregación de resultados │ └───────────────────────────────────────────────────────────┘ ┌─────────────────┼─────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ Nodo Compute 1 │ │ Nodo Compute 2 │ │ Nodo Compute n │ │ │ │ │ │ │ │ • Almacenamiento│ │ • Almacenamiento│ │ • Almacenamiento│ │ • Procesamiento │ │ • Procesamiento │ │ • Procesamiento└─────────────────┘ └─────────────────┘ └─────────────────┘

Tipos de Nodos

  1. Nodo Leader:
    • Recibe consultas de las aplicaciones cliente
    • Desarrolla planes de ejecución
    • Coordina la ejecución paralela de consultas
    • Agrega resultados intermedios
  2. Nodos Compute (o de cómputo):
    • Ejecutan el plan de consulta
    • Almacenan datos en columnas
    • Realizan procesamiento local
    • Envían resultados intermedios al nodo leader

Tipos de Clústeres

Redshift ofrece diferentes tipos de clústeres según las necesidades:

  1. RA3: Nodos con almacenamiento gestionado (separación de cómputo y almacenamiento)
    • ra3.16xlarge, ra3.4xlarge, ra3.xlplus
    • Ideal para cargas de trabajo variables y grandes volúmenes de datos
  2. DC2: Nodos con almacenamiento local SSD
    • dc2.large, dc2.8xlarge
    • Ideal para conjuntos de datos más pequeños con alta intensidad de cómputo
-- Ejemplo de creación de un clúster Redshift CREATE CLUSTER mi_datawarehouse NODE_TYPE ra3.4xlarge NUMBER_OF_NODES 4 MASTER_USERNAME admin MASTER_USER_PASSWORD MySecurePassword123 ENCRYPTED TRUE KMS_KEY_ID arn:aws:kms:us-east-1:123456789012:key/abcd1234-ef56-gh78-ij90-klmn1234pqrs;

Distribución y Ordenación de Datos

La distribución de datos entre nodos es crucial para el rendimiento:

  1. Estilos de distribución:
    • EVEN: Distribuye filas uniformemente (round-robin)
    • KEY: Distribuye según valores de una columna
    • ALL: Replica toda la tabla en cada nodo (para tablas pequeñas)
    • AUTO: Redshift elige automáticamente
  2. Claves de ordenación:
    • SORTKEY: Define cómo se ordenan los datos físicamente
    • COMPOUND: Ordena por múltiples columnas (por defecto)
    • INTERLEAVED: Da igual peso a cada columna de ordenación
-- Ejemplo de creación de tabla con distribución y ordenación CREATE TABLE ventas ( fecha_venta DATE, region VARCHAR(20), producto_id INTEGER, vendedor_id INTEGER, cantidad INTEGER, precio DECIMAL(10,2), total DECIMAL(10,2) ) DISTKEY(region) SORTKEY(fecha_venta, producto_id);

Características Avanzadas

  1. Concurrency Scaling: Añade capacidad de procesamiento para manejar aumentos en usuarios concurrentes
  2. Redshift Spectrum: Consulta datos directamente en S3 sin necesidad de cargarlos
  3. Materialización automática de vistas: Mejora el rendimiento de consultas complejas
  4. Compresión automática: Optimiza el almacenamiento basado en patrones de datos
  5. AQUA (Advanced Query Accelerator): Aceleración de hardware para consultas
-- Ejemplo de consulta usando Redshift Spectrum -- Combina datos en Redshift con datos en S3 SELECT v.fecha_venta, v.region, v.total, c.clima FROM ventas v JOIN spectrum.clima_historico c ON v.fecha_venta = c.fecha AND v.region = c.region WHERE v.fecha_venta BETWEEN '2023-01-01' AND '2023-03-31' AND c.temperatura > 25 ORDER BY v.total DESC LIMIT 100;

Amazon Athena: Consultas SQL Serverless

Amazon Athena es un servicio de consultas interactivo que facilita el análisis de datos en Amazon S3 utilizando SQL estándar, sin necesidad de cargar los datos o gestionar infraestructura.

Características Principales

  • Serverless: Sin infraestructura que gestionar
  • Pago por consulta: Solo se paga por los datos escaneados
  • SQL estándar: Compatible con ANSI SQL
  • Integración nativa con S3: Consulta directa sobre datos en S3
  • Formatos diversos: CSV, JSON, ORC, Parquet, Avro, etc.
  • Federación de datos: Consultas a fuentes externas (RDS, DynamoDB, etc.)
-- Ejemplo de consulta en Athena SELECT year, month, region, product_category, SUM(sales_amount) as total_sales, COUNT(DISTINCT customer_id) as unique_customers FROM sales_data WHERE year = '2023' AND month BETWEEN '01' AND '06' GROUP BY year, month, region, product_category ORDER BY total_sales DESC LIMIT 10;

Optimización de Rendimiento

  1. Particionamiento: Dividir datos por columnas comunes de filtrado
-- Creación de tabla particionada en Athena CREATE EXTERNAL TABLE IF NOT EXISTS sales_data ( transaction_id STRING, customer_id STRING, product_id STRING, product_category STRING, sales_amount DECIMAL(10,2), region STRING ) PARTITIONED BY (year STRING, month STRING, day STRING) STORED AS PARQUET LOCATION 's3://mi-bucket-datos/ventas/'; -- Añadir particiones ALTER TABLE sales_data ADD PARTITION (year='2023', month='06', day='01') LOCATION 's3://mi-bucket-datos/ventas/year=2023/month=06/day=01/';
  1. Formatos columnar: Usar Parquet u ORC en lugar de CSV o JSON
# Ejemplo de conversión a Parquet usando AWS Glue aws glue start-job-run --job-name convert-to-parquet --arguments='--source_path=s3://mi-bucket/raw/,--target_path=s3://mi-bucket/optimized/'
  1. Compresión: Utilizar compresión adecuada (Snappy para Parquet)
  2. Tamaño de archivo: Mantener archivos entre 100MB-1GB

Casos de Uso de Athena

  • Análisis ad-hoc: Consultas exploratorias sin preparación previa
  • Dashboards: Alimentar visualizaciones con consultas programadas
  • Análisis de logs: Consultar logs almacenados en S3
  • Data lakes: Explorar datos en lagos de datos
  • ETL bajo demanda: Transformar datos cuando se necesiten
# Ejemplo de uso de Athena desde Python import boto3 import pandas as pd import io athena_client = boto3.client('athena', region_name='us-east-1') s3_client = boto3.client('s3', region_name='us-east-1') # Ejecutar consulta query_response = athena_client.start_query_execution( QueryString='SELECT * FROM sales_data WHERE year=\\'2023\\' LIMIT 10', QueryExecutionContext={'Database': 'analytics_db'}, ResultConfiguration={'OutputLocation': 's3://mi-bucket-resultados/'} ) # Obtener ID de ejecución query_execution_id = query_response['QueryExecutionId'] # Esperar a que termine la consulta import time while True: query_status = athena_client.get_query_execution(QueryExecutionId=query_execution_id) state = query_status['QueryExecution']['Status']['State'] if state in ['SUCCEEDED', 'FAILED', 'CANCELLED']: break time.sleep(1) # Obtener resultados if state == 'SUCCEEDED': result_location = query_status['QueryExecution']['ResultConfiguration']['OutputLocation'] # Extraer nombre del bucket y clave del objeto path_parts = result_location.replace('s3://', '').split('/', 1) bucket = path_parts[0] key = path_parts[1] # Descargar resultados obj = s3_client.get_object(Bucket=bucket, Key=key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) print(df.head())

Screenshot (42).png

AWS Lake Formation: Gobernanza de Data Lakes

AWS Lake Formation es un servicio que facilita la configuración, protección y gestión de data lakes. Proporciona un punto centralizado para definir políticas de seguridad, gobernanza y auditoría.

Screenshot (43).png

Componentes Principales

  1. Catálogo de datos centralizado: Basado en AWS Glue Data Catalog
  2. Control de acceso granular: Permisos a nivel de tabla, columna y fila
  3. Blueprints: Plantillas para ingestión de datos
  4. Registro de recursos: Gestión de ubicaciones S3 como recursos del data lake
  5. Etiquetas LF: Etiquetas para control de acceso basado en atributos (ABAC)
┌───────────────────────────────────────────────────────────┐ AWS Lake Formation└───────────────────────────────────────────────────────────┘ ┌─────────────────┼─────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ AWS Glue Data │ │ Permisos y │ │ BlueprintsCatalog │ │ Políticas │ │ de Ingestión└─────────────────┘ └─────────────────┘ └─────────────────┘ │ │ │ └─────────────────┼─────────────────┘ ┌─────────────────┼─────────────────┐ │ │ │ ▼ ▼ ▼ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ Redshift │ │ Athena │ │ EMR/Glue/│ │ │ │ │ Otros Servicios└─────────────────┘ └─────────────────┘ └─────────────────┘

Modelo de Permisos

Lake Formation implementa un modelo de permisos granular:

  1. Permisos de datos:
    • SELECT (lectura)
    • INSERT (escritura)
    • DELETE (eliminación)
    • ALTER (modificación de esquema)
    • DROP (eliminación de tabla)
  2. Permisos de metadatos:
    • CREATE_DATABASE
    • CREATE_TABLE
    • ALTER
    • DROP
  3. Niveles de permisos:
    • Base de datos
    • Tabla
    • Columna
    • Fila (mediante expresiones de filtro)
# Ejemplo de configuración de permisos en Lake Formation Resources: AnalyticsPermissions: Type: AWS::LakeFormation::Permissions Properties: DataLakePrincipal: DataLakePrincipalIdentifier: !GetAtt AnalyticsRole.Arn Resource: TableResource: DatabaseName: analytics_db Name: sales_data ColumnWildcard: ExcludedColumnNames: - customer_email - credit_card_number Permissions: - SELECT PermissionsWithGrantOption: - SELECT

Integración con Servicios de Análisis

Lake Formation se integra con múltiples servicios de análisis:

  • Athena: Consultas SQL ad-hoc
  • Redshift Spectrum: Análisis de data warehouse
  • EMR: Procesamiento de big data
  • Glue: ETL y catalogación
  • QuickSight: Visualización

Implementación de Gobernanza de Datos

  1. Registro de ubicaciones de datos:
# Registrar ubicación S3 en Lake Formation import boto3 lake_formation = boto3.client('lakeformation') response = lake_formation.register_resource( ResourceArn='arn:aws:s3:::mi-datalake-bucket', UseServiceLinkedRole=True )
  1. Definición de permisos granulares:
# Otorgar permisos a nivel de columna lake_formation.grant_permissions( Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:role/AnalyticsRole'}, Resource={ 'Table': { 'DatabaseName': 'analytics_db', 'Name': 'customer_data', 'ColumnNames': ['customer_id', 'name', 'region', 'purchase_history'] } }, Permissions=['SELECT'], PermissionsWithGrantOption=[] )
  1. Implementación de etiquetas LF:
# Crear etiqueta LF lake_formation.create_lf_tag( TagKey='Sensitivity', TagValues=['Public', 'Confidential', 'Restricted'] ) # Asignar etiqueta a columnas lake_formation.add_lf_tags_to_resource( Resource={ 'Table': { 'DatabaseName': 'analytics_db', 'Name': 'customer_data', 'ColumnNames': ['email', 'phone', 'address'] } }, LFTags=[ { 'TagKey': 'Sensitivity', 'TagValues': ['Confidential'] } ] ) # Otorgar permisos basados en etiquetas lake_formation.grant_permissions( Principal={'DataLakePrincipalIdentifier': 'arn:aws:iam::123456789012:role/AnalystRole'}, Resource={'LFTagPolicy': { 'Expression': [ { 'TagKey': 'Sensitivity', 'TagValues': ['Public'] } ] }}, Permissions=['SELECT'] )

Optimización de Datos en S3 para Análisis

La estructura y formato de los datos en S3 tiene un impacto significativo en el rendimiento y costo de las consultas.

Particionamiento de Datos

El particionamiento divide los datos en "directorios" basados en valores de columnas específicas:

s3://mi-bucket/datos/ ├── year=2023/ │ ├── month=01/ │ │ ├── day=01/ │ │ │ ├── part-00000.parquet │ │ │ └── part-00001.parquet │ │ └── day=02/ │ │ ├── part-00000.parquet │ │ └── part-00001.parquet │ └── month=02/ │ └── ... └── year=2022/ └── ...

Beneficios:

  • Reduce la cantidad de datos escaneados
  • Mejora el rendimiento de consultas
  • Reduce costos (Athena cobra por TB escaneado)
-- Consulta que aprovecha el particionamiento SELECT product_category, SUM(sales_amount) as total_sales FROM sales_data WHERE year = '2023' AND month = '06' AND region = 'EMEA' GROUP BY product_category ORDER BY total_sales DESC;

Formatos Columnar: Parquet y ORC

Los formatos columnar almacenan datos por columnas en lugar de por filas, lo que ofrece ventajas significativas para análisis:

  1. Apache Parquet:
    • Compresión eficiente por tipo de datos
    • Estadísticas por columna para saltar bloques
    • Ideal para consultas que seleccionan pocas columnas
    • Amplio soporte en ecosistema AWS
  2. ORC (Optimized Row Columnar):
    • Originalmente optimizado para Hive
    • Índices integrados
    • Buen rendimiento para Presto/Athena
# Ejemplo de escritura de Parquet con PyArrow import pyarrow as pa import pyarrow.parquet as pq import pandas as pd # Crear DataFrame df = pd.DataFrame({ 'id': range(1000), 'valor': [i * 2 for i in range(1000)], 'categoria': ['A' if i % 3 == 0 else 'B' if i % 3 == 1 else 'C' for i in range(1000)] }) # Convertir a tabla PyArrow table = pa.Table.from_pandas(df) # Escribir como Parquet con compresión Snappy pq.write_table( table, 'datos.parquet', compression='snappy', row_group_size=100000 # Tamaño de grupo de filas )

Estrategias de Optimización

  1. Tamaño de archivo:
    • Evitar archivos muy pequeños (<128MB)
    • Consolidar archivos pequeños periódicamente
  2. Compresión:
    • Snappy: Buen balance entre velocidad y ratio
    • ZSTD: Mejor compresión, algo más lento
    • GZIP: Alta compresión, más lento
  3. Estadísticas de columnas:
    • Habilitar estadísticas para mejor planificación de consultas
    • Útil para predicados de filtrado
  4. Evolución de esquema:
    • Parquet y ORC soportan evolución de esquema
    • Añadir columnas sin romper consultas existentes
# Ejemplo de compactación de archivos pequeños con AWS Glue aws glue start-job-run --job-name compact-small-files --arguments='--source_path=s3://mi-bucket/datos/year=2023/month=06/,--target_path=s3://mi-bucket/datos-optimizados/year=2023/month=06/,--target_file_size=256'

Integración Redshift Spectrum y Athena

Redshift Spectrum y Athena pueden trabajar juntos para crear una arquitectura de análisis híbrida que combine lo mejor de ambos mundos.

Arquitectura Híbrida

┌───────────────────────────────────────────────────────────┐ │ │ Amazon S3 Data Lake│ │ └───────────────────────────────────────────────────────────┘ ▲ ▲ │ │ │ │ ┌─────────────────┴───┐ ┌───────┴─────────────┐ │ │ │ │ Redshift Spectrum │ │ Athena│ │ │ │ └─────────────────┬───┘ └───────┬─────────────┘ │ │ ▼ │ ┌─────────────────────┐ │ │ │ │ Redshift Cluster │ │ │ │ │ └─────────────────────┘ │ ▲ │ │ │ └───────────────────────┘

Escenarios de Uso

  1. Datos calientes vs. fríos:
    • Datos frecuentemente accedidos en Redshift
    • Datos históricos o menos accedidos en S3
  2. Consultas federadas:
    • Unir datos de Redshift con datos externos en S3
  3. ETL progresivo:
    • Explorar datos en S3 con Athena
    • Cargar datos relevantes a Redshift para análisis intensivo

Implementación

  1. Crear esquema externo en Redshift:
-- Crear esquema externo en Redshift CREATE EXTERNAL SCHEMA spectrum_schema FROM DATA CATALOG DATABASE 'analytics_db' IAM_ROLE 'arn:aws:iam::123456789012:role/RedshiftSpectrumRole' CREATE EXTERNAL DATABASE IF NOT EXISTS;
  1. Consultar tablas externas:
-- Consulta que combina datos locales con externos SELECT v.fecha_venta, v.region, v.producto_id, p.nombre_producto, p.categoria, v.cantidad, v.precio_unitario, v.cantidad * v.precio_unitario AS total FROM ventas_recientes v JOIN spectrum_schema.catalogo_productos p ON v.producto_id = p.id WHERE v.fecha_venta >= DATEADD(month, -3, CURRENT_DATE) ORDER BY v.fecha_venta DESC, total DESC LIMIT 1000;
  1. Compartir metadatos entre servicios:
    • Usar AWS Glue Data Catalog como fuente común
    • Definir tablas una vez, usar en múltiples servicios

Optimización de Consultas Híbridas

  1. Empujar predicados:
    • Filtrar datos en S3 antes de unir con Redshift
  2. Particionar estratégicamente:
    • Alinear particiones con patrones de consulta comunes
  3. Distribución de carga:
    • Usar Redshift para agregaciones y joins complejos
    • Usar Spectrum para escaneo de grandes volúmenes
-- Ejemplo de consulta optimizada -- Filtra primero en Spectrum antes de unir con tabla local SELECT DATE_TRUNC('month', s.fecha) AS mes, s.region, s.categoria, COUNT(DISTINCT s.cliente_id) AS clientes_unicos, SUM(s.monto_venta) AS ventas_totales, AVG(r.tiempo_respuesta) AS tiempo_respuesta_promedio FROM spectrum_schema.ventas_historicas s JOIN metricas_servicio r ON s.fecha = r.fecha AND s.region = r.region WHERE s.fecha BETWEEN '2022-01-01' AND '2022-12-31' AND s.region IN ('EMEA', 'APAC') AND s.categoria = 'Electronics' GROUP BY DATE_TRUNC('month', s.fecha), s.region, s.categoria ORDER BY mes, ventas_totales DESC;

Casos de Uso y Patrones de Implementación

1. Análisis de Datos Operacionales

Patrón: Datos operacionales → S3 → Athena para exploración → Redshift para análisis profundo

┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │ │ │ │ │ │ Sistemas │────►│ Kinesis │────►│ S3 │────►│ Glue│Operacionales │ Firehose │ │ │ │ Crawler│ │ │ │ │ │ │ │ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │ │ │ ▼ │ ┌───────────┐ │ │ │ │ │ Lake │ │ Formation │ │ │ │ └─────┬─────┘ │ │ ▼ ▼ ┌───────────┐ ┌───────────┐ │ │ │ │ Athena │ │ Redshift (Exploración) (Análisis) │ │ │ │ └───────────┘ └───────────┘

2. Data Lake Empresarial

Patrón: Múltiples fuentes → Lake Formation para gobernanza → Múltiples consumidores con permisos diferenciados

# Ejemplo de definición de flujo de trabajo para data lake Resources: DataLakeWorkflow: Type: AWS::Glue::Workflow Properties: Name: EnterpriseDataLakeWorkflow Description: "Flujo de trabajo para procesamiento de data lake empresarial" CrawlerTrigger: Type: AWS::Glue::Trigger Properties: Name: StartCrawlersTrigger Type: SCHEDULED Schedule: cron(0 0 * * ? *) Actions: - CrawlerName: !Ref RawDataCrawler WorkflowName: !Ref DataLakeWorkflow ProcessingTrigger: Type: AWS::Glue::Trigger Properties: Name: StartProcessingTrigger Type: CONDITIONAL StartOnCreation: true Actions: - JobName: !Ref DataProcessingJob Predicate: Conditions: - LogicalOperator: EQUALS CrawlerName: !Ref RawDataCrawler CrawlState: SUCCEEDED WorkflowName: !Ref DataLakeWorkflow

3. Análisis de Costos Optimizado

Patrón: Datos en S3 con particionamiento y formato Parquet → Athena para consultas ad-hoc → Redshift para dashboards frecuentes

-- Ejemplo de vista materializada en Redshift CREATE MATERIALIZED VIEW mv_resumen_ventas_diarias AUTO REFRESH YES AS SELECT fecha_venta, region, categoria_producto, SUM(monto_venta) AS ventas_totales, COUNT(DISTINCT cliente_id) AS clientes_unicos, SUM(monto_venta) / COUNT(DISTINCT cliente_id) AS valor_promedio_cliente FROM ventas WHERE fecha_venta >= DATEADD(month, -12, CURRENT_DATE) GROUP BY fecha_venta, region, categoria_producto;

La combinación de Redshift, Athena y Lake Formation proporciona un conjunto completo de herramientas para implementar soluciones de análisis de datos a escala. Redshift ofrece rendimiento y capacidades avanzadas para cargas de trabajo analíticas intensivas, Athena proporciona flexibilidad y simplicidad para consultas ad-hoc sin infraestructura, y Lake Formation añade una capa de gobernanza y seguridad esencial para entornos empresariales.

La clave para aprovechar al máximo estos servicios está en entender sus fortalezas y limitaciones, y diseñar una arquitectura que los combine de manera óptima según los requisitos específicos de cada caso de uso. El particionamiento adecuado, la elección de formatos de archivo optimizados y la implementación de estrategias de permisos granulares son fundamentales para construir soluciones de análisis eficientes, seguras y escalables.

A medida que el volumen y la complejidad de los datos continúan creciendo, estas herramientas proporcionan la base para transformar datos en conocimientos accionables, permitiendo a las organizaciones tomar decisiones basadas en datos de manera más rápida y efectiva.