Source code for pairpro.preprocessing

'''
This package builds the PairProphet database from learn2thermDB.

Functions:
    connect_db: Establishes connection to DuckDB database using local or
                remote input path. Reports time to connection.

    build_pairpro: Constructs pairprophet database from learn2therm database.
'''

import time
import duckdb
import os


def connect_db(path: str, empty=False):
    '''
    Runs duckdb.connect() function on database path. Returns the connection
    together with the base tables found in it and prints execution time.

    Args:
        path (str): Path to DuckDB database file containing learn2therm.
        empty (bool): When False (default), the database is queried for its
                      base tables and must contain at least one. Any other
                      value skips that check, which allows connecting to a
                      new or empty database.

    Returns:
        con (duckdb.DuckDBPyConnection): A DuckDB connection object linking
                                         script to learn2therm database.
        tables: DataFrame of base table names when the check was run,
                otherwise an empty list.

    Raises:
        AttributeError: Input database contains no tables.
    '''
    s_time = time.time()
    print('Connecting to database...')
    con = duckdb.connect(path)

    if empty is False:
        try:
            tables = con.execute(
                """SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
                   WHERE TABLE_TYPE='BASE TABLE'""").df()
        except Exception:
            # Do not leave the database file locked if the probe fails.
            con.close()
            raise

        if tables.shape[0] < 1:
            # The caller never receives `con` on this path, so it must be
            # released here or the connection leaks.
            con.close()
            raise AttributeError('Input database is empty.')
    else:
        tables = []

    e_time = time.time()
    elapsed_time = e_time - s_time
    print(f'Connection established! Execution time: {elapsed_time} seconds')
    return con, tables
def _timed_step(con, start_msg, sql, done_msg):
    '''Prints start_msg, executes sql on con, then reports the step time.'''
    step_start = time.time()
    print(start_msg)
    con.execute(sql)
    elapsed_time = time.time() - step_start
    print(f'{done_msg} Execution time: {elapsed_time} seconds')


