No tienes acceso a esta clase

¡Continúa aprendiendo! Únete y comienza a potenciar tu carrera

Reto: construye un MEMM en Python

16/26
Recursos

Aportes 7

Preguntas 1

Ordenar por:

¿Quieres ver más aportes, preguntas y respuestas de la comunidad?

from conllu import parse_incr

uniqueFeatureDict = {}
contextDict = {}

tagtype = 'upos'
data_file = open("UD_Spanish-AnCora/es_ancora-ud-train.conllu", "r", encoding="utf-8")

#Calculando conteos (pre-probabilidades)
for tokenlist in parse_incr(data_file):
  prevtag = "None"
  for token in tokenlist:
    tag = token[tagtype]
    word = token['form'].lower()
    #c(tag|word,prevtag)
    largeKey = tag+'|'+word+','+prevtag
    if largeKey in uniqueFeatureDict.keys():
      uniqueFeatureDict[largeKey]+=1
    else:
      uniqueFeatureDict[largeKey]=1
    key = word+','+prevtag
    if key in contextDict.keys():
      contextDict[key]+=1
    else:
      contextDict[key]=1
    #print(largeKey, key, '\n')
    prevtag=tag

también hice el reto, lo pueden ver aqui

Reto terminado. Fue un desafio, la verdad es muy diferente ver como lo hace Francisco a hacerlo uno mismo.

https://github.com/JDGALLEGOS/MEMM_Viterbi/blob/main/MEMM_Viterbi.ipynb

```js !pip install conllu !pip install stanza !git clone https://github.com/UniversalDependencies/UD_Spanish-AnCora.git import os from conllu import parse_incr import stanza from sklearn.linear_model import LogisticRegression from sklearn.model_selection import train_test_split import numpy as np def load_ancora_dataset(): file_path = "UD_Spanish-AnCora/es_ancora-ud-train.conllu" sentences = [] with open(file_path, 'r', encoding='utf-8') as f: for tokenlist in parse_incr(f): sentence = [(token['form'], token['upostag']) for token in tokenlist] sentences.append(sentence) return sentences dataset = load_ancora_dataset() stanza.download('es') nlp = stanza.Pipeline('es') def extract_features(sentence): doc = nlp(' '.join([word for word, _ in sentence])) features = [] for i, word in enumerate(doc.sentences[0].words): feature_dict = { 'word': word.text, 'is_first': i == 0, 'is_last': i == len(doc.sentences[0].words) - 1, 'prev_word': '' if i == 0 else doc.sentences[0].words[i - 1].text, 'next_word': '' if i == len(doc.sentences[0].words) - 1 else doc.sentences[0].words[i + 1].text, 'word_length': len(word.text), 'is_capitalized': word.text[0].isupper(), 'is_digit': word.text.isdigit(), 'pos': word.upos } features.append(feature_dict) return features def prepare_dataset(dataset): X, y = [], [] for sentence in dataset: features = extract_features(sentence) X.extend(features) y.extend([tag for _, tag in sentence]) return X, y X, y = prepare_dataset(dataset) X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42) clf = LogisticRegression(max_iter=1000, multi_class='multinomial', solver='lbfgs') clf.fit([list(x.values()) for x in X_train], y_train) def viterbi(observations, states, classifier): T = len(observations) N = len(states) viterbi_matrix = np.zeros((N, T)) backpointer = np.zeros((N, T), dtype=int) for s in range(N): viterbi_matrix[s, 0] = classifier.predict_proba([observations[0]])[0][s] for t in range(1, T): for s in range(N): max_tr_prob = max(viterbi_matrix[s_prime, t-1] * classifier.predict_proba([observations[t]])[0][s] for s_prime in range(N)) viterbi_matrix[s, t] = max_tr_prob best_path = np.argmax(viterbi_matrix[:, -1]) return best_path states = clf.classes_ test_sentence = X_test[:5] predicted_sequence = viterbi(test_sentence, states, clf) train_accuracy = clf.score([list(x.values()) for x in X_train], y_train) test_accuracy = clf.score([list(x.values()) for x in X_test], y_test) print(f"Precisión en conjunto de entrenamiento: {train_accuracy:.4f}") print(f"Precisión en conjunto de test: {test_accuracy:.4f}") ```

Aqui mi solución al reto, quería hacerlo elegante al inicio pero no salió xd.

Github

Hola, aquí deje mi solución al reto, al final traté de compactar todo en una clase para el modelo y una función extra para plotear la matriz de Viterbi. El código fue el siguiente:

