Para evitar el uso de ciclos for
en python y obtener un código más limpio y entendible te presento la siguiente solución al problema presentado en clase para generar un modelo markoviano latente (HMM).
Para empezar importamos nuestras librerías y clonamos el corpus de la clase.
import pandas as pd
import numpy as np
from pathlib import Path
from conllu import parse_incr
url = 'https://github.com/UniversalDependencies/UD_Spanish-AnCora.git'
path = Path('/tmp/spanish-corpus/')
!git clone$url$path && echo'Done.'
<h1>Generar el dataset</h1>
Para generar los contadores deberemos agregar cada palabra a un a lista que contenga cada una de ellas, en pocas palabras un flatten.
wordlist = []
first_word_list = []
counter = 0
conllu_file = 'es_ancora-ud-dev.conllu'withopen(path / conllu_file, 'r', encoding='utf-8') as f:
for tokenlist in parse_incr(f):
fortokenin tokenlist:
wordlist.append(token)
first_word_list.append(tokenlist[0])
Para generar un dataframe que tenga un lista atomizada de todos los elementos que necesitaremos aplicamos.
df = pd.DataFrame(word_list)
df = df[['form','upos']]
df.rename(columns={'form': 'word', 'upos': 'tag'}, inplace=True)
df.word = df.word.str.lower()
df['tag_prev'] = df['tag'].shift(1)
df
Nos dará el siguiente output:
word | tag | tag_prev | |
---|---|---|---|
0 | el | DET | |
1 | gobernante | NOUN | DET |
2 | , | PUNCT | NOUN |
3 | con | ADP | PUNCT |
4 | ganada | ADJ | ADP |
Necesitaremos calcular 4 contadores:
tag_count = df['tag'].value_counts()
tagpair_count = df[['tag', 'tag_prev']].drop(index=0).value_counts()
first_tag_prob = pd.Series(list(map(lambda x: x['upos'], first_word_list))).value_counts(normalize=True)
for tag in tag_count.index:
if tag notin first_tag_prob.index:
first_tag_prob[tag] = 0
pd: Con normalize=True
puedes calcular las probabilidades directamente.
En este caso lo que realmente estamos calculando es P(first_tag). rho como lo llamaremos desde ahora.
word_tag_count = df[['word', 'tag']].value_counts()
Necesitamos calcular 3 probabilidades:
prior = tagpair_count / tag_count[tagpair_count.reset_index()['tag_prev']].values
likelihood = wordtag_count / tag_count[wordtag_count.reset_index()['tag']].values
rho = first_tag_prob
Antes que nada, tendremos que tener en cuenta 3 importantes dimensiones, que representan el shape de nuestras
matrices, estas son: word, tag y prevtag
word sera una dimension de n palabras de largo, mientras que tag y prevtag sera dimensiones de 17 de largo cada una, pero no se podrá mezclar tag y prevtag.
Calcularemos 2 matrices:
emission = likelihood \
.sort_index(level=[0,1]) \
.to_frame("p") \
.reset_index() \
.pivot_table(index = "word", columns = "tag") \
.fillna(0) \
.droplevel(level=0, axis=1)
Nos da una matriz (word, tag)
Este código nos devolverá una matriz (en forma de dataframe) con las dimensiones (words, tag).
Es importante recordar que aunque tag y tag_prev tienen el mismo número de elementos únicos no deben ser confundidas entre si porque representan diferentes dimensiones.
transition = \
prior \
.sort_index(level=[0,1]) \
.to_frame("p") \
.reset_index() \
.pivot_table(index = "tag", columns = "tag_prev") \
.fillna(0) \
.droplevel(level=0, axis=1)
tags_enum = transition_mat.index.tolist()
Nos devolverá una matriz (tag, prevtag)
En este caso ubicamos la dimension tag al inicio para que sea operable con el tag de la matriz emission.
<h1>Definir la funcion</h1>Nota: Para que todo salga bien las dimensiones tag y prev_tag deben ser casi idénticas, esto significa mismo orden y elementos en los índices, para eso usamos .sort_index()
anteriormente, para asegurarnos que es así ejecutamos.
assert emission.columns == transition.columns
Podriamos añadir un check en la funcion pero quiero que sea lo más limpia y entendible posible, queda en ti determinar si necesitarás ese check o no.
from typing import Iterable
text_type = Iterable | str
df_type = pd.DataFrame | pd.Series
defViterbi(words: text_type,
emission: df_type = emission,
transition: df_type = transition,
rho: df_type = rho):# Overloadingif isinstance(words, str):
tokenizer = nltk.tokenize.NLTKWordTokenizer()
words = tokenizer.tokenize(words)
# Solo usaremos los valores que se encuentran en nuestra oración
emission = emission.loc[words, :]
# P(word|tag) * P(tag|tag_prev) # Nota: No expandimos el resultado
bayes = np.einsum('ij,kj->ikj', emission, transition, dtype=np.float64)
# Viterbi para el primer elemento
v_0 = (inittag_prob * emission_filt.iloc[0,:]).values
# La magia
res = []
res.append(v_0)
# Para los estados posterioresfor idx in range(1, len(words)):
prev_row = res[-1] # previous state
buff = prev_row @ bayes[idx] # rho x emission x transmission
res.append(buff)
res = np.array(res)
# Pretty printing
tags_enum = transition.index.tolist()
dd = pd.DataFrame(res, index=emission.index, columns=transition.index)
return dd.apply(lambda x: {'tag' : tags_enum[x.argmax()]}, axis=1, result_type='expand')
Y lo probamos:
viterbi('el mundo es pequeño.')
word | tag |
---|---|
el | DET |
mundo | NOUN |
es | AUX |
pequeño | ADJ |
. | PUNCT |
✨ Voilá! ✨
Si deseas obtener la matriz puede ubicar return res
debajo del último res
.
Si tienes dudas sobre cómo funciona esta línea en específico:
bayes = np.einsum('ij,kj->ikj', emission, transition, dtype=np.float64)
Te recomiendo investigar sobre una suma de einstein o el tensordot product, temas de algebra lineal.
Para ponerlo fácil, la operación realizada en la línea es muy parecida a una multiplicación de matrices, pero nos saltamos, el último paso, la suma.
Esto se debe a que en la formula tenemos rho * (emission * transmission)
y no rho x sum_i^n { emission x transmission }
Además como puedes ver tenemos anotadas las dimensiones: ij
jk
y ijk
i
representa el numero de palabras, en este caso 5, por el numero de palabras en “el mundo es pequeño”j
representa la dimensión tag (17)k
representa la dimensión tag_prev (17)
si escribimos ‘->ijk’ obtendremos una matriz (word, tag, prev_tag) resultado de la multiplicación de la dimension j
(tags) de las matrices emission y transmission
si escribimos ‘->ik’ obtendremos un multiplicación de matrices
Espero que este tutorial te haya servido para entender más a fondo el algoritmo de viterbi.
Gracias. 👋🏽👋🏽