def build_pairpro(con, out_db_path, min_ogt_diff: int = 20,
                  min_16s: int = 1300):
    '''
    Converts learn2therm DuckDB database into a DuckDB database for
    PairProphet by adding filtered and constructed tables. Ensure at least
    20 GB of free disk space and 30 GB of system memory are available before
    running on the full database.

    Args:
        con (duckdb.DuckDBPyConnection): DuckDB connection object. Links
                                         script to DuckDB SQL database.
                                         It is committed and closed by this
                                         function.
        out_db_path (str): Path to PairProphet output database file. If the
                           file already exists, a numeric suffix (_1, _2,
                           ...) is appended to the name instead of
                           overwriting it.
        min_ogt_diff (int): Cutoff for minimum difference in optimal growth
                            temperature between thermophile and mesophile
                            pairs. Default 20 deg C.
        min_16s (int): Cutoff for minimum 16S read length for taxa. Default
                       1300 bp. Filters out organisms with poor or incomplete
                       16S sequencing.

    Returns:
        con2 (duckdb.DuckDBPyConnection): Connection to the newly written
                                          output database.
        filename (str): Stem of the output database file, used as the
                        database name when qualifying tables, e.g.
                        <filename>.pairpro.final.

    Raises:
        ValueError: Optimal growth temperature difference must be positive.
        ValueError: Minimum 16S sequence read is 1 bp.
        AttributeError: Database must be in the learn2therm format.
    '''
    if min_ogt_diff < 0:
        raise ValueError(
            """Optimal growth temperature difference must be positive.""")

    if min_16s < 1:
        raise ValueError('16S must have at least 1 bp read.')

    # Check if proper tables exist in database. If they do not, raise an
    # error. Table names are compared as values; testing a bare generator
    # expression for truth would always pass.
    rows = con.execute(
        """SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES
           WHERE TABLE_TYPE='BASE TABLE'""").fetchall()
    existing_tables = {row[0] for row in rows}
    required_tables = ('proteins', 'protein_pairs', 'taxa', 'taxa_pairs')
    if not all(name in existing_tables for name in required_tables):
        raise AttributeError('Database is not formatted for learn2therm.')

    s_time = time.time()

    # Builds PairProphet taxa pair table using paired taxa from learn2therm
    taxa_pairs_cmd = """CREATE OR REPLACE TEMP TABLE pairpro_taxa_pairs AS
                        SELECT *
                        FROM taxa_pairs
                        INNER JOIN taxa_pairs_lab
                        ON (taxa_pairs.__index_level_0__ =
                            taxa_pairs_lab.__index_level_0__)
                        WHERE taxa_pairs_lab.is_pair = True"""
    _timed_step(con, 'Constructing pairpro_taxa_pairs...', taxa_pairs_cmd,
                'Finished constructing pairpro_taxa_pairs.')

    # Builds PairProphet taxa table using only paired taxa from learn2therm.
    taxa_cmd = """CREATE OR REPLACE TEMP TABLE pairpro_taxa AS
                  SELECT *
                  FROM taxa
                  WHERE taxid IN
                  (SELECT DISTINCT query_id FROM pairpro_taxa_pairs)
                  OR taxid IN
                  (SELECT DISTINCT subject_id FROM pairpro_taxa_pairs)"""
    _timed_step(con, 'Constructing pairpro_taxa...', taxa_cmd,
                'Finished constructing pairpro_taxa.')

    # Builds PairProphet table containing taxa pairs and their associated
    # optimal growth temperatures (ogt). Excludes 16S sequences and ogt
    # difference below cutoff values from function input. Both cutoffs have
    # already passed the numeric comparisons above before being formatted
    # into the statement.
    ogt_pairs_cmd = f"""CREATE OR REPLACE TEMP TABLE pairpro_ogt_taxa_pairs AS
                        SELECT pairpro_taxa_pairs.*,
                               taxa_m.temperature AS meso_ogt,
                               taxa_t.temperature AS thermo_ogt,
                               taxa_t.temperature - taxa_m.temperature
                                   AS ogt_diff,
                               taxa_m."16s_len" AS meso_16s_len,
                               taxa_t."16s_len" AS thermo_16s_len
                        FROM pairpro_taxa_pairs
                        JOIN pairpro_taxa AS taxa_m
                        ON (pairpro_taxa_pairs.subject_id = taxa_m.taxid)
                        JOIN pairpro_taxa AS taxa_t
                        ON (pairpro_taxa_pairs.query_id = taxa_t.taxid)
                        WHERE ogt_diff >= {min_ogt_diff}
                        AND meso_16s_len >= {min_16s}
                        AND thermo_16s_len >= {min_16s}"""
    _timed_step(con, 'Filtering on ogt and 16S sequence parameters...',
                ogt_pairs_cmd, 'Finished filtering.')

    # Builds PairProphet table containing protein pairs
    protein_pair_cmd = """CREATE OR REPLACE TEMP TABLE pairpro_protein_pairs
                          AS SELECT protein_pairs.*,
                                    otp.meso_ogt AS m_ogt,
                                    otp.thermo_ogt AS t_ogt,
                                    otp.ogt_diff AS ogt_difference
                          FROM protein_pairs
                          INNER JOIN pairpro_ogt_taxa_pairs AS otp
                          ON (protein_pairs.thermo_taxid = otp.query_id)
                          AND (protein_pairs.meso_taxid = otp.subject_id)"""
    _timed_step(con, 'Constructing pairpro_protein_pairs...',
                protein_pair_cmd,
                'Finished constructing pairpro_protein_pairs.')

    # Builds PairProphet table containing proteins that belong to taxa from
    # pairpro_taxa_pairs.
    prot_filt_cmd = """CREATE OR REPLACE TEMP TABLE pairpro_proteins AS
                       SELECT *
                       FROM proteins
                       WHERE pid IN
                       (SELECT DISTINCT meso_pid FROM pairpro_protein_pairs)
                       OR pid IN
                       (SELECT DISTINCT thermo_pid FROM pairpro_protein_pairs)
                    """
    _timed_step(con, 'Constructing pairpro_proteins...', prot_filt_cmd,
                'Finished constructing pairpro_proteins.')

    print('Constructing final dataset...')

    # Builds final PairProphet data table for downstream sampling.
    big_table_cmd = """CREATE OR REPLACE TEMP TABLE pairpro_final AS
                       SELECT pairpro_protein_pairs.*,
                              proteins_m.protein_seq AS m_protein_seq,
                              proteins_t.protein_seq AS t_protein_seq,
                              proteins_m.pdb_id AS meso_pdb,
                              proteins_t.pdb_id AS thermo_pdb
                       FROM pairpro_protein_pairs
                       JOIN pairpro_proteins AS proteins_m
                       ON (pairpro_protein_pairs.meso_pid = proteins_m.pid)
                       JOIN pairpro_proteins AS proteins_t
                       ON (pairpro_protein_pairs.thermo_pid = proteins_t.pid)
                    """
    con.execute(big_table_cmd)

    # Never overwrite an existing output file: pick the first free
    # "<name>_<n><ext>" instead.
    if os.path.exists(out_db_path):
        root, ext = os.path.splitext(out_db_path)
        counter = 1
        while os.path.exists(f'{root}_{counter}{ext}'):
            counter += 1
        out_db_path = f'{root}_{counter}{ext}'

    # The file stem is the name used below to qualify tables in the output
    # database. Deriving it from the basename works for any directory depth
    # (including a bare file name).
    filename = os.path.splitext(os.path.basename(out_db_path))[0]

    print(f'Transferring data to new database {out_db_path}')

    # Escape single quotes so the path is a valid SQL string literal.
    attach_path = out_db_path.replace("'", "''")
    con.execute(f"""ATTACH '{attach_path}' AS out_db""")
    con.execute("""CREATE SCHEMA out_db.pairpro""")
    con.execute("""CREATE OR REPLACE TABLE out_db.pairpro.final AS
                   SELECT * FROM main.pairpro_final""")
    con.execute("""CREATE OR REPLACE TABLE out_db.pairpro.proteins AS
                   SELECT * FROM main.pairpro_proteins""")
    con.execute("""DETACH out_db""")

    print('Finishing up...')
    con.commit()
    con.close()

    con2, _ = connect_db(out_db_path)

    # Quote the database name so stems containing characters such as '-'
    # remain valid identifiers.
    db_ident = '"' + filename.replace('"', '""') + '"'

    # Add pair IDs to final table
    con2.execute(f"""CREATE TEMP TABLE pair_ids AS
                     SELECT ROW_NUMBER() OVER(ORDER BY meso_pid, thermo_pid)
                         AS pair_id, meso_pid, thermo_pid
                     FROM {db_ident}.pairpro.final""")
    con2.execute(f"""ALTER TABLE {db_ident}.pairpro.final
                     ADD pair_id int""")
    con2.execute(f"""UPDATE {db_ident}.pairpro.final AS f
                     SET pair_id = pair_ids.pair_id::int
                     FROM pair_ids
                     WHERE pair_ids.meso_pid = f.meso_pid
                     AND pair_ids.thermo_pid = f.thermo_pid
                  """)

    # Report the time for the whole build, measured from the first step.
    et_final = time.time()
    elapsed_time = et_final - s_time
    print(f'Finished. Total execution time: {elapsed_time} seconds')

    return con2, filename