1

HMM con pandas y numpy

Para evitar el uso de ciclos for en python y obtener un código más limpio y entendible te presento la siguiente solución al problema presentado en clase para generar un modelo markoviano latente (HMM).

<h1>Setup</h1>

Para empezar importamos nuestras librerías y clonamos el corpus de la clase.

import pandas as pd
import numpy as np
from pathlib import Path
from conllu import parse_incr
url = 'https://github.com/UniversalDependencies/UD_Spanish-AnCora.git'
path = Path('/tmp/spanish-corpus/')
!git clone$url$path && echo'Done.'
<h1>Generar el dataset</h1>

Para generar los contadores deberemos agregar cada palabra a un a lista que contenga cada una de ellas, en pocas palabras un flatten.

wordlist = []
first_word_list = []
counter = 0
conllu_file = 'es_ancora-ud-dev.conllu'withopen(path / conllu_file, 'r', encoding='utf-8') as f:
    for tokenlist in parse_incr(f):
        fortokenin tokenlist:
            wordlist.append(token)
        first_word_list.append(tokenlist[0])

Para generar un dataframe que tenga un lista atomizada de todos los elementos que necesitaremos aplicamos.

df = pd.DataFrame(word_list)
df = df[['form','upos']]
df.rename(columns={'form': 'word', 'upos': 'tag'}, inplace=True)
df.word = df.word.str.lower()
df['tag_prev'] = df['tag'].shift(1)
df

Nos dará el siguiente output:

wordtagtag_prev
0elDET
1gobernanteNOUNDET
2,PUNCTNOUN
3conADPPUNCT
4ganadaADJADP

Generar los contadores


Necesitaremos calcular 4 contadores:

  • C(tag)
tag_count = df['tag'].value_counts()
  • C(tag , tag_prev)
tagpair_count = df[['tag', 'tag_prev']].drop(index=0).value_counts()
  • C(first_tag)
first_tag_prob = pd.Series(list(map(lambda x: x['upos'], first_word_list))).value_counts(normalize=True)
for tag in tag_count.index:
    if tag notin first_tag_prob.index:
        first_tag_prob[tag] = 0

pd: Con normalize=True puedes calcular las probabilidades directamente.
En este caso lo que realmente estamos calculando es P(first_tag). rho como lo llamaremos desde ahora.

  • C(word, tag)
word_tag_count = df[['word', 'tag']].value_counts()

Calcular las probabilidades


Necesitamos calcular 3 probabilidades:

  • prior P( t | t_(n-1) )
prior = tagpair_count /  tag_count[tagpair_count.reset_index()['tag_prev']].values
  • likelihood P( w | t )
likelihood = wordtag_count / tag_count[wordtag_count.reset_index()['tag']].values
  • P(first_tag)
rho = first_tag_prob

Construcción de matrices


Antes que nada, tendremos que tener en cuenta 3 importantes dimensiones, que representan el shape de nuestras
matrices, estas son: word, tag y prevtag

word sera una dimension de n palabras de largo, mientras que tag y prevtag sera dimensiones de 17 de largo cada una, pero no se podrá mezclar tag y prevtag.

Calcularemos 2 matrices:

  • emission
emission = likelihood \
    .sort_index(level=[0,1]) \
    .to_frame("p") \
    .reset_index() \
    .pivot_table(index = "word", columns = "tag") \
    .fillna(0) \
    .droplevel(level=0, axis=1)

Nos da una matriz (word, tag)

Este código nos devolverá una matriz (en forma de dataframe) con las dimensiones (words, tag).
Es importante recordar que aunque tag y tag_prev tienen el mismo número de elementos únicos no deben ser confundidas entre si porque representan diferentes dimensiones.

  • transimition
transition = \
    prior \
    .sort_index(level=[0,1]) \
    .to_frame("p") \
    .reset_index() \
    .pivot_table(index = "tag", columns = "tag_prev") \
    .fillna(0) \
    .droplevel(level=0, axis=1)
tags_enum = transition_mat.index.tolist()

Nos devolverá una matriz (tag, prevtag)

En este caso ubicamos la dimension tag al inicio para que sea operable con el tag de la matriz emission.

<h1>Definir la funcion</h1>

Nota: Para que todo salga bien las dimensiones tag y prev_tag deben ser casi idénticas, esto significa mismo orden y elementos en los índices, para eso usamos .sort_index() anteriormente, para asegurarnos que es así ejecutamos.

assert emission.columns == transition.columns

Podriamos añadir un check en la funcion pero quiero que sea lo más limpia y entendible posible, queda en ti determinar si necesitarás ese check o no.

from typing import Iterable
text_type = Iterable | str
df_type = pd.DataFrame | pd.Series


defViterbi(words: text_type, 
            emission: df_type = emission, 
            transition: df_type = transition, 
            rho: df_type = rho):# Overloadingif isinstance(words, str):
        tokenizer = nltk.tokenize.NLTKWordTokenizer()
        words = tokenizer.tokenize(words)

    # Solo usaremos los valores que se encuentran en nuestra oración
    emission = emission.loc[words, :]

    # P(word|tag) * P(tag|tag_prev) # Nota: No expandimos el resultado
    bayes = np.einsum('ij,kj->ikj', emission, transition, dtype=np.float64)

    # Viterbi para el primer elemento
    v_0 = (inittag_prob * emission_filt.iloc[0,:]).values

    # La magia
    res = []
    res.append(v_0)
   		# Para los estados posterioresfor idx in range(1, len(words)):
        prev_row = res[-1]              # previous state
        buff = prev_row @ bayes[idx]    # rho x emission x transmission
        res.append(buff)
    res = np.array(res)
    
    # Pretty printing
    tags_enum = transition.index.tolist()
    dd = pd.DataFrame(res, index=emission.index, columns=transition.index)
    return dd.apply(lambda x: {'tag' : tags_enum[x.argmax()]}, axis=1, result_type='expand')

Y lo probamos:

viterbi('el mundo es pequeño.')
wordtag
elDET
mundoNOUN
esAUX
pequeñoADJ
.PUNCT

✨ Voilá! ✨

Si deseas obtener la matriz puede ubicar return res debajo del último res.

Si tienes dudas sobre cómo funciona esta línea en específico:

bayes = np.einsum('ij,kj->ikj', emission, transition, dtype=np.float64)

Te recomiendo investigar sobre una suma de einstein o el tensordot product, temas de algebra lineal.
Para ponerlo fácil, la operación realizada en la línea es muy parecida a una multiplicación de matrices, pero nos saltamos, el último paso, la suma.

Esto se debe a que en la formula tenemos rho * (emission * transmission) y no rho x sum_i^n { emission x transmission }

Además como puedes ver tenemos anotadas las dimensiones: ijjk y ijk

i representa el numero de palabras, en este caso 5, por el numero de palabras en “el mundo es pequeño”
j representa la dimensión tag (17)
k representa la dimensión tag_prev (17)

si escribimos ‘->ijk’ obtendremos una matriz (word, tag, prev_tag) resultado de la multiplicación de la dimension j (tags) de las matrices emission y transmission

si escribimos ‘->ik’ obtendremos un multiplicación de matrices

Espero que este tutorial te haya servido para entender más a fondo el algoritmo de viterbi.

Gracias. 👋🏽👋🏽

Escribe tu comentario
+ 2