class MedsAllergiesAnnotator(Annotator):
    """
    Annotator class for medication and allergy concepts.

    This class extends the `Annotator` base class and provides methods for running a pipeline of
    annotation tasks on a given note, as well as validating and converting concepts related to
    medications and allergies.

    Attributes:
        valid_meds: Valid medication IDs loaded from ``valid_meds.csv`` (no header). Membership is tested
            against its ``.values``, so this is presumably a pandas object rather than a plain list.
        reactions_subset_lookup (dict): Maps an integer reaction ID to the subset ID that replaces it.
        allergens_subset_lookup (dict): Maps an integer allergen ID to an entry that is read by the keys
            ``"subsetId"`` and ``"allergenType"``.
        allergy_type_lookup (dict): Maps an (allergen type value, allergy type value) tuple to a result
            whose first item is used as the code and whose second item is used as the name.
        vtm_to_vmp_lookup: Table used to convert VTM (Virtual Therapeutic Moiety) IDs to VMP (Virtual
            Medicinal Product) IDs. It is merged on the columns ``vtmId``, ``dose`` and ``unit`` and read
            through its ``vmpId`` column, so it is presumably a pandas DataFrame.
        vtm_to_text_lookup (dict): Maps an integer VTM ID to the text that replaces the concept name.
    """

    def __init__(self, cat: CAT, config: Optional[AnnotatorConfig] = None):
        super().__init__(cat, config)
        # Populate the lookup attributes listed in the class docstring.
        self._load_med_allergy_lookup_data()
@property
def concept_types(self) -> List[Category]:
    """
    The concept categories this annotator works with.

    Returns:
        [Category.MEDICATION, Category.ALLERGY, Category.REACTION]
    """
    handled_categories = [
        Category.MEDICATION,
        Category.ALLERGY,
        Category.REACTION,
    ]
    return handled_categories
@property
def pipeline(self) -> List[str]:
"""
Returns a list of annotators in the pipeline.
The annotators are executed in the order they appear in the list.
Returns:
["preprocessor", "medcat", "list_cleaner", "paragrapher", "postprocessor", "dosage_extractor", "vtm_converter", "deduplicator"]
"""
return [
"preprocessor",
"medcat",
"list_cleaner",
"paragrapher",
"postprocessor",
"dosage_extractor",
"vtm_converter",
"deduplicator",
]
def run_pipeline(
self,
note: Note,
record_concepts: Optional[List[Concept]] = None,
dosage_extractor: Optional[DosageExtractor] = None,
) -> List[Concept]:
"""
Runs the annotation pipeline on the given note.
Args:
note (Note): The input note to run the pipeline on.
record_concepts (Optional[List[Concept]]): The list of previously recorded concepts.
dosage_extractor (Optional[DosageExtractor]): The dosage extractor function.
Returns:
The list of annotated concepts.
"""
concepts: List[Concept] = []
for pipe in self.pipeline:
if pipe not in self.config.disable:
if pipe == "preprocessor":
note = self.preprocess(note=note)
elif pipe == "medcat":
concepts = self.get_concepts(note=note)
elif pipe == "list_cleaner":
concepts = self.filter_concepts_in_numbered_list(concepts=concepts, note=note)
elif pipe == "paragrapher":
concepts = self.process_paragraphs(note=note, concepts=concepts)
elif pipe == "postprocessor":
concepts = self.postprocess(concepts=concepts, note=note)
elif pipe == "deduplicator":
concepts = self.deduplicate(concepts=concepts, record_concepts=record_concepts)
elif pipe == "vtm_converter":
concepts = self.convert_VTM_to_VMP_or_text(concepts=concepts)
elif pipe == "dosage_extractor" and dosage_extractor is not None:
concepts = self.add_dosages_to_concepts(
dosage_extractor=dosage_extractor, concepts=concepts, note=note
)
return concepts
def _load_med_allergy_lookup_data(self) -> None:
    """
    Reads the medication and allergy lookup files and stores each one on the instance.

    Every file name is appended to `self.lookup_data_path`, and `self.use_package_data` is forwarded
    to the loader as `is_package_data`.
    """
    data_dir = self.lookup_data_path
    from_package = self.use_package_data

    self.valid_meds = load_lookup_data(data_dir + "valid_meds.csv", is_package_data=from_package, no_header=True)
    self.reactions_subset_lookup = load_lookup_data(
        data_dir + "reactions_subset.csv", is_package_data=from_package, as_dict=True
    )
    self.allergens_subset_lookup = load_lookup_data(
        data_dir + "allergens_subset.csv", is_package_data=from_package, as_dict=True
    )
    # the allergy type file goes through its own loader
    self.allergy_type_lookup = load_allergy_type_combinations(
        data_dir + "allergy_type.csv", is_package_data=from_package
    )
    # loaded without as_dict, unlike the other lookups
    self.vtm_to_vmp_lookup = load_lookup_data(data_dir + "vtm_to_vmp.csv", is_package_data=from_package)
    self.vtm_to_text_lookup = load_lookup_data(
        data_dir + "vtm_to_text.csv", is_package_data=from_package, as_dict=True
    )