Importación de librerías y recursos

###### COSAS IMPORTANTES, CORRER AL INICIO

# cargamos el dataset o corpus
!git clone https://github.com/UniversalDependencies/UD_Spanish-AnCora.git
# librerias
!pip3 install conllu
!pip3 install stanza
import numpy as np # para el manejo de la mtriz de Viterbi
import stanza # queremos el tokenizador en español
stanza.download('es') # descargamos el paquete de español

# cosas para plotear la matriz de viterbi
import seaborn as sn # para plotear
import matplotlib.pyplot as plt
import pandas as pd # para organizar la matriz

# funcion para leer el conllu
from conllu import parse_incr

El modelo y la función para leer el corpus

 # todas las etiquetas
tags = {'ADJ','ADP','ADV','AUX','CCONJ','DET','INTJ','NOUN','NUM',
        'PART','PRON','PROPN','PUNCT','SCONJ','SYM','VERB','_'}

# instanciamos el tokenizador que sera de importancia
nlp = stanza.Pipeline('es', processors='tokenize')

class MEMM():

  # en el constructor que se definan los diccionarios
  def __init__(self):
    # diccionarios de conteos
    self.uniqueFeatureDict = {}
    self.contextDict = {}
    # diccionarios de las probabilidades
    self.posteriorProbDict = {}
    self.initialTagProb = {}
    # diccionario de tags con ids
    self.tagsDict = {}

  # ayudante para contar en diccionarios
  def __count_dict(self, dic, key):
    if key in dic.keys():
      # si ya esta le sumamos 1
      dic[key] += 1
    else:
      # si no la iniciamos en 1
      dic[key] = 1
    return dic

  # privado, conteo que se usara en entrenamiento
  def __counts(self, para_list): # paragraph_list*
    # uniqueFeatureDict = {} # C(tag|word,prevtag)
    # contextDict = {} # C(word|prevtag)
    # iterar los "parrafos" del dataset
    for para in para_list:
      prevtag = "None"# al inicio no hay tag previo
      # iterar las palabras del "parrafo"
      for token in para:
        # leemos la palabra y tag del token
        word, tag = token[0], token[1]

        ########## CONTEO PARA uniqueFeatureDict C(tag|word,prevtag)
        # definimos la clave para el diccionario
        ufd_key = tag + '|' + word + ',' + prevtag
        self.uniqueFeatureDict = self.__count_dict(self.uniqueFeatureDict, ufd_key)
        
        ########## CONTEO PARA contextDict C(word|prevtag)
        # definimos la clave para el diccionario
        cd_key = word + ',' + prevtag
        self.contextDict = self.__count_dict(self.contextDict, cd_key)

        # al final el tag será el prevtag
        prevtag = tag

  # probabilidades, igual privada, para el entrenamiento
  def __probs(self):
    # iteramos las claves del ciccionario
    for key in self.uniqueFeatureDict.keys():
      if len(key.split('|'))==3: # cae un caso, es algo raro por lo que vi
        # clave para el contextDict
        cd_key = '|'+key.split('|')[-1]
        # efectuamos el calculo
        self.posteriorProbDict[key] = self.uniqueFeatureDict[key]/self.contextDict[cd_key]
        # el caso que cayo fue [ SYM||,NOUN ], al parecer el dataset contuvo un
        # caso en el que el texto traia un |, y lo toma como palabra
      else:
        # clave para el contextDict
        cd_key = key.split('|')[1]
        # efectuamos el calculo
        self.posteriorProbDict[key] = self.uniqueFeatureDict[key]/self.contextDict[cd_key]
  
  def __initial_count(self, data):
    # hay que enumerarlas para que el modelo prediga segun el numero del tag
    # de modo que el tagDict traera tag: id, el id será la que use le modelo
    for i, tag in enumerate(tags):
      self.tagsDict[tag] = i

    for para in data: # iteramos los parrafos
      # buscamos el tag
      tag = para[0][1]
      # y hacemos el conteo mediante la funcion de contar
      self.__count_dict(self.initialTagProb, tag)
      # ayuda para las probabilidades
    total = len(data)
    # iteramos los conteos
    for key in self.initialTagProb.keys():
      # dividir el conteo entre el total
      self.initialTagProb[key] /= total

  # entrenamiento: conteos y probabilidades
  def fit(self, data):
    # realizamos los conteos
    self.__counts(data)
    # y el calculo de las probabilidades
    self.__probs()
    # sacar las distribuciones iniciales
    self.__initial_count(data)
  
  # plotear la matriz de viterbi
  def __plot_matrix(self, matrix, seq):
    # hay que hacer un dataframe para plotear mejor
    df = pd.DataFrame(matrix, index = self.tagsDict.keys(), columns = list(seq))
    # ploteamos
    plt.figure(figsize = (10,7))
    plt.title("Predicción")
    sn.heatmap(df, annot=True)

  # algoritmo de viterbi para las predicciones, será publico
  def Viterbi(self, string, plot=False):
    if string == '':
      raise ValueError('El mensaje no puede estar vacío')
    # primero hay que tokenizar la palabra
    tok = nlp(string)
    # como queremos solo las palabras hay que hacer esto
    seq = [word.text for word in tok.sentences[0].words]
    # matriz de ceros para comenzar
    viterbiProb = np.zeros((len(self.tagsDict), len(seq)))

    # calculo de la primera columna, la especial
    for tag in self.tagsDict.keys(): 
      # buscamos la llave para el diccionario inicial
      row = self.tagsDict[tag]
      # buscamos la otra probabilidad
      key = tag + '|' + seq[0].lower() + ',' + "None"
      # efectuamos el calculo de la probabilidad el try es por
      # que la palabra puede no corresponder a el tag de la key
      try:
        viterbiProb[row, 0] = self.initialTagProb[tag] * self.posteriorProbDict[key]
      except: # si no pos no
        pass

    # calculos para las siguientes columnas o palabras
    for col in range(1, len(seq)): # iteramos las columnas
      for tag in self.tagsDict.keys(): # iteramos los tags o filas
        tag_row = self.tagsDict[tag] # seleccionamos la fila
        possible_probs = []
        # se hace doble for y possible_probs por que tambien hay que
        # intentar todas las probabilidades previas y de esas ver la
        # mas probable es decir la maxima
        for prevtag in self.tagsDict.keys(): # buscamos el tag previo
          prevtag_row = self.tagsDict[prevtag] # buscamos el row del tag previo
          # key para la probabilidad previa de viterbi
          key = tag + '|' + seq[col].lower() + ',' + prevtag
          try:
            # tratamos de buscar la probabilidad previa
            possible_probs.append(
                viterbiProb[prevtag_row, col-1] * self.posteriorProbDict[key])
          except:
            # si no pos metemos un 0
            possible_probs.append(0)
        # de ahi metemos en la matriz aquella mas probable
        viterbiProb[tag_row, col] = max(possible_probs)

    # Buscamos el resultado de la predicción
    res = []
    for i, p in enumerate(seq): # enumaramos la cadena tokenizada
      for tag in self.tagsDict.keys(): # iteramos los tags
        # vemos si el tag es el argumento maximo de la columna i
        if self.tagsDict[tag] == np.argmax(viterbiProb[:, i]):
          # si es el maximo lo agregamos junto con la palabra
          res.append((p, tag))
    # si nos pidio plotear hacemos el plot
    if plot:
      self.__plot_matrix(viterbiProb, seq)
    return res

  # para predecir hacemos el algoritmo de viterbi
  def predict(self, string, plot=False):
    # usamos el algoritmo de Viterbi y pasamos el plot
    res = self.Viterbi(string, plot)
    return res


