from concurrent.futures import ThreadPoolExecutor
from typing import Dict, Iterator, Iterable, List, Literal, Tuple, Union, overload
from urllib.parse import urlencode
from urllib.request import urlopen
from urllib.error import HTTPError
from bioseq.config import SYMBOL
from bioseq import DNA, RNA, Peptide, Sequence
# TODO: Merge fetch function
@overload
def fetchENS(uid: str) -> DNA:
...
@overload
def fetchENS(uid: List[str]) -> List[DNA]:
...
[docs]def fetchENS(uid):
"""Fetch sequence corresponding to UID from Ensemble REST api.
Args:
uid(str | List[str]): One or list of ENS's unique id
Returns:
DNA | List[DNA]: One or list of DNA sequence corresponding to UID
"""
ens_db_url = "https://rest.ensembl.org/sequence/id/{}?content-type=text/plain"
def fetch(uid):
raw_info = ""
for i in range(3):
try:
print(
f"[Try {i + 1}/3]Fetching {uid} from Ensemble REST API...")
raw_info = urlopen(ens_db_url.format(uid)).read().decode()
break
except HTTPError as e:
if e.code == 400:
print(f"{uid} not found")
break
else:
print(f"{uid} retry: {e}.")
except Exception as e:
print(e)
break
return DNA(raw_info, uid)
if isinstance(uid, str):
return fetch(uid)
elif isinstance(uid, Iterable):
with ThreadPoolExecutor(5) as executor:
return list(executor.map(fetch, uid))
else:
raise ValueError(f"{uid}'s type <{type(uid)}> is not a str or list of str")
@overload
def fetchNCBI(uid: str) -> Union[DNA, RNA, Peptide]:
...
@overload
def fetchNCBI(uid: List[str]) -> List[Sequence]:
...
[docs]def fetchNCBI(uid):
"""Fetch sequence corresponding to UID from NCBI E-utilities. Only support RNA, mRNA(DNA), Protein.
============= ==============================================================================
Prefix Explanation
============= ==============================================================================
NM_(mRNA) Protein-coding transcripts (usually curated)
NR_(RNA ) Non-protein-coding transcripts
XM_(mRNA) Predicted model protein-coding transcript
XR_(RNA ) Predicted model non-protein-coding transcript
AP_(Protein) Annotated on AC alternate assembly
NP_(Protein) Associated with an NM or NC accession
YP_(Protein) Annotated on genomic molecules without an instantiated transcript record
XP_(Protein) Predicted model, associated with an XM accession
WP_(Protein) Non-redundant across multiple strains and species
============= ==============================================================================
| NCBI RefSeq's document: https://www.ncbi.nlm.nih.gov/books/NBK21091/table/ch18.T.refseq_accession_numbers_and_mole
| some NCBI E-utilities's api: https://www.ncbi.nlm.nih.gov/books/NBK25499/table/chapter4.T._valid_values_of__retmode_and/
Args:
uid(str|List[str]): One or list of NCBI's unique id
Returns:
DNA | RNA | Peptide | List[Sequence]:
If uid is a list, the return is a list of Sequence(excluded the uid not found data on NCBI)
without ensure sequence's type, else the return is a Sequence corresponding to UID.
"""
eutils_url = "https://eutils.ncbi.nlm.nih.gov/entrez/eutils/efetch.fcgi?"
eutils_post = {
"db": "", # database
"rettype": "fasta", # text type
"retmode": "text", # data type
"id": "", # uid
}
def checkUID(uid: str) -> Tuple[str, str]:
"""
Check whether the uid is valid
Args:
uid: uid
Returns:
returns: ncbi_db_name, Sequence's type corresponding to uid
"""
if uid[:3] in ["AP_", "NP_", "YP_", "XP_", "WP_"]:
return "protein", "Peptide"
elif uid[:3] in ["NM_", "XM_"]:
return "nuccore", "DNA"
elif uid[:3] in ["NR_", "XR_"]:
return "nuccore", "RNA"
else:
raise ValueError(f"{uid} is not a support uid")
def fetch(data: Dict) -> List[Sequence]:
try:
print(f"Fetching {data['id']} from NCBI E-utilities...")
raw_info = urlopen(
eutils_url + urlencode(data)
).read().decode()
except HTTPError as e:
print(e)
return [Sequence("")]
else:
return parseFasta(raw_info)
if isinstance(uid, str):
eutils_post["id"] = uid
eutils_post["db"], seq_type = checkUID(uid)
sequence = fetch(eutils_post)[0]
return getattr(sequence, f"to{seq_type}")()
elif isinstance(uid, Iterable):
uids = {}
for id in uid:
db, seq_type = checkUID(id)
uids.setdefault(db, []).append((id, seq_type))
result = []
for db, seqs in uids.items():
eutils_post["db"] = db
for i in range(0, len(seqs), 200):
eutils_post["id"] = ",".join(
[info[0] for info in seqs[i:i + 200]])
result.extend(fetch(eutils_post))
return result
else:
raise ValueError(f"{uid}'s type <{type(uid)}> is not a str or list of str")
[docs]def parseFasta(fasta_text: str) -> List[Sequence]:
"""
Parse a FASTA formatted string.
Args:
fasta_text(str): string to be parsed
Returns:
List[Sequence]: Parsing result
"""
return [Sequence("".join((text := fasta.split("\n"))[1:]), text[0])
for fasta in fasta_text.split(">")
if fasta.strip()]
@overload
def loadFasta(filename: str) -> List[Sequence]:
...
@overload
def loadFasta(filename: str, iterator: Literal[True]) -> Iterator[Sequence]:
...
[docs]def loadFasta(filename, iterator=False):
"""Load fasta file
Args:
filename: the fasta file's name.
iterator: Set to True as reading a large file, it will return a iterator.
Returns:
List[Sequence] | Iterator
"""
def iter_parse(filename: str) -> Iterator[Sequence]:
seq = ""
info = ""
with open(filename, encoding="utf8") as f:
while line := f.readline():
# process the blank line
if not line.strip() and seq:
yield Sequence(seq, info)
seq = "" # reset seq
continue
if line.startswith(">"):
# yield previous sequence
if seq:
yield Sequence(seq, info)
seq = "" # reset seq
info = line[1:].strip()
else:
seq += line.strip()
if seq:
yield Sequence(seq, info)
if iterator:
return iter_parse(filename)
else:
with open(filename, encoding="utf8") as f:
return parseFasta(f.read())
[docs]def printAlign(
sequence1: str,
sequence2: str,
spacing: int = 10,
line_width: int = 30,
show_seq: bool = True):
"""
Print two sequence by a pretty format
Args:
sequence1(Iterator)
sequence2(Iterator)
spacing(int): A space each $spacing char
line_width(int): the width of each line
show_seq(bool): if False, only print the alignment result
"""
symbol_line = ""
format_seq1 = ""
format_seq2 = ""
count = 0
length = len(sequence1) if len(sequence1) < len(
sequence2) else len(sequence2)
match_symbol, mismatch_symbol, gap_symbol = SYMBOL["printAlign"]
for i in range(length):
base1 = sequence1[i]
base2 = sequence2[i]
if base1 == base2:
symbol_line += match_symbol
elif base1 == "-" or base2 == "-":
symbol_line += gap_symbol
else:
symbol_line += mismatch_symbol
format_seq1 += base1
format_seq2 += base2
count += 1
if count % spacing == 0:
format_seq1 += " "
format_seq2 += " "
symbol_line += " "
if count % line_width == 0:
count = 0
space_num = 0
space_each_line = line_width // spacing
for i in range(0, length, line_width):
start = i + space_num
end = start + line_width + space_each_line
space_num += space_each_line
if show_seq:
print(f"{i + 1:>5}", end=" ")
print(format_seq1[start: end])
print(f" " * 5, end=" ")
print(symbol_line[start: end])
print(f"{i + 1:>5}", end=" ")
print(format_seq2[start: end])
else:
print(f"{i + 1:>5}", end=" ")
print(symbol_line[start: end])
print()