Source code for src.data_pipeline.rplan_content_extraction.rplan_content_extractor

import os
import re

import pandas as pd
from loguru import logger

from data_pipeline.rplan_content_extraction.rplan_utils import RPLAN_OUTPUT_PATH, RPLAN_TXT_DIR, \
    RPLAN_PDF_DIR, CONFIG_FILE_PATH, parse_result_df, extract_text_and_save_to_txt_files
from utility.config_utils import read_yaml


class RPlanContentExtractor:
    """Splits preprocessed rplan text files into chapter-tagged sections.

    A section is a target, a principle or an explanation. Where a section
    starts is decided by the regular expressions of the format configuration
    that lists the file's name.
    """

    def __init__(self, rplan_config):
        r"""Initializes the RPlanContentExtractor class.

        Args:
            rplan_config: the rplan config as dictionary. It contains one dictionary per
                format, e.g. format1 and format2. Each format dictionary contains the
                following keys:
                - chapter_marker: regex for chapter lines, e.g. Kapitel \d
                - target_marker: regex for targets, e.g. Ziel \d\n:
                - principle_marker: regex for principles, e.g. Grundsatz \d\n:
                - explanation_marker: regex for explanations, e.g. Erläuterung
                - file_names: list of file names as strings,
                  e.g. ['rplan_2019_2024', 'rplan_2020_2025']
                The format whose file_names list contains the file name (without
                extension) is used for the extraction; a ValueError is raised if no
                format lists it.
                In addition, the key 'toc_marker' maps each file name to a pair
                ``({"last": bool}, keyword)`` that is used to locate the end of the
                table of contents.
        """
        self.rplan_config = rplan_config

    def parse_rplan_from_textfile(self, txt_path: str) -> pd.DataFrame:
        """Parses a rplan textfile into a dataframe of sections.

        The text is preprocessed (see preprocess_rplan_content), the chapter names
        are read from the table of contents, and the remaining text is split into
        targets, principles and explanations. Each section is assigned a chapter.

        Args:
            txt_path: path to the rplan textfile

        Returns:
            sections_df: dataframe with columns filename, chapter, section and
                section_type

        Raises:
            ValueError: if no format is configured for the file, the TOC marker is
                not found, or no chapter name can be extracted.
        """
        filename_without_ext, txt = self.read_text(txt_path)
        cfg = self._get_format_config(filename_without_ext, self.rplan_config)

        # NOTE(review): the TOC offset is computed on the raw text but applied to the
        # preprocessed text below, which is never longer than the raw text. The slice
        # may therefore reach past the table of contents - confirm this is intended.
        toc_marker = self._find_toc_marker(filename_without_ext, txt)

        txt = self.preprocess_rplan_content(txt)
        chapter_names, txt = self.extract_chapter_names(txt, cfg, toc_end_index=toc_marker)

        sections_df = self.parse_into_sections(txt, cfg, chapter_names)
        sections_df['filename'] = filename_without_ext
        # fixed column order for downstream consumers
        return sections_df[['filename', 'chapter', 'section', 'section_type']]

    def _find_toc_marker(self, filename_without_ext, txt):
        """Finds the table of contents marker in the text.

        The marker keyword is read from ``rplan_config['toc_marker']``.

        Args:
            filename_without_ext: the filename without extension
            txt: the rplan content as string

        Returns:
            toc_marker: index of the last occurrence of the keyword if the "last"
                flag of the config entry is truthy, otherwise of the first one

        Raises:
            ValueError: if the keyword does not occur in txt
            KeyError: if there is no 'toc_marker' entry for the file
        """
        last_dict, keyword = self.rplan_config['toc_marker'][filename_without_ext]
        toc_marker = txt.rfind(keyword) if last_dict["last"] else txt.find(keyword)
        if toc_marker == -1:
            raise ValueError(f"TOC Marker not found in file {filename_without_ext}")
        return toc_marker

    def read_text(self, txt_path):
        """Reads a UTF-8 text file.

        Args:
            txt_path: path to the text file

        Returns:
            filename_without_ext: base name of the file with the last extension removed
            txt: the file content as string
        """
        with open(txt_path, 'r', encoding='utf8') as f:
            txt = f.read()
        # splitext also copes with names that contain no dot at all
        filename_without_ext, _ = os.path.splitext(os.path.basename(txt_path))
        return filename_without_ext, txt

    def preprocess_rplan_content(self, content: str):
        """Normalizes the rplan content before the extraction.

        Non-breaking spaces become spaces, runs of spaces are collapsed, blank lines
        and lines of two or fewer characters are dropped, a single space next to a
        newline is removed, hyphenated line breaks are joined, dashes, bullet
        characters and tabs are removed, and the result is lower-cased.

        Args:
            content: the rplan content as string

        Returns:
            content: the preprocessed rplan content as string
        """
        # replace non-breaking spaces, then collapse runs of spaces
        content = content.replace('\xa0', ' ')
        content = re.sub(' +', ' ', content)

        # drop blank lines and lines with 2 or fewer characters
        kept_lines = [line for line in content.split('\n') if line.strip() and len(line) > 2]
        content = '\n'.join(kept_lines)

        # remove a space directly before or after a newline
        content = content.replace(' \n', '\n').replace('\n ', '\n')

        # Order matters: a dash followed by a newline has to be removed (joining the
        # hyphenated word) before the bare dashes are removed.
        chars_to_remove = ['-\n', '–\n', '—\n', '-\n', '- \n', '-', '–', '—', '•', '·', '●', '○', '▪', '▫', '□',
                           '■', '□', '\t']
        for char in chars_to_remove:
            content = content.replace(char, '')
        return content.lower()

    def _find_indices_by_marker(self, content: str, marker: str) -> list:
        r"""Finds all indices in the content where the marker starts.

        The marker is usually a word followed by a number, e.g. Ziel 1: or
        Grundsatz 1:. A match only counts if it starts the text or directly follows
        a space or a newline, i.e. if it is not in the middle of a word. Matches
        that directly follow an unwanted prefix are removed as well.

        Args:
            content: the rplan content as string
            marker: the marker as regex string, e.g. Ziel \d\n: or Grundsatz \d\n:

        Returns:
            indices: list of start indices of the accepted matches
        """
        if content is None:
            return []
        # Build the list in one pass; removing items while iterating over the same
        # list would skip the element that follows each removed one.
        indices = [
            match.start()
            for match in re.finditer(marker, content, flags=re.I)
            if match.start() == 0 or content[match.start() - 1] in (' ', '\n')
        ]
        return self._filter_unwanted_prefixes(content, indices)

    def _find_explanation_indices(self, content: str, marker: str) -> list:
        """Finds all indices in the content where the explanation marker starts.

        Unlike _find_indices_by_marker, no word-boundary check is applied; only
        matches that directly follow an unwanted prefix are removed.

        Args:
            content: the rplan content as string
            marker: the marker as regex string, e.g. Erläuterung

        Returns:
            indices: list of start indices of the accepted matches
        """
        if content is None:
            return []
        explanation_indices = [m.start() for m in re.finditer(marker, content, flags=re.I)]
        return self._filter_unwanted_prefixes(content, explanation_indices)

    def _filter_unwanted_prefixes(self, content, indices, unwanted_prefixes=None):
        """Removes indices whose marker follows a word such as "oder" or "und".

        An index is dropped if any prefix occurs in the ``len(prefix) + 3``
        characters directly before it.

        Args:
            content: the rplan content as string
            indices: list of indices where the marker is located; filtered in place
            unwanted_prefixes: list of prefixes, defaults to
                ['zu', 'oder', 'und', 'nach']

        Returns:
            indices: the same list object, reduced to the accepted indices
        """
        if unwanted_prefixes is None:
            unwanted_prefixes = ['zu', 'oder', 'und', 'nach']

        def follows_unwanted_prefix(index):
            # max(0, ...) keeps the window from wrapping around to the end of the
            # text when the marker is close to the start
            return any(
                prefix in content[max(0, index - (len(prefix) + 3)):index]
                for prefix in unwanted_prefixes
            )

        # Slice assignment keeps the in-place contract without mutating the list
        # during iteration (which skipped elements and could remove an index twice).
        indices[:] = [index for index in indices if not follows_unwanted_prefix(index)]
        return indices

    def parse_into_sections(self, txt: str, cfg: dict, chapter_names: list) -> pd.DataFrame:
        """Parses the rplan content into sections.

        The text is cut at every target, principle and explanation marker. The text
        before the first marker becomes a section of type 'start'. Each section is
        assigned the chapter name that occurs closest before the section's start.

        Args:
            txt: the rplan content as string
            cfg: the format config as dictionary, for keys see the init method
            chapter_names: list of chapter names as strings

        Returns:
            result_df: dataframe with columns chapter, section and section_type
        """
        indices, section_types = self._get_indices(cfg, txt)
        closest_chapter_names = self.find_chapter_name_for_indices(indices, chapter_names, txt)
        # indices ends with len(txt), which starts no section: drop ITS chapter name
        # so that every section is paired with the chapter resolved for its own start
        closest_chapter_names = closest_chapter_names[:-1]
        sections = [txt[start:end] for start, end in zip(indices, indices[1:])]
        return pd.DataFrame({'chapter': closest_chapter_names,
                             'section': sections,
                             'section_type': section_types})

    def find_chapter_name_for_indices(self, indices, chapter_names, txt):
        """Finds the closest preceding chapter name for each index.

        Args:
            indices: list of positions in txt
            chapter_names: list of chapter names as strings
            txt: the rplan content as string

        Returns:
            list with one chapter name per index ("" if none precedes the index)
        """
        return [self._find_closest_chapter_name(index, chapter_names, txt) for index in indices]

    def _get_indices(self, indices_cfg: dict, txt: str):
        """Gets the sorted section boundaries and the type of each section.

        Args:
            indices_cfg: format config with the keys principle_marker, target_marker
                and explanation_marker
            txt: the rplan content as string

        Returns:
            indices: ``[0, <sorted marker positions>, len(txt)]``
            sorted_section_type: ``['start', <type of each marker in the same order>]``
        """
        principles_indices = self._find_indices_by_marker(txt, marker=indices_cfg['principle_marker'])
        target_indices = self._find_indices_by_marker(txt, marker=indices_cfg['target_marker'])
        explanation_indices = self._find_explanation_indices(txt, marker=indices_cfg['explanation_marker'])

        # sort positions and types together so that they stay aligned
        typed_indices = sorted(
            [(index, "principle") for index in principles_indices]
            + [(index, "target") for index in target_indices]
            + [(index, "explanation") for index in explanation_indices]
        )
        # the text before the first marker is a section of its own without a real type
        indices = [0] + [index for index, _ in typed_indices] + [len(txt)]
        sorted_section_type = ['start'] + [section_type for _, section_type in typed_indices]
        return indices, sorted_section_type

    def _get_format_config(self, filename, indices_cfg):
        """Gets the format config that lists the given filename.

        Args:
            filename: the filename without extension
            indices_cfg: the complete rplan config

        Returns:
            the first format dictionary whose "file_names" contains filename

        Raises:
            ValueError: if no format lists the filename
        """
        for format_cfg in indices_cfg.values():
            # entries that are not dictionaries cannot be a format definition
            if isinstance(format_cfg, dict) and "file_names" in format_cfg \
                    and filename in format_cfg["file_names"]:
                return format_cfg
        raise ValueError(f"Format for file {filename} not found, maybe it's not in the config file?")

    def extract_chapter_names(self, txt, cfg, margin: float = 0.1, toc_end_index: int = None):
        """Extracts the chapter names from the beginning of the text.

        Lines of the leading part of the text that match the chapter marker are
        taken as chapter lines. Digits and dots are removed from them, they are
        stripped, and duplicates are dropped while keeping the order. The returned
        text starts right after the first occurrence of the last chapter name.

        Args:
            txt: the rplan content as string
            cfg: the format config as dictionary, for keys see the init method
            margin: fraction of the text that is searched for chapter lines. Not
                used if toc_end_index is given
            toc_end_index: index where the table of contents ends; if None or 0 the
                margin is used

        Returns:
            chapter_names: list of chapter names as strings
            txt: the rplan content after the table of contents

        Raises:
            ValueError: if no chapter name is found
        """
        if toc_end_index:
            starting_text = txt[:toc_end_index]
        else:
            starting_text = txt[:int(len(txt) * margin)]

        chapter_pattern = re.compile(cfg['chapter_marker'])
        chapter_names = []
        for line in starting_text.split('\n'):
            if not chapter_pattern.match(line):
                continue
            # remove all numbers and dots, then surrounding whitespace
            name = re.sub(r'[\d.]', '', line).strip()
            if name:
                chapter_names.append(name)
        chapter_names = list(dict.fromkeys(chapter_names))  # remove doubles, keep order

        if not chapter_names:
            raise ValueError(f"No chapter names found for chapter marker {cfg['chapter_marker']!r}")

        last_chapter = chapter_names[-1]
        toc_end = txt.find(last_chapter, 1)
        # The cleaned name may not occur verbatim (e.g. a number was removed from
        # its middle); keep the text unchanged instead of cutting at a bogus offset.
        if toc_end != -1:
            txt = txt[toc_end + len(last_chapter):]
        return chapter_names, txt

    def _find_closest_chapter_name(self, index, chapter_names, txt):
        """Finds the chapter name whose last occurrence before index is closest to it.

        Args:
            index: position in txt
            chapter_names: list of chapter names as strings
            txt: the rplan content as string

        Returns:
            the closest chapter name, or "" if no chapter name occurs before index
        """
        closest_chapter_name = ""
        closest_position = -1  # str.rfind returns -1 for "not found"
        for chapter_name in chapter_names:
            position = txt.rfind(chapter_name, 0, index)
            if position > closest_position:
                closest_chapter_name = chapter_name
                closest_position = position
        return closest_chapter_name
