Source code for pairpro.user_blast

"""
To do: Raise exception for invalid inputs, try capitalization before removing
rows
"""
from Bio import Align
from Bio.Align import substitution_matrices
import numpy as np
import pandas as pd
import re
import duckdb


[docs]def make_blast_df(df, mode='local', path='./data/blast_db.db'): """ This function generates pairwise alignment scores for a set of protein sequences. Args: df (pandas.core.DataFrame): A 2-column DataFrame containing the query and subject sequences for alignment. mode (str): Alignment type is 'local' or 'global'. Default: 'local'. Returns: blast_df (pandas.core.DataFrame): A dataframe with the input sequence pairs, associated id values, and alignment scores. """ # Rename input data columns for compatibility original_cols = df.columns df.rename(columns={original_cols[0]: 'query', original_cols[1]: 'subject'}, inplace=True) # Remove any rows with NaN or containing non-amino acid letters rows_with_nan = [] for index, row in df.iterrows(): is_nan_series = row.isnull() if is_nan_series.any(): rows_with_nan.append(index) if len(rows_with_nan) != 0: df.drop(np.unique(rows_with_nan), inplace=True) print(f'Found and skipped {len(rows_with_nan)} row(s) containing NaN.') df.reset_index(drop=True, inplace=True) # All valid 1-letter amino acid codes. amino_acids = set("CSTAGPDEQNHRKMILVWYF") invalid_rows = [] for seqs in ['query', 'subject']: for i, seq in enumerate(df[seqs]): if sequence_validate(seq, amino_acids) is False: invalid_rows.append(i) if len(invalid_rows) != 0: df.drop(np.unique(invalid_rows), inplace=True) print(f"""Found and skipped {len(invalid_rows)} row(s) containing invalid amino acid codes.""") df.reset_index(drop=True, inplace=True) # Generate protein ids for each unique query and subject sequence n_unique_1 = np.unique(df['query']) n_unique_2 = np.unique(df['subject']) qid_dict = dict(zip(n_unique_1, range(len(n_unique_1)))) sid_dict = dict(zip(n_unique_2, range(len(n_unique_2)))) df['query_id'] = [qid_dict[i] for i in df.iloc[:, 0]] df['subject_id'] = [sid_dict[i] for i in df.iloc[:, 1]] # Use unique row index as a pair id df['pair_id'] = df.index # Metrics to be calculated metrics = ['local_gap_compressed_percent_id', 'scaled_local_query_percent_id', 'scaled_local_symmetric_percent_id', 'query_align_len', 'query_align_cov', 'subject_align_len', 'subject_align_cov', 'bit_score'] data_dict = df.to_dict('records') # Initialize PairWiseAligner with required parameters for model aligner = Align.PairwiseAligner() aligner.substitution_matrix = substitution_matrices.load("BLOSUM62") aligner.open_gap_score = -11 aligner.extend_gap_score = -1 aligner.mode = mode final_data = [] # Iterate through all pairs and calculate best alignment and metrics for row in data_dict: query = row['query'] subject = row['subject'] alignment = aligner.align(subject, query) best_alignment = max(alignment, key=lambda x: x.score) seq1_aligned = format(best_alignment).split('\n')[0] seq2_aligned = format(best_alignment).split('\n')[2] # Coverage subject_cov = sum(c != '-' for c in seq1_aligned) / len(seq1_aligned) query_cov = sum(c != '-' for c in seq2_aligned) / len(seq2_aligned) # Sequence length l_subject = len(query) l_query = len(subject) # Percent ids n_matches, n_gaps, n_columns, n_comp_gaps = get_matches_gaps( seq2_aligned, seq1_aligned) gap_comp_pct_id = gap_compressed_percent_id( n_matches, n_gaps, n_columns, n_comp_gaps) scaled_local_symmetric_percent_id = 2*n_matches / (l_subject + l_query) scaled_local_query_percent_id = n_matches / l_query # Collect calculated metrics into a list for addition to final_data new_row = [gap_comp_pct_id, scaled_local_query_percent_id, scaled_local_symmetric_percent_id, l_query, query_cov, l_subject, subject_cov, best_alignment.score, row['query_id'], row['subject_id']] final_data.append(new_row) # Construct temporary dataframe from collected metrics columns = metrics + ['query_id', 'subject_id'] final_df = pd.DataFrame(final_data, columns=columns) # Merge final_df with sequences and ids from input df blast_df = df.merge(final_df, left_on=['query_id', 'subject_id'], right_on=['query_id', 'subject_id']) con = duckdb.connect(path) cmd = """CREATE OR REPLACE TABLE protein_pairs AS SELECT * FROM blast_df""" con.execute(cmd) return blast_df, con
[docs]def get_matches_gaps(query, subject): """ Parses sequence alignment text to calculate the number of matches, gaps, compressed gaps, and total columns. Args: query (str): Query aligned sequence. subject (str): Subject aligned sequence. Returns: n_matches (int): Number of matching amino acids in the sequence alignment. n_gaps (int): Total number of gaps across both aligned sequences. n_columns (int): Length of the aligned query sequence. n_comp_gaps (int): Number of compressed gaps. """ n_columns = len(query) n_gaps = sum(i == '-' for i in query) + sum(i == '-' for i in subject) n_matches = sum(i == j and i != '-' for i, j in zip(query, subject)) pattern = r'(-{1,})' instances = re.findall(pattern, query) + re.findall(pattern, subject) n_comp_gaps = len(instances) return n_matches, n_gaps, n_columns, n_comp_gaps
[docs]def gap_compressed_percent_id(n_matches, n_gaps, n_columns, n_comp_gaps): """ Calculates the percent id with compressed gaps. Args: n_matches (int): Number of matches in match columns n_gaps (int): Number of gaps in match columns n_columns (int): Total number of alignment match columns n_compressed_gaps (int): Number of compressed gaps in match columns Returns: n_matches / (n_columns - n_gaps + n_comp_gaps) """ return n_matches / (n_columns - n_gaps + n_comp_gaps)
[docs]def sequence_validate(seq, alph): """ Makes sure sequence complies with alphabet. Args: seq (int): Number of matches in match columns alph (int): Number of gaps in match columns Returns: (bool): True if sequence is valid, False if not """ extra = set(seq) - alph return not extra