# función para leer el dataset
def get_word_list(path, encoding='utf-8', tagtype='upos', alert=True):
  # abrir el archivo
  data_file = open(path, 'r', encoding=encoding)
  word_list = []
  # iteramos las listas de tokens procesando el conllu
  for tokenlist in parse_incr(data_file): # con parse_incr()
      paragraph_list = [] # iteramos por parrafo
      # iteramos buscando la palabra y su tag segun tagtype
      for token in tokenlist:
          # agregamos la palabra y el tag seleccionado
          word = token['form'].lower()
          tag = token[tagtype]
          paragraph_list.append((word, tag))
      word_list.append(paragraph_list) # le agregamos el parrafo
  # si hay alerta ponemos eso
  if alert:
    print(word_list[0])
  return word_list

Usando el modelo y la función

######### Usando a la función
path = "UD_Spanish-AnCora/es_ancora-ud-train.conllu"
data = get_word_list(path, alert=False)
# [(palabra, tag), ...]

# instanciamos el modelo
model = MEMM()
# lo entrenamos
model.fit(data)
# y nos divertimos con el :D
model.predict('El mundo es muy pequeño', plot=True)

Les paso mi código … Aunque no funciona lo estructure como una clase y cree un método train el cual toma el dataset y va llamando a las distintas funciones de manera secuencial.

