Source code for montebarcode.checks

"""Functions to check that barcodes conform."""

from __future__ import annotations

from collections.abc import Callable, Iterable, Mapping, Sequence
from collections import Counter, defaultdict
from itertools import zip_longest
import sys

import streq as sq

from .utils import _print_err, pprint_dict


def _print_rejection_reasons(x: Mapping, 
                             n_tried: str) -> None:
    
    key_val_str = {name: (s / n_tried) 
                   for name, s in x.items()}

    pprint_dict(key_val_str, '\nRejection reasons')

    return None


[docs]def minmax_distance(x: Sequence[str], use_levenshtein: bool = True) -> Sequence[int]: """Get minimum and maximum distances among a set of barcodes. This compares all the pairwise distances except for self-self (diagonal of the distance matrix). If there are repeated barcodes in the list, then the minimum distance will be zero. Parameters ---------- x : list | tuple | set Set of barcodes to check. use_levenshtein : bool, optional Whether to use the Levenshtein distance. Default: True. Returns ------- tuple Two integers: the first is the minimum distance, the second is the maximum distance. Examples -------- >>> minmax_distance(['AAA', 'AAA']) (0, 0) >>> minmax_distance(['AAA', 'TCG', 'AAT']) (1, 3) >>> minmax_distance(['AAA', 'TCG', 'AAAT'], use_levenshtein=False) (0, 3) >>> minmax_distance(['AAA', 'TCG', 'AAAT']) (1, 4) """ dist_function = sq.levenshtein if use_levenshtein else sq.hamming all_distances = [dist_function(seq1, seq2) for i, seq1 in enumerate(x) for j, seq2 in enumerate(x) if i != j] try: return min(all_distances), max(all_distances) except ValueError: return 0, 0
[docs]def Distance(min_distance: int = 2, use_levenshtein: bool = True) -> Callable[[str, Iterable], bool]: """Create a distance checking function. Uses the parameters to produce a function which takes as arguments a candidate barcode and a working list of barcodes, and returns True if the parameter conditions are not met. Parameters ---------- min_distance : int The minimum distance allowed among all barcodes. use_levenshtein : bool, optional Whether to use the Levenshtein distance. Default: True. Returns ------- function Checking function. Examples -------- >>> Distance(1)('ATA', ['TCG', 'AAT']) False >>> Distance(2)('AAA', ['TCG', 'AAT']) True """ dist_function = sq.levenshtein if use_levenshtein else sq.hamming def distance(word: str, corpus: Iterable) -> bool: for ref in corpus: hamming_dist = dist_function(word, ref) if hamming_dist < min_distance: return True return False return distance
[docs]def Identities() -> Callable[[str, Iterable], bool]: """Create an identity checking function. Produces a function which takes as arguments a candidate barcode and a working list of barcodes, and returns True if the candidate or its reverse complement is in the working list. Returns ------- function Checking function. Examples -------- >>> Identities()('AAA', ['TCG', 'AAT']) False >>> Identities()('AAA', ['TCG', 'AAA']) True >>> Identities()('AAA', ['TCG', 'TTT']) True """ def identity(word: str, corpus: Iterable) -> bool: return word in corpus or sq.reverse_complement(word) in corpus return identity
[docs]def Palindrome() -> Callable[[str, Iterable], bool]: """Create a palindrome checking function. Produces a function which takes as arguments a candidate barcode and a working list of barcodes, and returns True if the candidate is palindromic. (Working list is ignored.) Returns ------- function Checking function. Examples -------- >>> Palindrome()('AAA', []) False >>> Palindrome()('AATT', []) True """ def palindrome(word: str, _) -> bool: return word == sq.reverse_complement(word) return palindrome
[docs]def GCcontent(min: float = .35, max: float = .65) -> Callable[[str, Iterable], bool]: """Create GC content checking function. Produces a function which takes as arguments a candidate barcode and a working list of barcodes, and returns True if the candidate is not within the bounds. (Working list is ignored.) Parameters ---------- min : float Minimum acceptable proportion of GC content. max : float Maximum acceptable proportion of GC content. Returns ------- function Checking function. Examples -------- >>> GCcontent()('AATT', []) True >>> GCcontent()('AACG', []) False >>> GCcontent()('GGCG', []) True """ def gc_content(word: str, _) -> bool: gc_ = sum([letter in 'GC' for letter in word]) / len(word) return gc_ < min or gc_ > max return gc_content
def _illumina_color_balance(x: Iterable[str], **kwargs) -> bool: (image1_avg, image2_avg, dark_avg) = ([sum(b in channel for b in bases) / len(bases) for bases in zip(*x)] for channel in dict(**kwargs).values()) both_images = any(a <= 0. or b <= 0. for a, b in zip(image1_avg, image2_avg)) signal = any(a >= 1. for a in dark_avg) return both_images or signal
[docs]def base_usage(x: Iterable[str]) -> Mapping: """Calculate the proportional base usage for a set of barcodes. Returns a dictionary mapping position along sequence to the distribution among the bases. Does not check for legitimate DNA alphabet. Parameters ---------- x : Iterable Set of barcodes to check. Returns ------- dict Base usage per position. Examples -------- >>> base_usage(['AAA', 'TTT', 'GCT', 'CCA'])[0]['A'] 0.25 >>> base_usage(['AAA', 'TTT', 'GCT', 'CCA'])[1]['G'] 0 >>> base_usage(['AAA', 'TTT', 'GCT', 'CCA'])[2]['A'] 0.5 """ counter = defaultdict(Counter) for i, bases in enumerate(zip_longest(*x)): for j, base in enumerate(bases): counter[i][base] += 1 for base in counter[i]: counter[i][base] /= (j + 1) return counter
[docs]def IlluminaColorBalance(green_4ch='GT', red_4ch='AC', green_2ch='AT', red_blue_2ch='AC', image1_1ch='AT', image2_1ch='C', alphabet=sq.sequences.DNA) -> Callable[[str, Iterable], bool]: """Create a color balance checking function. Produces a function which takes as arguments a candidate barcode and a working list of barcodes, and returns True if adding the barcode to the working set would give suboptimal color balance across a range of Illumina SBS platforms. This is a very stringent check in order to ensure barcodes will wrok on multiple platforms. If there are 4 or fewer barcodes in total, the checking function will only check for two dark bases (for 1 channel chemistry) at the 5' end. If there are at least 5 barcodes in total, the checking function will test for color balance among the channels and images. Parameters ---------- green_4ch : str, optional Bases detected by the green channel in 4 channel chemistry. Default: 'GT', red_4ch : str, optional Bases detected by the red channel in 4 channel chemistry. Default: 'AC', green_2ch : str, optional Bases detected by the green channel in 2 channel chemistry. Default: 'AT', red_blue_2ch : str, optional Bases detected by the red/blue channel in 2 channel chemistry. Default: 'AC', image1_1ch : str, optional Bases detected by the first image in 1 channel chemistry. Default: 'AT', image2_1ch : str, optional Bases detected by the second image in 1 channel chemistry. Default: 'C', alphabet : str, optional Letters comprising the alphabet. Used to infer the dark bases for each chemistry. Returns ------- function Checking function. Examples -------- >>> IlluminaColorBalance()('GGAT', ['TCGC', 'AAAG']) True >>> IlluminaColorBalance()('AAAT', ['TCGC', 'AAAG']) False >>> IlluminaColorBalance()('AAAT', ['TCGC', 'ACAG', 'TGGC', 'ATCG']) True >>> IlluminaColorBalance()('AAAT', ['TCGC', 'CCAG', 'TGGC', 'ATCG']) False """ dark_4ch = ''.join(letter for letter in alphabet if letter not in (green_4ch + red_4ch)) dark_2ch = ''.join(letter for letter in alphabet if letter not in (green_2ch + red_blue_2ch)) dark_1ch = ''.join(letter for letter in alphabet if letter not in (image1_1ch + image2_1ch)) dark_x2 = dark_1ch * 2 def color_balance(word: str, corpus: Iterable) -> bool: start_GG = word.startswith(dark_x2) proposed_set = corpus + [word] if len(proposed_set) < 5: return start_GG else: _4ch = _illumina_color_balance(proposed_set, green=green_4ch, red=red_4ch, dark=dark_4ch) _2ch = _illumina_color_balance(proposed_set, green=green_2ch, red_blue=red_blue_2ch, dark=dark_2ch) _1ch = _illumina_color_balance(proposed_set, image1=image1_1ch, image2=image2_1ch, dark=dark_1ch) return any((start_GG, _4ch, _2ch, _1ch)) return color_balance
[docs]def Homopolymer(length: int = 4) -> Callable[[str, Iterable], bool]: """Create a homopolymer checking function. Produces a function which takes as arguments a candidate barcode and a working list of barcodes, and returns True if the candidate contains a homopolymer `length` or longer. (Working list is ignored.) Parameters ---------- length : int Minimum length of homopolymer to detect. Returns ------- function Checking function. Examples -------- >>> Homopolymer(3)('AAAT', []) True >>> Homopolymer(4)('AAAT', []) False """ hps = set([letter * length for letter in sq.sequences.DNA]) def homopolymer(word: str, _) -> bool: return any(hp in word for hp in hps) return homopolymer
[docs]def RestrictionSites(n: int = 0) -> Callable[[str, Iterable], bool]: """Create a Type IIS restriction site checking function. Produces a function which takes as arguments a candidate barcode and a working list of barcodes, and returns True if the candidate contains a Type IIS resitriction site commonly used in Golden Gate cloning. (Working list is ignored.) Parameters ---------- n : int Minimum number of restriction sites to tolerate. Returns ------- function Checking function. Examples -------- >>> RestrictionSites()('AAATGGTCTC', []) True >>> RestrictionSites(1)('AAATGGTCTC', []) False >>> RestrictionSites()('AAATGCTCTC', []) False """ def restriction_sites(word: str, _) -> bool: return sq.count_re_sites(word) > n return restriction_sites
[docs]def make_checks(barcodes: Iterable[str], n: int, checks: Iterable[Callable[[str, Sequence], bool]], max_rejection_rate: float = 1., max_tries: int = 10000, initial: Sequence[str] = None, quiet: bool = False) -> Sequence[dict, int, Sequence[str]]: """Check barcode list conforms to the checks. Keeps a tally of rejection rate and rejection reasons. Parameters ---------- barcodes : Iterable[str] List or generator of barcodes to check. n : int Minimum number of barcodes to accept. Stops when this is reached or checked all barcodes. checks : Iterable List of functions which take a candidate barcode and previous barcodes as arguments and return True if the candidate should be rejected. length : int Length of barcodes to generate. max_rejection_rate : float, optional Rejection rate above which to terminate. Default: 1. max_tries : int, optional Number of barcodes to try before enforcing `max_rejection_rate`. Default 100000. initial : list, optional An initial list to append new barcodes. quiet : bool, optional Whether to report progress. Default: True. Returns ------- dict Counts of rejections based on failing each check. int Number of random barcode candidates tried. list List of n random barcodes passing the checks. Raises ------ ValueError When rejection rate goes above the threshold. Examples -------- >>> make_checks(['AAAT', 'TCGC', 'ACAG', 'TGGC', 'ATCG'], 5, checks=[IlluminaColorBalance()], quiet=True) # doctest: +NORMALIZE_WHITESPACE (Counter({'color_balance': 1}), 5, ['AAAT', 'TCGC', 'ACAG', 'TGGC']) >>> make_checks(['AAAT', 'TCGC', 'CCAG', 'TGGC', 'ATCG'], 5, checks=[IlluminaColorBalance()], quiet=True) # doctest: +NORMALIZE_WHITESPACE (Counter(), 5, ['AAAT', 'TCGC', 'CCAG', 'TGGC', 'ATCG']) >>> make_checks(['AAAT', 'TCGC', 'ACAG', 'TGGC', 'ATCG'], 4, checks=[IlluminaColorBalance()], quiet=True) # doctest: +NORMALIZE_WHITESPACE (Counter(), 4, ['AAAT', 'TCGC', 'ACAG', 'TGGC']) >>> # >>> checks = [Homopolymer(), Palindrome()] >>> make_checks(['AAAAT', 'CCCGGG', 'ATCGCG', 'GCCGAT'], 4, checks=checks, quiet=True) # doctest: +NORMALIZE_WHITESPACE (Counter({'homopolymer': 1, 'palindrome': 1}), 4, ['ATCGCG', 'GCCGAT']) >>> make_checks(['AAAAT', 'CCCGGG', 'ATCGCG', 'GCCGAT'], 1, checks=checks, quiet=True) # doctest: +NORMALIZE_WHITESPACE (Counter({'homopolymer': 1, 'palindrome': 1}), 3, ['ATCGCG']) >>> # >>> make_checks(['AAAAT', 'CCCGGG', 'ATCGCG', 'GCCGAT'], 1, checks=checks, initial=['ATCGCG'], quiet=True) # doctest: +NORMALIZE_WHITESPACE (Counter({'homopolymer': 1, 'palindrome': 1}), 3, ['ATCGCG', 'ATCGCG']) >>> checks = [Homopolymer(), Palindrome(), Identities()] >>> make_checks(['AAAAT', 'CCCGGG', 'ATCGCG', 'GCCGAT'], 1, checks=checks, initial=['ATCGCG'], quiet=True) # doctest: +NORMALIZE_WHITESPACE (Counter({'homopolymer': 1, 'palindrome': 1, 'identity': 1}), 4, ['ATCGCG', 'GCCGAT']) """ accepted_barcodes = initial or [] fail_tally = Counter() n_tried, n_rejected, n_accepted = 0, 0, 0 rejection_rate = 0. for i, new_barcode in enumerate(barcodes): n_tried = i + 1 check_fails = [check.__name__ for check in checks if check(new_barcode, accepted_barcodes)] if any(check_fails): fail_tally += Counter(check_fails) n_rejected += 1 if n_tried > max_tries and rejection_rate >= max_rejection_rate: if not quiet: _print_rejection_reasons(fail_tally, n_tried) message = (f'Rejection rate reached {rejection_rate:.2f} ' f'({n_rejected} / {n_tried}), ' 'which is above the maximum ' f'threshold of {max_rejection_rate:.2f}') raise ValueError(message) else: accepted_barcodes.append(new_barcode) n_accepted += 1 rejection_rate = n_rejected / n_tried if not quiet: _print_err(f'\r> Tried {n_tried} barcodes,', f'rejected {n_rejected}, accepted {n_accepted};', f'rejection rate is {rejection_rate:.2f}', end='') if n_accepted >= n: if not quiet: print('', file=sys.stderr) break if not quiet: _print_rejection_reasons(fail_tally, n_tried) return fail_tally, n_tried, accepted_barcodes