Source code for pairpro.hmmer

"""
The following are various importable code for running HMMER
either locally via pyhmmer or via Interpro's API

The local version runs in parrallel using joblib.
The API version runs in parrallel using asyncio.

The local version is faster, but the API version is more accessible.

TODO:
    - Add error handling/tests
    - Update parsing function documentation
"""
# system dependecies
import asyncio
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
import csv
import glob
import json
import logging
import math
import nest_asyncio
import os
import requests
import time
import urllib.parse
import tempfile
from typing import Dict, List, Tuple, Union

# library dependencies
import duckdb as ddb
import httpx
from joblib import Parallel, delayed
import pandas as pd
import pyhmmer

# local dependencies
import pairpro.utils

logger = logging.getLogger(__name__)

# API HMMER


[docs]async def send_request(semaphore, sequence, client): """ Asynchronously sends a POST request to the HMMER API, submitting a protein sequence for analysis. Args: semaphore (asyncio.Semaphore): An object that controls concurrent request submission, helping to avoid server overload. sequence (str): The protein sequence that is to be analyzed and included in the body of the POST request. client (httpx.AsyncClient): An HTTP client for sending the request. Returns: httpx.Response: Response received from the HMMER API. Raises: httpx.HTTPStatusError: If the HTTP request returned a status code that denotes an error. httpx.TimeoutException: If the request times out. """ url = 'https://www.ebi.ac.uk/Tools/hmmer/search/hmmscan' headers = {'Content-Type': 'application/x-www-form-urlencoded', 'Accept': 'application/json'} data = {'hmmdb': 'pfam', 'seq': f'>seq\n{sequence}'} data = urllib.parse.urlencode(data).encode('ascii') async with semaphore: response = await client.post(url, headers=headers, data=data, follow_redirects=False, timeout=15000) return response
[docs]async def process_response(semaphore, sequence, response, client, pair_id, max_retries=3): """ Processes the response received from the HMMER API, including retrying requests that have failed. Args: semaphore (asyncio.Semaphore): An object that controls concurrent request submission, helping to avoid server overload. sequence (str): The protein sequence associated with the response. response (httpx.Response): The response received from the HMMER API. client (httpx.AsyncClient): An HTTP client for sending subsequent requests. pair_id (int): The protein ID associated with the sequence. max_retries (int, optional): The maximum number of retries for failed requests. Defaults to 3. Returns: pd.DataFrame or None: A DataFrame containing the search results for the protein sequence, or None if an error occurred. Raises: KeyError: If expected key is not found in the response. json.JSONDecodeError: If JSON decoding fails. """ redirect_url = response.headers.get('Location') if redirect_url is None: print("Error: No redirect URL found in response.") else: headers = {'Accept': 'application/json'} async with semaphore: for attempt in range(max_retries): try: response2 = await client.get(redirect_url, headers=headers, timeout=15000) break except httpx.ReadTimeout: if attempt < max_retries - 1: # Exponential backoff await asyncio.sleep(5 ** attempt) else: raise try: results = response2.json() hits = results['results']['hits'] except KeyError: logger.info( f"Error: 'results' key not found in response for sequence {sequence}.") return None except json.JSONDecodeError: logger.info( f"Error: JSONDecodeError for sequence {sequence}. Response text: {response2.text}") return None if hits: loop = asyncio.get_event_loop() dfff = await loop.run_in_executor(None, pd.json_normalize, hits, 'domains', ['acc', 'name', 'score', 'evalue', 'pvalue', 'desc']) dfff.insert(0, 'sequence', sequence) # Add new column here dfff.insert(0, 'pair_id', pair_id) dfff = dfff.set_index('pair_id') # Set new column as index return dfff else: return None
[docs]async def hmmerscanner(df: pd.DataFrame, which: str, k: int, max_concurrent_requests: int, output_path: str): """ Scans multiple protein sequences using the HMMER API, asynchronously submitting and processing each request. Args: df (pd.DataFrame): A DataFrame containing protein sequences. which (str): The column name of the protein sequences. k (int): The number of protein sequences to search. max_concurrent_requests (int): The maximum number of concurrent requests to the HMMER API. output_path (str): The output directory where the data will be stored. Returns: pd.DataFrame: A DataFrame containing the search results for all protein sequences. Raises: ValueError: If the number of sequences exceeds the limit of 1000. """ if k > 1000: print("Use local function for the number of sequences more than 1000.") return pd.DataFrame() sequences = df[which][:k] # Get corresponding prot_pair_index values indices = df['pair_id'][:k] tasks = [] semaphore = asyncio.Semaphore(max_concurrent_requests) # Use a process pool to parallelize JSON processing and DataFrame creation with ProcessPoolExecutor() as executor: loop = asyncio.get_event_loop() async with httpx.AsyncClient() as client: for seq, idx in zip(sequences, indices): # Include the index here task = asyncio.create_task( send_request(semaphore, seq, client)) tasks.append(task) responses = await asyncio.gather(*tasks) tasks = [] for (seq, idx), response in zip( zip(sequences, indices), responses): # Include the index here task = asyncio.create_task(process_response( semaphore, seq, response, client, idx)) # idx is the prot tasks.append(task) results = await asyncio.gather(*tasks) common_columns = list(set.intersection( *(set(df.columns) for df in results if df is not None))) results_df = pd.concat([result[list(common_columns)] for result in results if result is not None]) # write result to csv results_df.to_csv(f'{output_path}{which}_API_output.csv') return results_df
[docs]def run_hmmerscanner( df: pd.DataFrame, which: str, k: int, max_concurrent_requests: int, output_path: str): """ Runs the asynchronous HMMER scanning operation in a new event loop. Args: df (pd.DataFrame): A DataFrame containing protein sequences. which (str): The column name of the protein sequences. k (int): The number of protein sequences to search. max_concurrent_requests (int): The maximum number of concurrent requests to the HMMER API. output_path (str): The output directory where the data will be stored. (like '/Users/amin/ValidProt/data/') Returns: pd.DataFrame: A DataFrame containing the search results for all protein sequences. Raises: nest_asyncio.NestingError: If the event loop is already running. Any exceptions raised by hmmerscanner function. """ # Set up the event loop and call the hmmerscanner function nest_asyncio.apply() return asyncio.run(hmmerscanner(df, which, k, max_concurrent_requests, output_path))
# Local HMMER
[docs]def hmmpress_hmms(hmms_path, pfam_data_folder): """ Presses the HMMs in the given HMM database and stores the resulting files in a specified directory. Args: hmms_path (str): Path to the HMM database. pfam_data_folder (str): Path to the directory where the HMMs should be stored. Returns: None Notes: This function uses HMMER's hmmpress program to compress the HMMs in the given HMM database and stores the resulting files in the specified directory for faster access during future HMMER runs. If the specified directory does not exist, it will be created. """ hmms = pyhmmer.plan7.HMMFile(hmms_path) pyhmmer.hmmer.hmmpress(hmms, pfam_data_folder)
[docs]def prefetch_targets(hmms_path: str): """ Prefetch HMM profiles from a given HMM database. Args: hmms_path (str): Path to the pressed HMM database. Returns: targets (pyhmmer.plan7.OptimizedProfileBlock): The HMM profiles loaded from the database. """ # amino acid alphabet and prefetched inputs amino_acids = pyhmmer.easel.Alphabet.amino() optimized_profiles = list(pyhmmer.plan7.HMMPressedFile(hmms_path)) targets = pyhmmer.plan7.OptimizedProfileBlock( amino_acids, optimized_profiles) return targets
[docs]def save_to_digital_sequences(dataframe: pd.DataFrame): """ Save protein sequences from a DataFrame to a digital sequence block. Args: dataframe (pd.DataFrame): DataFrame containing PIDs (Protein IDs) and sequences. Returns: pyhmmer.easel.DigitalSequenceBlock: A digital sequence block containing the converted sequences. """ # Create empty list seqlist = [] # Establish pyhmmer alphabet amino_acids = pyhmmer.easel.Alphabet.amino() # Convert proteins in dataframe to suitable format for _, row in dataframe.iterrows(): pid = bytes(row['pid'], encoding='utf-8') seq_str = row['protein_seq'] sequences = pyhmmer.easel.TextSequence(name=pid, sequence=seq_str) sequences = sequences.digitize(amino_acids) seqlist.append(sequences) # Convert so SequenceBlocks seqblock = pyhmmer.easel.DigitalSequenceBlock(amino_acids, seqlist) return seqblock
[docs]def run_pyhmmer( seqs: Union[pyhmmer.easel.DigitalSequenceBlock, str], hmms_path: str = None, pressed_path: str = None, prefetch: Union[bool, pyhmmer.plan7.OptimizedProfileBlock] = False, output_file: str = None, cpu: int = 4, scan: bool=True, eval_con: float = 1e-10, **kwargs ): """ Run HMMER's hmmscan program on a set of input sequences using HMMs from a database. Args: seqs (pyhmmer.easel.DigitalSequenceBlock): Digital sequence block of input sequences. hmms_path (str): Path to the HMM database. pressed_path (str): Path to the pressed HMM database. prefetch (bool, optional): Specifies whether to use prefetching mode for HMM storage. Defaults to False. output_file (str, optional): Path to the output file if the user wants to write the file. Defaults to None. cpu (int, optional): The number of CPUs to use. Defaults to 4. scan (bool, optional): Specifies whether to run hmmscan or hmmsearch. Defaults to True. eval_con (float, optional): E-value threshold for domain reporting. Defaults to 1e-10. Returns: Union[pyhmmer.plan7.TopHits, str]: If output_file is specified, the function writes the results to a domtblout file and returns the file path. Otherwise, it returns a list of pyhmmer.plan7.TopHits objects. Notes: This function runs HMMER's hmmscan program on a set of input sequences using HMMs from a given database. The function supports two modes: normal mode and prefetching mode. In normal mode, the HMMs are pressed and stored in a directory before execution. In prefetching mode, the HMMs are kept in memory for faster search. """ # check if both hmms_path and pressed_path are specified if not hmms_path and not pressed_path: raise ValueError("Must specifity one of hmm path (to a .hmm file) or pressed_path (containing .h3m, etc.)") # ensure output_file has .domtblout extension if output_file is not None and not output_file.endswith('.domtblout'): output_file = f"{os.path.splitext(output_file)[0]}.domtblout" # HMM profile modes if prefetch: if isinstance(prefetch, pyhmmer.plan7.OptimizedProfileBlock): targets = prefetch elif pressed_path is None: raise ValueError("Spcified prefetch but did not pass a path to pressed files") else: targets = prefetch_targets(pressed_path) else: if hmms_path is None: raise ValueError("Spcified prefetch but did not pass a path to the .hmm file") targets = pyhmmer.plan7.HMMFile(hmms_path) # are the sequences preloaded? if isinstance(seqs, str): seqs = pyhmmer.easel.SequenceFile(seqs, format='fasta', digital=True, alphabet=pyhmmer.easel.Alphabet.amino()) else: pass # HMMscan execution with or without saving output to file if hasattr(seqs, '__len__'): seqs_size = len(seqs) else: seqs_size = "In file, unknown length" if hasattr(targets, '__len__'): targets_size = len(targets) else: targets_size = "In file, unknown length" # run hmmscan or hmmsearch if scan: logger.info(f"Running hmmscan... {seqs_size} sequences against {targets_size} HMMs, using {cpu} CPUs, additional kwargs: {kwargs}") all_hits = pyhmmer.hmmer.hmmscan(seqs, targets, cpus=cpu, incE=eval_con, **kwargs) else: logger.info(f"Running hmmsearch... {targets_size} HMMs against {seqs_size} seqs, using {cpu} CPUs, additional kwargs: {kwargs}") all_hits = pyhmmer.hmmer.hmmsearch(targets, seqs, cpus=cpu, incE=eval_con, **kwargs) # check if we should save the output if output_file is not None: with open(output_file, "wb") as dst: for i, hits in enumerate(all_hits): hits.write(dst, format="domains", header=i == 0) return all_hits
[docs]def parse_pyhmmer(all_hits, chunk_query_ids, scanned: bool = True): """ Parses the TopHit pyhmmer objects, extracting query and accession IDs, and saves them to a DataFrame. Args: all_hits (list): A list of TopHit objects from pyhmmer. chunk_query_ids (list): A list of query IDs from the chunk. scanned (bool, optional): Specifies whether the sequences were scanned or searched. Defaults to True. Returns: pandas.DataFrame: A DataFrame containing the query and accession IDs. Notes: This function iterates over each protein hit in the provided list of TopHit objects and extracts the query and accession IDs. The resulting query and accession IDs are then saved to a DataFrame. Any query IDs that are missing from the parsed hits will be added to the DataFrame with a placeholder value indicating no accession information. """ # initialize an empty dictionary to store the data parsed_hits = {} # iterate over each protein hit for top_hits in all_hits: for hit in top_hits: # check e-value if hit.evalue > top_hits.incE: continue # extract the query and accession IDs and decode the query ID if scanned: query_id = hit.hits.query_name.decode('utf-8') accession_id = hit.accession.decode('utf-8') else: query_id = hit.name.decode('utf-8') accession_id = hit.hits.query_accession.decode('utf-8') # if the query_id already exists in the dictionary, append the accession_id # to the existing value if query_id in parsed_hits: parsed_hits[query_id].append(accession_id) # otherwise, create a new key-value pair in the dictionary else: parsed_hits[query_id] = [accession_id] # find the query IDs that are missing from the parsed hits missing_query_ids = set(chunk_query_ids) - set(parsed_hits.keys()) # add the missing query IDs with a placeholder value to indicate no # accession information for missing_query_id in missing_query_ids: parsed_hits[missing_query_id] = [""] # create the DataFrame from the dictionary df = pd.DataFrame( parsed_hits.items(), columns=[ "query_id", "accession_id"]) # convert list of accession IDs to string df["accession_id"] = df["accession_id"].apply( lambda x: ";".join(x) if x else "") return df
[docs]def local_hmmer_wrapper(chunk_index, chunked_inputs, press_path, hmm_path, out_dir, e_value: float=1e-6, prefetch=True, cpu=1, wakeup=None, scan=True, **kwargs): """ A wrapping function that runs and parses pyhmmer in chunks. Args: chunk_index (int): Number of sequence chunks. chunked_inputs (pandas.DataFrame): DataFrame containing chunked PID inputs press_path (str): Path to the pressed HMMs. hmm_path (str): Path to the HMMs. out_dir (str): Path to the output directory. e_value (float, optional): E-value threshold. Defaults to 1e-6. prefetch (bool, optional): Specifies whether to prefetch the HMMs. Defaults to True. cpu (int, optional): Number of CPUs to use. Defaults to 1. wakeup (int or None, optional): Delay in seconds before starting the execution. Default is None. scan (bool, optional): Specifies whether to run hmmscan or hmmsearch. Defaults to True. Returns: None Notes: This function performs the following steps: 1. Converts string sequences to pyhmmer digital blocks. 2. Runs HMMER via pyhmmer with the provided sequences. 3. Parses the pyhmmer output and saves it to a CSV file. The parsed pyhmmer output is saved in the directory specified by OUTPUT_DIR, with each chunk having its own separate output file named '{chunk_index}_output.csv'. If the wakeup parameter is specified, the function will wait for the specified number of seconds before starting the execution. """ # we want to wait for execution to see if this worker is actually being used # or if it is in the process of being killed if wakeup is not None: time.sleep(wakeup) # convert string sequences to pyhmmer digital blocks sequences = save_to_digital_sequences(chunked_inputs) # run HMMER via pyhmmer if prefetch: hits = run_pyhmmer( seqs=sequences, pressed_path=press_path, prefetch=prefetch, cpu=cpu, eval_con=e_value, scan=scan, **kwargs ) else: hits = run_pyhmmer( seqs=sequences, hmms_path=hmm_path, prefetch=False, cpu=cpu, eval_con=e_value, scan=scan, **kwargs ) # Parse pyhmmer output and save to CSV file accessions_parsed = parse_pyhmmer( all_hits=hits, chunk_query_ids=chunked_inputs['pid'].tolist(), scanned=scan) accessions_parsed.to_csv( f'{out_dir}/{chunk_index}_output.csv', index=False)
# parsing
[docs]def preprocess_accessions(meso_accession: str, thermo_accession: str): """ Preprocesses meso_accession and thermo_accession by converting them to sets. Args: meso_accession (str): Meso accession string separated by ';'. thermo_accession (str): Thermo accession string separated by ';'. Returns: tuple: A tuple containing the preprocessed meso_accession and thermo_accession sets. """ if meso_accession == '' or meso_accession == 'nan': meso_accession_set = set() else: meso_accession_set = set(meso_accession.split(';')) if thermo_accession == '' or thermo_accession == 'nan': thermo_accession_set = set() else: thermo_accession_set = set(thermo_accession.split(';')) return meso_accession_set, thermo_accession_set
[docs]def calculate_jaccard_similarity(meso_accession_set, thermo_accession_set): """ Calculates the Jaccard similarity between meso_pid and thermo_pid pairs based on their accessions. Jaccard similarity is defined as the size of the intersection divided by the size of the union of two sets. Args: meso_accession_set (set): Set of meso_pid accessions. thermo_accession_set (set): Set of thermo_pid accessions. Returns: float: Jaccard similarity between the two sets of accessions. Returns 0 if the union is empty. """ # Calculate Jaccard similarity intersection = len(meso_accession_set.intersection(thermo_accession_set)) union = len(meso_accession_set.union(thermo_accession_set)) jaccard_similarity = intersection / union if union > 0 else 0 return jaccard_similarity
[docs]def process_pairs_table( conn, dbname, chunk_size: int, output_directory, jaccard_threshold): """ Processes the pairs table, calculates Jaccard similarity, and generates output CSV. Parameters: conn (**): Path to the database file. dbname (str): Name of the database. chunk_size (int): Size of each query chunk to fetch from the database. output_directory (str): Directory path to save the output CSV files. jaccard_threshold (float): Threshold value for Jaccard similarity. Returns: None """ # Perform a join to get relevant information from the two tables query1 = f""" CREATE OR REPLACE TEMP TABLE joined_pairs AS SELECT p.meso_pid, p.thermo_pid, pr.accession AS meso_accession, pr2.accession AS thermo_accession FROM {dbname}.pairpro.final AS p INNER JOIN proteins_from_pairs AS pr ON (p.meso_pid = pr.pid) INNER JOIN proteins_from_pairs AS pr2 ON (p.thermo_pid = pr2.pid) """ conn.execute(query1) # Define the evaluation function for the apply function def evaluation_function(row, jaccard_threshold): """ Evaluates the Jaccard similarity between meso_pid and thermo_pid pairs based on their accessions. Notes: This function is used in the apply function to calculate the Jaccard similarity between meso_pid and thermo_pid pairs based on their accessions. There is a parsing logic for the accessions, which is described below. If both meso_accession and thermo_accession are empty, then the Jaccard similarity is None. If either meso_accession or thermo_accession is empty, then the Jaccard similarity is 0. If both meso_accession and thermo_accession are not empty, then the Jaccard similarity is calculated. """ # Get the accessions meso_acc = row['meso_accession'] thermo_acc = row['thermo_accession'] # Preprocess the accessions if type(meso_acc) == str: meso_acc_set, thermo_acc_set = preprocess_accessions(meso_acc, thermo_acc) elif type(meso_acc) == list: meso_acc_set = set(meso_acc) thermo_acc_set = set(thermo_acc) else: print("meso_acc: ", meso_acc) raise ValueError("meso_acc must be either a string or a list") # parsing accessions logic if not meso_acc_set and not thermo_acc_set: score = None functional = None elif meso_acc_set and thermo_acc_set: # Preprocess the accessions score = calculate_jaccard_similarity(meso_acc_set, thermo_acc_set) functional = score > jaccard_threshold else: # Handle unmatched rows score = 0.0 functional = False return {'functional': functional, 'score': score} # Generate output CSV file try: # Execute the query query = conn.execute("SELECT * FROM joined_pairs") data_remaining = True chunk_counter = 0 # Initialize the chunk counter while data_remaining: # Fetch the query result in chunks query_chunk = query.fetch_df_chunk(chunk_size) # Check if there is data remaining if query_chunk.empty: data_remaining = False break # Calculate Jaccard similarity and determine functional status # using apply function query_chunk[['functional', 'score']] = query_chunk.apply( evaluation_function, axis=1, args=(jaccard_threshold,), result_type='expand') # Write DataFrame to CSV chunk_counter += 1 # Increment the chunk counter query_chunk.to_csv( f'{output_directory}{chunk_counter}_output.csv', index=False, columns=[ 'meso_pid', 'thermo_pid', 'functional', 'score']) except IOError as e: logger.warning(f"Error writing to CSV file: {e}")
[docs]def local_hmmer_wrapper_example(chunk_index, dbpath, chunked_pid_inputs, press_path, out_dir, wakeup=None): """ A wrapping function that runs and parses pyhmmer in chunks. Args: chunk_index (int): Number of sequence chunks. dbpath (stf): Path to the database. chunked_pid_inputs (pandas.DataFrame): DataFrame containing chunked PID inputs. press_path (str): Path to the pressed HMM database. out_path (str): Path to directory where output will be saved. wakeup (int or None, optional): Delay in seconds before starting the execution. Default is None. Returns: None Notes: This function performs the following steps: 1. Queries the database to get sequences only from chunked_pid_inputs. 2. Converts the query result to a DataFrame. 3. Converts string sequences to pyhmmer digital blocks. 4. Runs HMMER via pyhmmer with the provided sequences. 5. Parses the pyhmmer output and saves it to a CSV file. The parsed pyhmmer output is saved in the directory specified by OUTPUT_DIR, with each chunk having its own separate output file named '{chunk_index}_output.csv'. If the wakeup parameter is specified, the function will wait for the specified number of seconds before starting the execution. """ # we want to wait for execution to see if this worker is actually being used # or if it is in the process of being killed if wakeup is not None: time.sleep(wakeup) # query the database to get sequences only from chunked_pid_inputs conn = ddb.connect(dbpath, read_only=True) # get the unique pids from the chunked_pid_inputs pids = set(chunked_pid_inputs["pid"]) # Only extract protein_seqs from the list of PID inputs placeholders = ', '.join(['?'] * len(pids)) query = f"SELECT pid, protein_seq FROM pairpro.proteins WHERE pid IN ({placeholders})" query_db = conn.execute(query, list(pids)).fetchall() # close db connection conn.close() # convert the query db to a dataframe result_df = pd.DataFrame(query_db, columns=['pid', 'protein_seq']) # convert string sequences to pyhmmer digital blocks sequences = save_to_digital_sequences(result_df) # run HMMER via pyhmmer hits = run_pyhmmer( seqs=sequences, hmms_path=press_path, press_path=press_path, prefetch=True, cpu=1, eval_con=1e-12) # get the query IDs from the chunked_pid_inputs chunk_query_ids = chunked_pid_inputs["pid"].tolist() # Parse pyhmmer output and save to CSV file accessions_parsed = parse_pyhmmer( all_hits=hits, chunk_query_ids=chunk_query_ids) accessions_parsed.to_csv( f'{out_dir}/{chunk_index}_output.csv', index=False)
# user input
[docs]def save_to_digital_sequences_user_query(dataframe: pd.DataFrame): """ Save protein sequences from a DataFrame to a digital sequence block. Args: dataframe (pd.DataFrame): DataFrame containing pair_id (Protein pair IDs) and sequences. Returns: pyhmmer.easel.DigitalSequenceBlock: A digital sequence block containing the converted sequences. """ # Create empty list query_seqlist = [] # Establish pyhmmer alphabet amino_acids = pyhmmer.easel.Alphabet.amino() # Convert proteins in dataframe to suitable format for _, row in dataframe.iterrows(): pair_id = bytes(str(row['pair_id']), encoding='utf-8') seq_str = row['query'] sequences = pyhmmer.easel.TextSequence(name=pair_id, sequence=seq_str) sequences = sequences.digitize(amino_acids) query_seqlist.append(sequences) # Convert so SequenceBlocks query_seqblock = pyhmmer.easel.DigitalSequenceBlock( amino_acids, query_seqlist) return query_seqblock
[docs]def save_to_digital_sequences_user_subject(dataframe: pd.DataFrame): """ Save protein sequences from a DataFrame to a digital sequence block. Args: dataframe (pd.DataFrame): DataFrame containing pair_id (Protein pair IDs) and sequences. Returns: pyhmmer.easel.DigitalSequenceBlock: A digital sequence block containing the converted sequences. """ # Create empty list subject_seqlist = [] # Establish pyhmmer alphabet amino_acids = pyhmmer.easel.Alphabet.amino() # Convert proteins in dataframe to suitable format for _, row in dataframe.iterrows(): pair_id = bytes(str(row['pair_id']), encoding='utf-8') seq_str = row['subject'] sequences = pyhmmer.easel.TextSequence(name=pair_id, sequence=seq_str) sequences = sequences.digitize(amino_acids) subject_seqlist.append(sequences) # Convert so SequenceBlocks subject_seqblock = pyhmmer.easel.DigitalSequenceBlock( amino_acids, subject_seqlist) return subject_seqblock
[docs]def parse_pyhmmer_user(all_hits, chunk_pair_ids): """ Parses the TopHit pyhmmer objects, extracting query and accession IDs, and saves them to a DataFrame. Args: all_hits (list): A list of TopHit objects from pyhmmer. chunk_pair_ids (list): A list of query IDs from the chunk. Returns: pandas.DataFrame: A DataFrame containing the pair and accession IDs. Notes: This function iterates over each protein hit in the provided list of TopHit objects and extracts the query and accession IDs. The resulting query and accession IDs are then saved to a DataFrame. Any pair IDs that are missing from the parsed hits will be added to the DataFrame with a placeholder value indicating no accession information. """ # initialize an empty dictionary to store the data parsed_hits = {} # iterate over each protein hit for top_hits in all_hits: for hit in top_hits: # extract the query and accession IDs and decode the query ID query_id = hit.hits.query_name.decode('utf-8') accession_id = hit.accession.decode('utf-8') # if the query_id already exists in the dictionary, append the accession_id # to the existing value if query_id in parsed_hits: parsed_hits[query_id].append(accession_id) # otherwise, create a new key-value pair in the dictionary else: parsed_hits[query_id] = [accession_id] # find the query IDs that are missing from the parsed hits missing_query_ids = set(chunk_pair_ids) - set(parsed_hits.keys()) # add the missing query IDs with a placeholder value to indicate no # accession information for missing_query_id in missing_query_ids: parsed_hits[missing_query_id] = [""] # create the DataFrame from the dictionary df = pd.DataFrame(parsed_hits.items(), columns=["pair_id", "accession_id"]) # convert list of accession IDs to string df["accession_id"] = df["accession_id"].apply( lambda x: ";".join(x) if x else "") return df
[docs]def user_local_hmmer_wrapper_query( chunk_index, press_path, sequences, out_dir): """ TODO """ # convert string sequences to pyhmmer digital blocks query_seqblock = save_to_digital_sequences_user_query(sequences) # run HMMER via pyhmmer query_hits = run_pyhmmer( seqs=query_seqblock, hmms_path=press_path, press_path=press_path, prefetch=True, cpu=1, eval_con=1e-5) # get the query IDs from the chunked_pid_inputs missing_pair_ids = sequences["pair_id"].tolist() # Parse pyhmmer output and save to CSV file accessions_parsed_query = parse_pyhmmer_user( all_hits=query_hits, chunk_pair_ids=missing_pair_ids) accessions_parsed_query.to_csv( f"{out_dir}query_result_{chunk_index}.csv", index=False)
[docs]def user_local_hmmer_wrapper_subject( chunk_index, press_path, sequences, out_dir): """ TODO """ # convert string sequences to pyhmmer digital blocks subject_seqblock = save_to_digital_sequences_user_subject(sequences) # run HMMER via pyhmmer subject_hits = run_pyhmmer( seqs=subject_seqblock, hmms_path=press_path, press_path=press_path, prefetch=True, cpu=1, eval_con=1e-5) # get the query IDs from the chunked_pid_inputs missing_pair_ids = sequences["pair_id"].tolist() accessions_parsed_subject = parse_pyhmmer_user( all_hits=subject_hits, chunk_pair_ids=missing_pair_ids) accessions_parsed_subject.to_csv( f"{out_dir}subject_result_{chunk_index}.csv", index=False)
#### API parsing #####
[docs]def parse_function_csv_API(file_path: str) -> Dict[str, List[str]]: """ Parses the CSV file with protein IDs and their corresponding accession IDs and returns a dictionary with protein IDs as keys and accession IDs as values. """ # Create a dictionary to store csv results protein_dict = defaultdict(list) with open(file_path, 'r') as csvfile: # read csv reader = csv.reader(csvfile) # get the header row header = next(reader) # find the index of the pair_id and accession columns pair_id_idx = header.index('pair_id') accession_idx = header.index('acc') for row in reader: pair_id = row[pair_id_idx] accessions = row[accession_idx].split(';') protein_dict[pair_id].extend(accessions) return protein_dict
[docs]def find_jaccard_similarity_API(set1: set, set2: set) -> float: """ Calculates the Jaccard similarity score between two sets. """ intersection = len(set1.intersection(set2)) union = len(set1.union(set2)) if union == 0: return 0.0 else: return intersection / union
[docs]def calculate_similarity_API( file1: str, file2: str, threshold: float) -> Dict[str, Tuple[str, float]]: """ Calculates the Jaccard similarity score between each protein in file1 and file2, and returns a dictionary with query IDs as keys and a tuple indicating whether the score threshold was met and the Jaccard similarity score. """ # Read the CSV files and create dictionaries with query IDs and accession # IDs dict1 = parse_function_csv_API(file1) dict2 = parse_function_csv_API(file2) # Create a dictionary to store the Jaccard similarity scores scores = defaultdict(float) # Calculate the Jaccard similarity score between each protein in file1 and # file2 for query1, accs1 in dict1.items(): for query2, accs2 in dict2.items(): if query1 == query2: score = find_jaccard_similarity_API(set(accs1), set(accs2)) scores[(query1, query2)] = score # Create a dictionary to store the functional tuple values functional = {} # Set the functional tuple value based on the Jaccard similarity score # threshold for (query1, query2), score in scores.items(): if score >= threshold: functional[(query1, query2)] = (True, score) else: functional[(query1, query2)] = (False, score) return functional
[docs]def write_function_output_API( output_dict: Dict[str, Tuple[str, float]], output_file: str): """ Writes a dictionary of protein pair IDs and functional tuple values to a CSV file. Args: output_dict : Dict[str, Tuple[str, float]] A dictionary of protein pair IDs and functional tuple values output_file : str File path to write the output CSV file """ with open(output_file, 'w', newline='') as csvfile: fieldnames = ['file1', 'file2', 'functional', 'jaccard Score'] writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() for pair_id, (functional, score) in output_dict.items(): writer.writerow({ 'file1': pair_id[0], 'file2': pair_id[1], 'functional': functional, 'jaccard Score': score })
[docs]def get_file_pairs_API(directory_path): """ A quick silly function to get pairs """ subject_files = glob.glob(f"{directory_path}/subject_API_output.csv") query_files = glob.glob(f"{directory_path}/query_API_output.csv") print(subject_files) subject_files.sort() query_files.sort() file_pairs = [] for subject_file, query_file in zip(subject_files, query_files): file_pairs.append((subject_file, query_file)) return file_pairs
[docs]def parse_function_csv_user(file_path: str) -> Dict[str, List[str]]: """ Parses the CSV file with protein IDs and their corresponding accession IDs and returns a dictionary with protein IDs as keys and accession IDs as values. """ # Create a dictionary to store csv results protein_dict = {} with open(file_path, 'r') as csvfile: # read csv reader = csv.reader(csvfile) # skip header next(reader) for row in reader: query_id = row[0] accessions = row[1].split(';') protein_dict[query_id] = accessions return protein_dict
[docs]def calculate_similarity_user( file1: str, file2: str, threshold: float) -> Dict[str, Tuple[str, float]]: """ Calculates the Jaccard similarity score between each protein in file1 and file2, and returns a dictionary with query IDs as keys and a tuple indicating whether the score threshold was met and the Jaccard similarity score. """ # Read the CSV files and create dictionaries with query IDs and accession # IDs dict1 = parse_function_csv_user(file1) dict2 = parse_function_csv_user(file2) # Create a dictionary to store the Jaccard similarity scores scores = defaultdict(float) # Calculate the Jaccard similarity score between each protein in file1 and # file2 for query1, accs1 in dict1.items(): for query2, accs2 in dict2.items(): if query1 == query2: score = find_jaccard_similarity_API(set(accs1), set(accs2)) scores[(query1, query2)] = score # Create a dictionary to store the functional tuple values functional = {} # Set the functional tuple value based on the Jaccard similarity score # threshold for (query1, query2), score in scores.items(): if score >= threshold: functional[(query1, query2)] = (True, score) else: functional[(query1, query2)] = (False, score) return functional
[docs]def get_file_pairs_user(directory_path): """ A quick silly function to get pairs """ query_files = glob.glob(f"{directory_path}/query_result_*.csv") subject_files = glob.glob(f"{directory_path}/subject_result_*.csv") print(query_files) query_files.sort() subject_files.sort() file_pairs = [] for query_file, subject_file in zip(query_files, subject_files): query_chunk_index = int(query_file.split("_")[-1].split(".")[0]) subject_chunk_index = int(subject_file.split("_")[-1].split(".")[0]) if query_chunk_index == subject_chunk_index: file_pairs.append((query_file, subject_file)) return file_pairs