def _validate_meds(self, concept) -> bool:
"""
Validates if the concept is a valid medication.
Args:
concept: The concept to validate.
Returns:
True if the concept is a valid medication, False otherwise.
"""
# check if substance is valid med
if int(concept.id) in self.valid_meds.values:
return True
return False
def _validate_and_convert_substance(self, concept) -> bool:
    """
    Checks a substance against the allergen subset lookup and converts it when found.

    On a hit the concept's `id` is replaced by the entry's subset ID, and its `category` is set to the
    entry's allergen type (e.g. drug, food) when that type is recognised.

    Args:
        concept: The substance concept to be validated and converted.

    Returns:
        True if the substance was found in the lookup (even if its allergen type could not be
        assigned), False otherwise.
    """
    subset_entry = self.allergens_subset_lookup.get(int(concept.id))
    if subset_entry is None:
        log.warning(f"No lookup subset found for substance ({concept.id} | {concept.name})")
        return False

    log.debug(
        f"Converted concept ({concept.id} | {concept.name}) to "
        f"({subset_entry['subsetId']} | {concept.name}): valid Epic allergen subset"
    )
    concept.id = str(subset_entry["subsetId"])

    # an unrecognised allergen type is only logged; the substance itself still counts as valid
    try:
        concept.category = AllergenType(str(subset_entry["allergenType"]).lower())
        log.debug(
            f"Assigned substance concept ({concept.id} | {concept.name}) "
            f"to allergen type category {concept.category}"
        )
    except ValueError as e:
        log.warning(f"Allergen type not found for {concept.__str__()}: {e}")

    return True
def _validate_and_convert_reaction(self, concept) -> bool:
    """
    Checks a concept against the reaction subset lookup and converts its ID when found.

    Args:
        concept: The concept to be validated and converted.

    Returns:
        True if the concept is in the reaction subset (its `id` is then replaced by the subset ID),
        False otherwise.
    """
    subset_id = self.reactions_subset_lookup.get(int(concept.id))
    if subset_id is None:
        log.warning(f"Reaction not found in Epic subset conversion for concept {concept.__str__()}")
        return False

    log.debug(
        f"Converted concept ({concept.id} | {concept.name}) to "
        f"({subset_id} | {concept.name}): valid Epic reaction subset"
    )
    concept.id = str(subset_id)
    return True
def _validate_and_convert_concepts(self, concept: Concept) -> Concept:
    """
    Assigns a category to the concept from its meta-annotations, converting it where a lookup applies.

    Args:
        concept (Concept): The concept to be validated and converted.

    Returns:
        The same concept, with its id, category and linked concepts updated where validation passed.
    """
    annotations = [] if concept.meta is None else [meta_ann.value for meta_ann in concept.meta]
    positioned_as_reaction = (
        ReactionPos.BEFORE_SUBSTANCE in annotations or ReactionPos.AFTER_SUBSTANCE in annotations
    )

    if SubstanceCategory.ADVERSE_REACTION in annotations:
        if self._validate_and_convert_substance(concept):
            # type and severity are linked before the category is overwritten with ALLERGY
            self._convert_allergy_type_to_code(concept)
            self._convert_allergy_severity_to_code(concept)
            concept.category = Category.ALLERGY
        else:
            log.warning(f"Double-checking if concept ({concept.id} | {concept.name}) is in reaction subset")
            # the reaction lookup runs first, the position annotation is only consulted afterwards
            if self._validate_and_convert_reaction(concept) and positioned_as_reaction:
                concept.category = Category.REACTION
            else:
                log.warning(
                    f"Reaction concept ({concept.id} | {concept.name}) not in subset or reaction_pos is NOT_REACTION"
                )

    if SubstanceCategory.TAKING in annotations and self._validate_meds(concept):
        concept.category = Category.MEDICATION

    if (
        SubstanceCategory.NOT_SUBSTANCE in annotations
        and positioned_as_reaction
        and self._validate_and_convert_reaction(concept)
    ):
        concept.category = Category.REACTION

    return concept
