Source code for querynator.query_api.civic_api

""" Query the Clinical Interpretations of Variants In Cancer (CIViC) API via its python tool CIViCPY """

import warnings

warnings.simplefilter(action="ignore", category=FutureWarning)

import os
import random
from datetime import date

import civicpy
import numpy as np
import pandas as pd
import vcf
from civicpy import civic

from querynator.helper_functions import (
    get_num_from_chr,
    gunzip_compressed_files,
    gzipped,
)

# load the civic cache (necessary for bulk run)
civic.load_cache()


[docs]def check_vcf_input(vcf_path, logger): """ Checks whether input is vcf-file with all necessary columns. :param vcf_path: Variant Call Format (VCF) file (Version >= 4.0) :type vcf_path: str :return: None """ header = False needed_cols = ["chrom", "pos", "ref", "alt"] if not vcf_path.endswith(".vcf"): logger.error("Given File does not end with '.vcf'") exit(1) else: for line in open(vcf_path, "r"): if line.startswith("#") and not line.startswith("##"): header = True missing_cols = [i for i in needed_cols if i not in line.lower()] if len(missing_cols) != 0: logger.error(f"vcf file is missing crucial columns: {', '.join(missing_cols).upper()}") exit(1) if not header: logger.error(f"vcf file requires header column!") exit(1)
[docs]def vcf_file(vcf_path): """ Checks whether input is vcf-file. :param vcf_path: Variant Call Format (VCF) file (Version 4.2) :type vcf_path: str :return: None """ if vcf_path.endswith(".vcf") or vcf_path.endswith(".vcf.gz"): return True else: return False
[docs]def get_coordinates_from_vcf(input, build, logger): """ Read in vcf file using "pyVCF3", creates CoordinateQuery objects for each variant. This function does find (ref-alt): SNPs (A-T) DelIns (AA-TT) Deletions (TTTCA - AT) :param input: list of pyVCF3 records or vcf file to query :type input: list or str :param build: reference genome :type build: str :return: CoordinateQuery objects :rtype: list """ if type(input) == list: variant_file = input else: if vcf_file(input): if gzipped(input): variant_file = vcf.Reader(open(gunzip_compressed_files(input, logger))) else: variant_file = vcf.Reader(open(input)) coord_dict = {} for record in variant_file: if "QID" in record.INFO.keys(): querynator_id = record.INFO["QID"] else: # filter_vep not applied and no rerun with filtered vcf, generate random QID for following steps which will not be reported in results querynator_id = random.randint(1000000, 9999999) for alt_base in record.ALT: # INSERTION if len(record.REF) < len(alt_base): coord_dict.update( { civic.CoordinateQuery( chr=get_num_from_chr(record.CHROM), start=int(record.start) + 1, stop=int(record.start) + 2, alt=str(alt_base)[1:], ref="", build=build, ): querynator_id } ) # DELETION elif len(record.REF) > len(alt_base) and len(alt_base) == 1: coord_dict.update( { civic.CoordinateQuery( chr=get_num_from_chr(record.CHROM), start=int(record.start) + 1, stop=int(record.end), alt="", ref=record.REF, build=build, ): querynator_id } ) # SNPs, DelIns else: coord_dict.update( { civic.CoordinateQuery( chr=get_num_from_chr(record.CHROM), start=int(record.start) + 1, stop=int(record.end), alt=str(alt_base), ref=record.REF, build=build, ): querynator_id } ) return coord_dict
[docs]def append_to_dict(dict1, dict2): """ appends values of a dictionary to another dictionary with lists as values :param dict1: dictionary to append to :type dict1: dict :param dict2: dictionary with values to append :type: dict2: dict :return: appended dict :rtype: dict """ for key, value in dict2.items(): if key in dict1: if type(dict1[key]) == list: dict1[key].append(value) else: if dict1[key] == "": dict1[key] = dict2[key] else: dict1[key] = [dict1[key], dict2[key]] return dict1
[docs]def smoothen_dict(dict, s): """ makes string out of lists :param dict: dict with lists as values :type dict: dict :param s: True if string and special string character needed :type s: bool :return: dict with strings as values :rtype: dict """ for key in dict: if type(dict[key]) == list: filtered_list = [i if i is not None else "" for i in dict[key]] if s: if key in ["evidence_source", "evidence_description"]: dict[key] = "|".join(str(i) for i in filtered_list) else: dict[key] = ",".join(str(i) for i in filtered_list) else: dict[key] = ",".join(str(i) for i in filtered_list) return dict
[docs]def access_civic_by_coordinate(coord_dict, logger, build): """ Query CIViC API for individual variants :param coord_list: List of CoordinateQuery objects :type coord_list: list :param build: reference genome :type build: str :return: CIViC variant objects of successfully queried variants :rtype: list """ # bulk search to quickly focus on variants found in the civic-db # time-intensive search must then only be done for variants that will be hits # only possible for GRCh37 ref genome if build == "GRCh37": bulk = civic.bulk_search_variants_by_coordinates(list(coord_dict.keys()), search_mode="exact") # reconnect coordinates that passed bulk search and respective IDs input_dict = {key: coord_dict[key] for key in bulk} else: input_dict = coord_dict # actual search for each variant variant_list = [] for coord_obj, querynator_id in input_dict.items(): variant = civic.search_variants_by_coordinates(coord_obj, search_mode="exact") if variant != None and len(variant) > 0: for variant_obj in variant: variant_list.append([{coord_obj: querynator_id}, [variant_obj]]) # break if no variants are found if variant_list == None: logger.error("No hits are found in CIViC for this vcf file") exit(1) return variant_list
[docs]def get_variant_information_from_variant(variant_obj): """ Get all variant information from a single CIViC variant object :param variant_obj: single CIViC variant object :type variant_ob: CIViC variant object :return: Variant information for respective CIViC variant object :rtype: dict """ return { "variant_name": variant_obj.name, "variant_aliases": ", ".join(variant_obj.aliases), "variant_type": ", ".join([i.name for i in variant_obj.types]), "variant_clinvar_entries": ", ".join(variant_obj.clinvar_entries), "variant_entrez_id": variant_obj.entrez_id, "variant_entrez_name": variant_obj.entrez_name, "variant_hgvs_expressions": ", ".join(variant_obj.hgvs_expressions), "variant_groups": ", ".join([i.name for i in variant_obj.variant_groups]), }
[docs]def get_molecular_profile_information_from_variant(variant_obj): """ Get all molecular profile information from a single CIViC variant object :param variant_obj: single CIViC variant object :type variant_ob: CIViC variant object :return: Molecular profile information for respective CIViC variant object :rtype: dict """ mol_profile_dict = {"mol_profile_name": "", "mol_profile_definition": "", "mol_profile_score": ""} for mol_profile in variant_obj.molecular_profiles: try: # mol_profile = variant_obj.molecular_profiles[0] new_dict = { "mol_profile_name": mol_profile.name, "mol_profile_definition": mol_profile.description, "mol_profile_score": mol_profile.molecular_profile_score, } except IndexError: new_dict = {"mol_profile_name": None, "mol_profile_definition": None, "mol_profile_score": None} mol_profile_dict = append_to_dict(mol_profile_dict, new_dict) return smoothen_dict(mol_profile_dict, False)
[docs]def get_gene_information_from_variant(variant_obj): """ Get all gene information from a single CIViC variant object :param variant_obj: single CIViC variant object :type variant_ob: CIViC variant object :return: Gene information for respective CIViC variant object :rtype: dict """ gene = variant_obj.gene return { "gene_name": gene.name, "gene_aliases": ", ".join(gene.aliases), "gene_description": gene.description, "gene_entrez_id": gene.entrez_id, "gene_source": ", ".join([i.name for i in gene.sources]), }
[docs]def get_assertion_information_from_variant(variant_obj): """ Get all assertion information from a single CIViC variant object :param variant_obj: single CIViC variant object :type variant_ob: CIViC variant object :return: Assertion information for respective CIViC variant object :rtype: dict """ assertion_dict = { "assertion_name": "", "assertion_acmg_codes": "", "assertion_acmg_codes_description": "", "assertion_amp_level": "", "assertion_direction": "", "assertion_type": "", "assertion_description": "", "assertion_disease_name": "", "assertion_disease_doid": "", "assertion_disease_url": "", "assertion_disease_aliases": "", "assertion_phenotypes": "", "assertion_significance": "", "assertion_status": "", "assertion_summary": "", "assertion_therapies_name": "", "assertion_therapies_ncit_id": "", "assertion_therapies_aliases": "", "assertion_therapies_interaction_type": "", "assertion_variant_origin": "", } for mol_prof in variant_obj.molecular_profiles: for assertion in mol_prof.assertions: try: new_dict = { "assertion_name": assertion.name, "assertion_acmg_codes": ", ".join([i.code for i in assertion.acmg_codes]), "assertion_acmg_codes_description": ", ".join([i.description for i in assertion.acmg_codes]), "assertion_amp_level": assertion.amp_level, "assertion_direction": assertion.assertion_direction, "assertion_type": assertion.assertion_type, "assertion_description": assertion.description, "assertion_disease_name": ", ".join([i.name for i in assertion.disease]), "assertion_disease_doid": ", ".join([i.doid for i in assertion.disease]), "assertion_disease_url": ", ".join([i.disease_url for i in assertion.disease]), "assertion_disease_aliases": ", ".join([i.aliases for i in assertion.disease]), "assertion_phenotypes": ", ".join([i.name for i in assertion.phenotypes]), "assertion_significance": assertion.significance, "assertion_status": assertion.status, "assertion_summary": assertion.summary, "assertion_therapies_name": ", ".join([i.name for i in assertion.therapies]), "assertion_therapies_ncit_id": ", ".join([i.ncit_id for i in assertion.therapies]), "assertion_therapies_aliases": ", ".join([", ".join(i.aliases) for i in assertion.therapies]), "assertion_therapies_interaction_type": assertion.therapy_interaction_type, "assertion_variant_origin": assertion.variant_origin, } except IndexError: new_dict = { "assertion_name": np.nan, "assertion_acmg_codes": np.nan, "assertion_acmg_codes_description": np.nan, "assertion_amp_level": np.nan, "assertion_direction": np.nan, "assertion_type": np.nan, "assertion_description": np.nan, "assertion_disease_name": np.nan, "assertion_disease_doid": np.nan, "assertion_disease_url": np.nan, "assertion_disease_aliases": np.nan, "assertion_phenotypes": np.nan, "assertion_significance": np.nan, "assertion_status": np.nan, "assertion_summary": np.nan, "assertion_therapies_name": np.nan, "assertion_therapies_ncit_id": np.nan, "assertion_therapies_aliases": np.nan, "assertion_therapies_interaction_type": np.nan, "assertion_variant_origin": np.nan, } assertion_dict = append_to_dict(assertion_dict, new_dict) return smoothen_dict(assertion_dict, False)
[docs]def get_evidence_information_from_variant(variant_obj): """ Get all evidence from a single CIViC variant object :param variant_obj: single CIViC variant object :type variant_ob: CIViC variant object :return: Evidence information for respective CIViC variant object :rtype: dict """ evidence_dict = { "evidence_name": "", "evidence_description": "", "evidence_disease": "", "evidence_level": "", "evidence_support": "", "evidence_type": "", "evidence_phenotypes": "", "evidence_rating": "", "evidence_significance": "", "evidence_source": "", "evidence_status": "", "evidence_therapies": "", "evidence_therapy_interaction_type": "", } for mol_prof in variant_obj.molecular_profiles: for evidence in mol_prof.evidence: try: # evidence = variant_obj.molecular_profiles[0].evidence[0] new_dict = { "evidence_name": evidence.name, "evidence_description": evidence.description, "evidence_disease": evidence.disease.name if type(evidence.disease) != dict else np.nan, "evidence_level": evidence.evidence_level, "evidence_support": evidence.evidence_direction, "evidence_type": evidence.evidence_type, "evidence_phenotypes": ", ".join([i.name for i in evidence.phenotypes]), "evidence_rating": evidence.rating, "evidence_significance": evidence.significance, "evidence_source": evidence.source.name, "evidence_status": evidence.status, "evidence_therapies": ", ".join([i.name for i in evidence.therapies]), "evidence_therapy_interaction_type": evidence.therapy_interaction_type, } except IndexError: new_dict = { "evidence_name": np.nan, "evidence_description": np.nan, "evidence_disease": np.nan, "evidence_level": np.nan, "evidence_support": np.nan, "evidence_type": np.nan, "evidence_phenotypes": np.nan, "evidence_rating": np.nan, "evidence_significance": np.nan, "evidence_source": np.nan, "evidence_status": np.nan, "evidence_therapies": np.nan, "evidence_therapy_interaction_type": np.nan, } evidence_dict = append_to_dict(evidence_dict, new_dict) return smoothen_dict(evidence_dict, True)
[docs]def get_positional_information_from_coord_obj(coord_obj): """ Get information about the position of the variant in the genome :param coord_obj: CoordinateQuery Object to respective variant object :type coord_obj: CIViC CoordinateQuery Object :return: Positional information for respective CIViC variant object :rtype: dict """ return {"chr": coord_obj[0], "start": coord_obj[1], "stop": coord_obj[2], "ref": coord_obj[4], "alt": coord_obj[3]}
[docs]def get_querynator_id(querynator_id): """ Get the querynator id in dict format :param querynator_id: Querynator id :type querynator_id: str :return: Querynator id for respective CIViC variant object :rtype: dict """ return {"querynator_id": querynator_id}
[docs]def concat_dicts(coord_id_dict, variant_obj, filter_vep): """ Create and combine different dictionaries created for single CIViC variant object :param coord_obj: CoordinateQuery Object to respective variant object :type coord_obj: CIViC CoordinateQuery Object :param variant_obj: single CIViC variant object :type variant_ob: CIViC variant object :return: All information for respective CIViC variant object :rtype: dict """ coordinates_info = get_positional_information_from_coord_obj(list(coord_id_dict.keys())[0]) variant_info = get_variant_information_from_variant(variant_obj[0]) gene_info = get_gene_information_from_variant(variant_obj[0]) mol_profile_info = get_molecular_profile_information_from_variant(variant_obj[0]) assertion_info = get_assertion_information_from_variant(variant_obj[0]) evidence_info = get_evidence_information_from_variant(variant_obj[0]) if filter_vep: querynator_id_info = get_querynator_id(coord_id_dict[list(coord_id_dict.keys())[0]]) return { **coordinates_info, **querynator_id_info, **variant_info, **gene_info, **mol_profile_info, **assertion_info, **evidence_info, } else: return {**coordinates_info, **variant_info, **gene_info, **mol_profile_info, **assertion_info, **evidence_info}
[docs]def create_civic_results(variant_list, out_path, logger, filter_vep): """ Combine result dictionaries of all CIViC variant objects to a table and write it to user-specified file :param variant_list: List of CIViC variant objects of successfully queried variants :type variant_list: list :param out_path: Name for directory in which result-table will be stored :type out_path: str :param filter_vep: flag whether VEP based filtering should be performed :type filter_vep: bool :return: None :rtype: None """ civic_result_df = pd.DataFrame() for coord_id_dict, variant in variant_list: civic_result_df = civic_result_df.append(concat_dicts(coord_id_dict, variant, filter_vep), ignore_index=True) logger.info("CIViC Query finished") logger.info("Creating Results") try: civic_result_df.to_csv(f"{out_path}/{os.path.basename(out_path)}.civic_results.tsv", sep="\t", index=False) except OSError: os.mkdir(out_path) civic_result_df.to_csv(f"{out_path}/{os.path.basename(out_path)}.civic_results.tsv", sep="\t", index=False)
[docs]def sort_coord_list(coord_dict): """ Sort the input list to the bulk search :param coord_list: List of CoordinateQuery objects :type coord_list: list :return: sorted coordinates :rtype: list """ return { key: value for key, value in sorted( coord_dict.items(), key=lambda x: ( int(x[0][0]) if x[0][0] != "X" and x[0][0] != "Y" and x[0][0] != "M" else sort_rules(x[0][0]), x[0][1], x[0][2], ), ) }
[docs]def sort_rules(s): """ Set rules to correctly sort chromosomes X,Y,M :param s: "string" chromosome (X,Y,M) :type s: str :return: integer to sort by :rtype: int """ if s == "X": return 100 elif s == "Y": return 1000 elif s == "M": return 10000
[docs]def add_civic_metadata(out_path, input_file, search_mode, genome, filter_vep): """ Attach metadata to civic query :param out_path: Name of directory in which results are stored :type out_path: str :param input_file: path of original input file :type input_file: str :param search_mode: search mode used in CIViC Query :type search_mode: str :param filter_vep: flag whether VEP based filtering should be performed :type filter_vep: bool :return: None :rtype: None """ with open(out_path + "/metadata.txt", "w") as f: f.write("CIViC query date: " + str(date.today())) f.write("\nCIViCpy version: " + str(civicpy.version())) f.write("\nSearch mode: " + str(search_mode)) f.write("\nReference genome: " + str(genome)) if filter_vep: f.write("\nFiltered out synonymous & low impact variants based on VEP annotation") f.write("\nInput File: " + str(input_file)) f.close()
[docs]def query_civic(vcf, out_path, logger, input_file, genome, filter_vep): """ Command to query the CIViC API :param vcf: Variant Call Format (VCF) file (Version 4.2) or list of pyVCF3 variant records :type vcf: str or list :param out_path: Name for directory in which result-table will be stored :type out_path: str :param input_file: path of original input file :type input_file: str :param filter_vep: flag whether VEP based filtering should be performed :type filter_vep: bool :return: None :rtype: None """ logger.info("Querying") coord_dict = get_coordinates_from_vcf(vcf, genome, logger) # coordinates needs to be sorted for bulk search coord_dict = sort_coord_list(coord_dict) # create result table create_civic_results(access_civic_by_coordinate(coord_dict, logger, genome), out_path, logger, filter_vep) add_civic_metadata(out_path, input_file, "exact", genome, filter_vep) logger.info("CIViC Analysis done")