class MEMM():

  def __init__(self):
    """
    """    
    
    self.posteriorProbDict = {}
    self.initTagStateProb = {}

    self.uniqueFeatureDict = {}
    self.contextDict = {}

    self.tagStateDict = {}

  def train(self, data_file):
    """
    """

    self.data_file = data_file

    self.makeAndCountTags()
    self.posterior()
    self.initial()

    self.makeStates()

  def decode(self, string):
    """
    Algoritmo de vitebi.
    """

    doc = nlp(string)

    if len(doc.sentences) > 1:
      raise ValueError(':(')

    seq = [ word.text for word in doc.sentences[0].words ]
    viterbiProb = np.zeros( (17, len(seq)) )

    # inicilizamos la primera columna.
    for tag in self.tagStateDict.keys():
      tag_row = self.tagStateDict[tag]
      key = tag + '|' + seq[0].lower() + ',' + 'None'

      try:
        viterbiProb[tag_row, 0] = self.initTagStateProb[tag] * self.posteriorProbDict[key]
      except:
        pass

    # computo las siguientes columnas.
    for col in range( 1, len(seq) ):
      for tag in self.tagStateDict.keys():
        tag_row = self.tagStateDict[tag]
        possible_probs = []

        for prevtag in self.tagStateDict.keys(): 
          prevtag_row = self.tagStateDict[prevtag]
          key = tag + '|' + seq[col].lower() + ',' + prevtag

          try:
            possible_probs.append(
                viterbiProb[prevtag_row, col-1] * self.posteriorProbDict[key]
            )
          except:
            possible_probs.append(0)

          viterbiProb[tag_row, col] = max(possible_probs)

    # construccion de secuencia de tags
    res = []
    for i, p in enumerate(seq):
      for tag in self.tagStateDict.keys():
        if self.tagStateDict[tag] == np.argmax( viterbiProb[:, i] ):
          res.append( (p, tag) )

    return res

  def makeStates(self):
    """
    """

    # identificamos las categorias gramaticales 'upos' unicas en el corpus
    stateSet = {'ADJ',
                'ADP',
                'ADV',
                'AUX',    
                'CCONJ',
                'DET',
                'INTJ',
                'NOUN',
                'NUM',
                'PART',
                'PRON',
                'PROPN',
                'PUNCT',
                'SCONJ',
                'SYM',
                'VERB',
                '_'
    }

    # enumeramos las categorias con numeros para asignar a 
    # las columnas de la matriz de Viterbi
    
    for i, state in enumerate(stateSet):
      self.tagStateDict[state] = i

  def makeAndCountTags(self):
    """
    """

    for tokenlist in parse_incr(self.data_file):
      prevtag = 'None'
      for token in tokenlist:
        tag = token[tagtype]
        word = token['form'].lower()

        largekey = tag + '|' + word + ','
        if largekey in self.uniqueFeatureDict.keys():
          self.uniqueFeatureDict[largekey] += 1
        else:
          self.uniqueFeatureDict[largekey] = 1

        key = word + ',' + prevtag
        if key in self.contextDict.keys():
          self.contextDict[key] += 1
        else:
          self.contextDict[key] = 1

        prevtag = tag

  def posterior(self):
    """
    """

    for key in self.uniqueFeatureDict.keys():
      if len( key.split('|') ) == 3:
        self.posteriorProbDict[key] = self.uniqueFeatureDict[key] / self.contextDict['|'+key.split('|')[-1]]
      else:
        self.posteriorProbDict[key] = self.uniqueFeatureDict[key] / self.contextDict[key.split('|')[1]]

  def initial(self):
    """
    """

    wordlist = []
    count = 0

    for tokenlist in parse_incr(self.data_file):
      count += 1
      tag = tokenlist[0]['upos']

      if tag in self.initTagStateProb.keys():
        self.initTagStateProb[tag] += 1
      else:
        self.initTagStateProb[tag] = 1

    for key in self.initTagStateProb.keys():
      self.initTagStateProb[key] /= count

Aquí llamamos a la clase y al entrenamiento.

model = MEMM()
model.train(data_file)
model.decode('el mundo es pequeño')

Donde el resultado es:

[('el', 'NOUN'), ('mundo', 'NOUN'), ('es', 'NOUN'), ('pequeño', 'NOUN')]

Es aquí donde me genera la falla, creo que el error está en las funciones que calculan posterior y initial.