@staticmethod
def _link_reactions_to_allergens(concept_list: List[Concept], note: Note, link_distance: int = 5) -> List[Concept]:
    """
    Attaches each reaction concept to its closest eligible allergen and drops reactions from the list.

    Args:
        concept_list (List[Concept]): The concepts to scan for allergens and reactions.
        note (Note): The note object, passed on to the word-distance calculation.
        link_distance (int, optional): The largest distance at which a reaction is still linked.
            Defaults to 5.

    Returns:
        A new list holding every non-reaction concept. Reaction concepts are left out whether or not
        they were linked to an allergen.
    """
    allergens = [item for item in concept_list if item.category == Category.ALLERGY]
    reactions = [item for item in concept_list if item.category == Category.REACTION]

    for reaction in reactions:
        positions = [] if reaction.meta is None else [meta_ann.value for meta_ann in reaction.meta]
        closest_allergen = None
        closest_distance = inf

        for allergen in allergens:
            # a reaction marked before-substance is not matched to an allergen that starts earlier,
            # and one marked after-substance is not matched to an allergen that starts later
            wrong_side = (ReactionPos.BEFORE_SUBSTANCE in positions and allergen.start < reaction.start) or (
                ReactionPos.AFTER_SUBSTANCE in positions and allergen.start > reaction.start
            )
            if wrong_side:
                continue

            gap = calculate_word_distance(reaction.start, reaction.end, allergen.start, allergen.end, note)
            log.debug(
                f"Calculated distance between reaction {reaction.name} "
                f"and allergen {allergen.name}: {gap}"
            )

            # -1 is treated as "indices could not be resolved"
            if gap == -1:
                log.warning(
                    f"Indices for {reaction.name} or {allergen.name} invalid: "
                    f"({reaction.start}, {reaction.end})"
                    f"({allergen.start}, {allergen.end})"
                )
                continue

            if gap <= link_distance and gap < closest_distance:
                closest_distance, closest_allergen = gap, allergen

        if closest_allergen is not None:
            closest_allergen.linked_concepts.append(reaction)
            log.debug(
                f"Linked reaction concept {reaction.name} to "
                f"allergen concept {closest_allergen.name}"
            )

    # every REACTION concept leaves the main list here
    return [item for item in concept_list if item.category != Category.REACTION]
@staticmethod
def _convert_allergy_severity_to_code(concept: Concept) -> bool:
    """
    Links a severity code concept to the given concept, based on its severity meta-annotation.

    Mild, moderate and severe map to the codes "L", "M" and "H". An unspecified severity links nothing.

    Args:
        concept (Concept): The concept to convert severity for.

    Returns:
        True if a severity annotation was present (including unspecified), False otherwise.
    """
    annotations = [] if concept.meta is None else [meta_ann.value for meta_ann in concept.meta]

    # checked in this order; the first severity present wins
    coded_severities = (
        (Severity.MILD, "L", "Low"),
        (Severity.MODERATE, "M", "Moderate"),
        (Severity.SEVERE, "H", "High"),
    )
    for severity, code, label in coded_severities:
        if severity not in annotations:
            continue
        concept.linked_concepts.append(Concept(id=code, name=label, category=Category.SEVERITY))
        log.debug(
            f"Linked severity concept ({concept.linked_concepts[-1].id} | {concept.linked_concepts[-1].name}) "
            f"to allergen concept ({concept.id} | {concept.name}): valid meta model output"
        )
        return True

    if Severity.UNSPECIFIED in annotations:
        return True

    log.warning(f"No severity annotation associated with ({concept.id} | {concept.name})")
    return False