def parse_rplan_directory(txt_dir_path: str, json_output_path: str = None):
    """Parses all rplan textfiles of a directory into one dataframe of sections.

    Every regular file in the directory is parsed with
    RPlanContentExtractor.parse_rplan_from_textfile, configured from the yaml
    file at CONFIG_FILE_PATH. Files that raise a ValueError while being parsed
    are logged and skipped. The per-file results are concatenated and optionally
    saved as a json file.

    Args:
        txt_dir_path: path to the directory with the rplan textfiles
        json_output_path: path to the output json file; nothing is written if None

    Returns:
        result_df: dataframe with columns filename, chapter, section and
            section_type

    Raises:
        ValueError: if not a single file of the directory could be parsed
    """
    cfg = read_yaml(CONFIG_FILE_PATH)
    rplan_content_extractor = RPlanContentExtractor(cfg)

    df_list = []
    # os.listdir returns the entries in arbitrary order; sorting makes the row
    # order of the result reproducible between runs and machines
    for filename in sorted(os.listdir(txt_dir_path)):
        txt_path = os.path.join(txt_dir_path, filename)
        logger.debug(txt_path)
        if not os.path.isfile(txt_path):
            logger.warning(f"Skipping file {txt_path} as it is not a file")
            continue

        logger.debug(f"Processing file {txt_path}")
        try:
            result_df = rplan_content_extractor.parse_rplan_from_textfile(txt_path)
        except ValueError as e:
            logger.error(f"Skipping file {txt_path} due to error {e}")
            continue
        df_list.append(result_df)

    if not df_list:
        # pd.concat([]) would fail with "No objects to concatenate"; keep the
        # exception type but say what actually went wrong
        raise ValueError(f"No rplan textfile in {txt_dir_path} could be parsed")

    result_df = pd.concat(df_list).reset_index(drop=True)

    if json_output_path is not None:
        result_df.to_json(json_output_path)
        logger.info(f"Parsing done. Saved to {json_output_path}")
    return result_df
def parse_pdf_dir():
    """Parses the rplan pdfs in the rplan pdf directory and saves the result as json file.

    The pdfs in RPLAN_PDF_DIR are converted to text files, the text files in
    RPLAN_TXT_DIR are parsed into sections, the result is post-processed with
    parse_result_df and written to RPLAN_OUTPUT_PATH. The file paths are
    specified in the rplan_utils.py file.
    """
    extract_text_and_save_to_txt_files(pdf_dir_path=RPLAN_PDF_DIR)
    # No json_output_path here: the raw frame would be written to RPLAN_OUTPUT_PATH
    # (and reported as done) only to be overwritten by the post-processed frame below.
    result_df = parse_rplan_directory(txt_dir_path=RPLAN_TXT_DIR)
    result_df = parse_result_df(df=result_df)

    result_df.to_json(RPLAN_OUTPUT_PATH)
    logger.info(f"Parsing done. Saved to {RPLAN_OUTPUT_PATH}")


if __name__ == '__main__':
    result_df = parse_rplan_directory(txt_dir_path=RPLAN_TXT_DIR)