Source code for src.features.textual_features.keyword_search.contextual_fuzzy_search

import pandas as pd

from loguru import logger
from thefuzz import fuzz
from unstructured.cleaners.core import clean


[docs] def find_best_matches(id: str, content: str, keyword: str, threshold: int, context_words: int) -> list: """ Function that finds best matches for a given keyword / keyword combination in the input string. This function utilizes the thefuzz package to find best matches for a given keyword / keyword combination in the input string. It returns a list of dictionaries containing the id, keyword, matched phrase, and similarity score per match. The fuzz ratio is used to calculate the similarity score. It leverages the Levendstein distance to calculate the similarity between two strings. The score is normalized between 0 and 100, with 100 being the most similar. Args: id: identifier of the content (e.g.,filename) content: textual input to be searched for keyword keyword: one or multiple word input of interest threshold: for similarity search between input and keyword context_words: get surrounding context of -x and +x words Returns: dict: list of dictionaries containing - id, - keyword, - matched_phrase, and - similarity score per match """ # split content string into words, get no. of words in content string words = content.split() content_length = len(words) # get no. of keywords in keyword string key_length = len(keyword.split()) # init best_matches = [] # iterate through text input in stepsize of key_length for i in range(0, content_length + 1, key_length): # generate phrases and get similarity between phrase + keyword phrase = " ".join(words[i:i + key_length]) similarity_score = fuzz.ratio(phrase, keyword) # only get matches above threshold if similarity_score > threshold: # calculate -x and +x for context words start_context = max(0, i - context_words) end_context = min(content_length, i + context_words + key_length) # extract keyword and its surroundings context_phrase = " ".join(words[start_context:end_context]) best_matches.append({ 'id': id, 'keyword': keyword, 'matched_phrase': context_phrase, 'similarity_score': similarity_score }) # sort by similarity score in descending order best_matches.sort(key=lambda x: x['similarity_score'], reverse=True) return best_matches
[docs] def search_df_for_best_matches(input_df: pd.DataFrame, id_column_name: str, text_column_name: str, keyword: str, threshold: int = 70, context_words: int = 3) -> pd.DataFrame | None: """ Function that searches df for best matches. This function iterates through the input_df, uses the id and content columns to search for the given keyword. If multiple matches are found, they are aggregated into one cell (using ';;; ' as separator). Args: input_df: expected to have 2 columns, i.e., 'id' and 'content' (exact naming may differ) id_column_name: name of the identifying column (e.g., filename) text_column_name: name of the column holding the relevant text keyword: one or multiple word input of interest threshold: for similarity search between input and keyword context_words: get surrounding context of -x and +x words Returns: pd.DataFrame: df holding the best matches per id. If multiple are found, they are aggregated into one cell (using ';;; ' as separator). """ # init empty df all_matches = pd.DataFrame() # extract id and content from input_df; clean content column input_df = input_df[[id_column_name, text_column_name]].copy() input_df.loc[:, text_column_name] = input_df[text_column_name].astype(str).apply( lambda x: clean(x, lowercase=True, dashes=True)) for id, content in input_df.itertuples(index=False): # per ID, find best matches + corresponding scores for given keyword best_match = find_best_matches(id=id, content=content, keyword=keyword, threshold=threshold, context_words=context_words) best_match = pd.DataFrame(best_match) # concat to main df, holding best matches for all IDs all_matches = pd.concat([all_matches, best_match], axis=0) if len(all_matches) < 1: logger.info(f"No matches for '{keyword}' at similarity threshold of {threshold} found.") return None # long format with 1 entry per id is preferred # if multiple entries for one id, they are joined into one cell wider_df = all_matches.pivot_table(index='id', columns='keyword', values='matched_phrase', aggfunc=lambda x: ' ;;; '.join(x)) return wider_df
[docs] def search_best_matches_dict(input_df: pd.DataFrame, id_column_name: str, text_column_name: str, keyword_dict: dict, threshold: int, context_words: int): """ Function that enables fuzzy search with keyword_dictionary input. This function iterates through the input_df, uses the id and content columns to search all occurences of the keywords in the keyword_dict. If multiple matches are found, they are aggregated into one cell (using ';;; ' as separator). Args: input_df: expected to have 2 columns, i.e., 'id' and 'content' (exact naming may differ) id_column_name: name of the identifying column (e.g., filename) text_column_name: name of the column holding the relevant text keyword_dict: dictionary of relevant keywords threshold: for similarity search between input and keyword context_words: get surrounding context of -x and +x words Returns: pd.DataFrame: df holding the best matches per id. If multiple are found, they are aggregated into one cell (using ';;; ' as separator). """ results = pd.DataFrame() # store results for each key for key, keywords in keyword_dict.items(): combined_df = pd.DataFrame() # store results for the current key for keyword in keywords: try: df = search_df_for_best_matches(input_df, id_column_name, text_column_name, keyword, threshold, context_words) # call existing function if len(df) > 0: df = df.rename(columns={keyword: 'contextualised_keyword'}) df['actual_keyword'] = keyword df['category'] = key df[id_column_name] = df.index combined_df = pd.concat([combined_df, df], ignore_index=True) except TypeError: pass # check if there are any results for the current key if len(combined_df) > 0: results = pd.concat([results, combined_df], ignore_index=True) results = results.reset_index(drop=True) return results
[docs] def search_df_for_best_matches_keyword_dict(input_df: pd.DataFrame, id_column_name: str, text_column_name: str, keyword_dict: dict, default_threshold: int = 70, context_words: int = 3, boolean_output: bool = True): """ Wrapper function to search for multiple keywords in a df. This function is a wrapper around search_df_for_best_matches() and search_best_matches_dict(). It enables fuzzy search with a keyword_dictionary input. Args: input_df: expected to have 2 columns, i.e., 'id' and 'content' (exact naming may differ) id_column_name: name of the identifying column (e.g., filename) text_column_name: name of the column holding the relevant text keyword_dict: dict of keywords to be searched for default_threshold: for similarity search between input and keyword context_words: get surrounding context of -x and +x words boolean_output: defaults to True; if True, df is returned with booleans instead of strings Returns: all_matches: df holding the best matches per id. If multiple are found, they are aggregated into one cell (using ';;; ' as separator). """ all_matches = pd.DataFrame("", columns=list(keyword_dict.keys()), index=input_df.index ) all_matches = all_matches.reset_index() for main_keyword in keyword_dict: searchable_keywords = keyword_dict[main_keyword]["keywords"] # take threshold if exists, else use default threshold = keyword_dict[main_keyword].get("threshold", default_threshold) for searchable_keyword in searchable_keywords: result_df = search_df_for_best_matches(input_df=input_df, id_column_name=id_column_name, text_column_name=text_column_name, keyword=searchable_keyword, threshold=threshold, context_words=context_words) # rename to matched_phrase if result_df.empty: logger.warning(f"No matches found for sub keyword {searchable_keyword}") continue result_df.columns = ["matched_phrase"] result_df = result_df.reset_index() # append each entry to the corresponding column and row for row in result_df.itertuples(index=False): all_matches.loc[row.id, main_keyword] = ';;;' + row.matched_phrase # remove leading ';;;' all_matches[main_keyword] = all_matches[main_keyword].str[3:] if boolean_output: boolean_df = all_matches.iloc[:, 1:len(all_matches)] != '' return pd.concat([all_matches.iloc[:, 0], boolean_df], axis=1) return all_matches