def _convert_allergy_type_to_code(self, concept: Concept) -> bool:
    """
    Converts the allergy type of a concept to a code and adds it as a linked concept.

    The code is looked up in `self.allergy_type_lookup` with the pair
    (concept.category.value, allergy_type meta-annotation value).

    Args:
        concept (Concept): The concept whose allergy type needs to be converted. Its `category` is
            expected to hold the allergen type at this point.

    Returns:
        True if an allergy type concept was linked. False if the concept does not carry exactly one
        allergy_type meta-annotation, has no category value to look up, or the combination is not in
        the lookup.
    """
    # a concept without meta-annotations is treated as having none, like the other converters do
    meta_anns = concept.meta if concept.meta is not None else []
    allergy_type_anns = [meta_ann for meta_ann in meta_anns if meta_ann.name == "allergy_type"]
    if len(allergy_type_anns) != 1:
        log.warning(
            f"Unable to map allergy type code: allergy_type meta-annotation "
            f"not found for concept {concept.__str__()}"
        )
        return False
    allergy_type = allergy_type_anns[0].value

    # the allergen type may never have been assigned (the substance lookup only logs that case),
    # so do not assume category is an enum member
    allergen_type_value = getattr(concept.category, "value", None)
    if allergen_type_value is None:
        log.warning(
            f"Unable to map allergy type code: no allergen type category "
            f"assigned to concept {concept.__str__()}"
        )
        return False

    # perform lookup with AllergenType and allergy_type combination
    lookup_combination: Tuple[str, str] = (allergen_type_value, allergy_type.value)
    allergy_type_lookup_result = self.allergy_type_lookup.get(lookup_combination)
    if allergy_type_lookup_result is None:
        log.warning(f"Allergen and adverse reaction type combination not found: {lookup_combination}")
        # nothing was linked, so report failure as the contract promises
        return False

    # add the resulting allergy type concept to linked_concepts
    concept.linked_concepts.append(
        Concept(
            id=str(allergy_type_lookup_result[0]),
            name=allergy_type_lookup_result[1],
            category=Category.ALLERGY_TYPE,
        )
    )
    log.debug(
        f"Linked allergy_type concept ({allergy_type_lookup_result[0]} | {allergy_type_lookup_result[1]})"
        f" to allergen concept ({concept.id} | {concept.name}): valid meta model output + allergytype lookup"
    )
    return True
def _process_meta_ann_by_paragraph(self, concept: Concept, paragraph: Paragraph):
    """
    Adjusts the concept's substance_category meta-annotation according to the paragraph type.

    Args:
        concept (Concept): The concept object; its meta-annotations are updated in place.
        paragraph (Paragraph): The paragraph object containing the concept.

    Returns:
        None
    """
    section = paragraph.type

    # structured medication list: taking/irrelevant substances take the value configured for that list
    if section in self.structured_med_lists:
        convertible = (SubstanceCategory.TAKING, SubstanceCategory.IRRELEVANT)
        for meta in concept.meta:
            if meta.name != "substance_category" or meta.value not in convertible:
                continue
            target = self.structured_med_lists[section]
            if meta.value != target:
                log.debug(
                    f"Converted {meta.value} to "
                    f"{target} for concept ({concept.id} | {concept.name}): "
                    f"paragraph is {paragraph.type}"
                )
            meta.value = target
        return

    # problem list or irrelevant section: every substance becomes irrelevant
    if section in self.structured_prob_lists or section in self.irrelevant_paragraphs:
        for meta in concept.meta:
            if meta.name != "substance_category" or meta.value == SubstanceCategory.IRRELEVANT:
                continue
            log.debug(
                f"Converted {meta.value} to "
                f"{SubstanceCategory.IRRELEVANT} for concept ({concept.id} | {concept.name}): "
                f"paragraph is {paragraph.type}"
            )
            meta.value = SubstanceCategory.IRRELEVANT
def process_paragraphs(self, note: Note, concepts: List[Concept]) -> List[Concept]:
"""
Process the paragraphs in a note and update the list of concepts.
Args:
note (Note): The note object containing the paragraphs.
concepts (List[Concept]): The list of concepts to be updated.
Returns:
The updated list of concepts.
"""
for paragraph in note.paragraphs:
for concept in concepts:
if concept.start >= paragraph.start and concept.end <= paragraph.end:
# log.debug(f"({concept.name} | {concept.id}) is in {paragraph.type}")
if concept.meta:
self._process_meta_ann_by_paragraph(concept, paragraph)
return concepts
def postprocess(self, concepts: List[Concept], note: Note) -> List[Concept]:
"""
Postprocesses a list of concepts and links reactions to allergens.
Args:
concepts (List[Concept]): The list of concepts to be postprocessed.
note (Note): The note object associated with the concepts.
Returns:
The postprocessed list of concepts.
"""
# deepcopy so we still have reference to original list of concepts
all_concepts = deepcopy(concepts)
processed_concepts = []
for concept in all_concepts:
concept = self._validate_and_convert_concepts(concept)
processed_concepts.append(concept)
processed_concepts = self._link_reactions_to_allergens(processed_concepts, note)
return processed_concepts
def convert_VTM_to_VMP_or_text(self, concepts: List[Concept]) -> List[Concept]:
    """
    Converts medication concepts from VTM (Virtual Therapeutic Moiety) to VMP (Virtual Medicinal Product) or text.

    Only concepts whose category is `Category.MEDICATION` are touched. A medication with an extracted
    dose value and unit is matched against `vtm_to_vmp_lookup` on (vtmId, dose, unit). On a match its id
    becomes the VMP id, the strength and "tablets" are appended to its name, and its dosage is reset to
    one tablet. Every other medication (no usable dose, or no VMP match) is looked up in
    `vtm_to_text_lookup` and, when found, has its id cleared and its name replaced by the lookup text.

    Args:
        concepts (List[Concept]): The concepts to convert; medication concepts are modified in place.

    Returns:
        The same list of concepts, with medication IDs, names, and dosages updated where a conversion applied.
    """
    match_keys = ["vtmId", "dose", "unit"]

    def convert_to_text(concept, reason: str) -> None:
        # Replace the concept by its free-text form when the VTM has one; otherwise leave it as is.
        lookup_result = self.vtm_to_text_lookup.get(int(concept.id))
        if lookup_result is None:
            return
        log.debug(f"Converted ({concept.id} | {concept.name}) to (None | {lookup_result}): {reason}")
        concept.id = None
        concept.name = lookup_result

    # Split the medication concepts by identity in one pass. Using list membership here would compare
    # concepts by equality, and building the remainder from `concepts` would sweep in non-medications.
    med_concepts_with_dose = []
    med_concepts_no_dose = []
    for concept in concepts:
        if concept.category != Category.MEDICATION:
            continue
        dose = concept.dosage.dose if concept.dosage is not None else None
        if dose and dose.value is not None and dose.unit is not None:
            med_concepts_with_dose.append(concept)
        else:
            med_concepts_no_dose.append(concept)

    # The lookup's dose column is cast in place (as before) so it matches the float doses below.
    self.vtm_to_vmp_lookup["dose"] = self.vtm_to_vmp_lookup["dose"].astype(float)

    if med_concepts_with_dose:
        # Create a temporary DataFrame to match vtmId, dose, and unit
        temp_df = pd.DataFrame(
            {
                "vtmId": [int(concept.id) for concept in med_concepts_with_dose],
                "dose": [float(concept.dosage.dose.value) for concept in med_concepts_with_dose],
                "unit": [concept.dosage.dose.unit for concept in med_concepts_with_dose],
            }
        )
        # One lookup row per key keeps the left merge at exactly one output row per concept, so
        # merged rows line up with med_concepts_with_dose position by position.
        vmp_lookup = self.vtm_to_vmp_lookup.drop_duplicates(subset=match_keys)
        merged_df = temp_df.merge(vmp_lookup, on=match_keys, how="left")

        for concept, vmp_id in zip(med_concepts_with_dose, merged_df["vmpId"]):
            if pd.isna(vmp_id):
                # If no match with dose convert to text
                convert_to_text(concept, "no match to VMP dosage lookup")
                continue

            # Keep fractional strengths (e.g. 2.5mg); only whole doses are shown without decimals.
            dose_value = float(concept.dosage.dose.value)
            dose_text = str(int(dose_value)) if dose_value.is_integer() else str(dose_value)
            strength = f"{dose_text}{concept.dosage.dose.unit}"
            log.debug(
                f"Converted ({concept.id} | {concept.name}) to "
                f"({int(vmp_id)} | {concept.name} {strength} "
                f"tablets): valid extracted dosage + VMP lookup"
            )
            concept.id = str(int(vmp_id))
            concept.name += f" {strength} tablets"
            # If found VMP match change the dosage to 1 tablet
            concept.dosage.dose.value = 1
            concept.dosage.dose.unit = "{tbl}"

    # Convert rest of VTMs that have no dose for VMP conversion to text
    for concept in med_concepts_no_dose:
        convert_to_text(concept, "no dosage detected")

    return concepts