Source code for montebarcode.cli

"""Command-line interface for monte-barcode."""

from __future__ import annotations

from typing import TextIO, Union
from collections.abc import Callable, Mapping, Sequence
import argparse
import csv
from functools import reduce
from itertools import chain, permutations
from math import factorial
import operator
import sys

import nemony as nm
import streq as sq
from tqdm import tqdm

from .generate import codon_barcodes, infinite_barcodes, transition_matrix
from . import checks
from .utils import _print_err, pprint_dict, _CODONS

def _reader(input: TextIO, 
            field: Union[int, str]) -> Sequence[str]:

    if field.isdigit():
        infile = csv.reader(input, delimiter='\t')
        key = int(field) - 1
    else:
        infile = csv.DictReader(input, delimiter='\t')
        key = field

    barcode_list = [row[key] for row in infile]
    alphabet_used = set(letter for bc in barcode_list for letter in list(bc))

    assert all(letter in sq.sequences.DNA for letter in alphabet_used),\
               (f"ERROR: Letters other than {sq.sequences.DNA} are "
                f"used in column {field} of {input.name}")
    
    return barcode_list


def _writer(output: TextIO, 
            barcodes: Sequence[str], 
            levenshtein: bool = False) -> None:

    barcode_set_name = nm.encode(barcodes)

    outfile = csv.writer(output, 
                         delimiter='\t')
    min_distance, max_distance = checks.minmax_distance(barcodes, levenshtein)
    n = len(barcodes)

    try:

        for i, barcode in enumerate(barcodes):

            row = (f'{barcode_set_name}:'
                   f'l{len(barcode)}-n{n}-d{min_distance}:'
                   f'x{i}:{nm.encode(barcode)}'), barcode
            outfile.writerow(row)

    except BrokenPipeError:

        pass

    distance = 'Levenshtein' if levenshtein else 'Hamming'
        
    _print_err(f'Wrote barcode set called {barcode_set_name},',
               f'with minimum {distance} distance {min_distance} and',
               f'maximum {distance} distance {max_distance}.')
    
    return None


def _checker(args: argparse.Namespace, 
             barcode_list: Sequence[str],
             initial: Sequence[str] = []) -> Sequence[str]:

    checklist = [checks.Identities(), 
                 checks.Palindrome(),
                 checks.Distance(min_distance=args.distance,
                                 use_levenshtein=args.levenshtein),
                 checks.GCcontent(min=args.gc_min, 
                                  max=args.gc_max),
                 checks.Homopolymer(length=args.homopolymer),
                 checks.RestrictionSites()]
    
    if args.color:
        checklist += [checks.IlluminaColorBalance()]

    try:
        n = len(barcode_list)
        max_rejection_rate = 1.
    except TypeError:
        n = args.number
        max_rejection_rate = args.rejection_rate

    (_, _, 
     barcodes) = checks.make_checks(barcode_list,
                                    n=n, 
                                    max_rejection_rate=max_rejection_rate,
                                    checks=checklist,
                                    initial=initial)
    
    if len(barcodes) < n:

        _print_err(f'WARNING: Could only generate {len(barcodes)} barcodes,',
                   f'but {n} were requested.',
                   'You might need to try different settings.')
    
    _writer(args.output, barcodes, args.levenshtein)

    return barcodes


def _check_permutations(barcodes: Sequence[str], 
                        checklist: Sequence[Callable[[str, Sequence[str]], bool]],
                        constant: Sequence[str] = [],
                        n: int = None) -> Sequence[Mapping, Sequence[str]]:

    n = n or len(barcodes)
    n_permutations = int(factorial(len(barcodes)) / 
                         factorial(len(barcodes) - n))
   
    result = None

    for permutation in tqdm(permutations(barcodes, n), 
                            total=n_permutations):

        try:
            (fail_tally, _, 
             result) = checks.make_checks(constant + list(permutation),
                                          n=len(permutation),
                                          max_rejection_rate=0.,
                                          checks=checklist,
                                          max_tries=0,
                                          quiet=True)
        except ValueError:

            pass

        else:

            break

    if result is None:

        raise ValueError('Ordering could not be found to satisfy checks.')
    
    return fail_tally, result


[docs]def sort_barcodes(args: argparse.Namespace) -> None: pprint_dict(vars(args), 'Sorting barcodes with the following parameters') barcode_list = _reader(args.input, args.field) checklist = [checks.IlluminaColorBalance()] try: _, _, barcodes = checks.make_checks(barcode_list, n=len(barcode_list), max_rejection_rate=0., checks=checklist, max_tries=0, quiet=True) except ValueError: _, starter_barcodes = _check_permutations(barcode_list, checklist, n=6) remaining_barcodes = list(set(barcode_list) - set(starter_barcodes)) _, barcodes = _check_permutations(remaining_barcodes, checklist, constant=list(starter_barcodes)) if len(barcodes) < len(barcode_list): _print_err(f'Could only generate {len(barcodes)} barcodes,', f'but {len(barcode_list)} were provided.', 'You might need to try different settings.') _writer(args.output, barcodes) return None
[docs]def check_barcodes(args: argparse.Namespace) -> None: pprint_dict(vars(args), 'Checking barcodes with the following parameters') barcode_list = _reader(args.input, args.field) barcodes = _checker(args, barcode_list) return None
[docs]def generate(args: argparse.Namespace) -> None: pprint_dict(vars(args), 'Generating barcodes with the following parameters') if args.subcommand != 'sample' and args.amino_acid is not None: invalid_aa = [aa for aa in args.amino_acid if aa not in _CODONS] assert len(invalid_aa) == 0, \ "The following amino acids are invalid: {}".format(", ".join(invalid_aa)) length = len(args.amino_acid) * 3 combinations = reduce(operator.mul, (len(_CODONS[aa]) for aa in args.amino_acid)) _print_err(f'Using amino acid sequence {args.amino_acid} with', f'length {length} and {combinations} possible combinations.') barcodes = codon_barcodes(args.amino_acid) else: if args.subcommand == 'sample': barcode_list = _reader(args.input, args.field) alphabet = transition_matrix(barcode_list) length = len(alphabet) alphabet_length = len(list(set(chain(*barcode_list)))) check_used = False # TODO: Currently is an upper limit; make accurate. combinations = reduce(operator.mul, (max(len(letters) for _, (letters, _) in position.items()) for position in alphabet)) else: length = args.length alphabet = sq.sequences.DNA alphabet_length = len(alphabet) check_used = True combinations = alphabet_length ** length _print_err(f'Requested barcodes with length {length},', f'and {combinations} possible combinations.') barcodes = infinite_barcodes(length, alphabet=alphabet, check_used=check_used) if args.append is not None: initial = _reader(args.append, args.append_field) else: initial = [] assert combinations > args.number,\ f'There are not {args.number} unique {length}-mers. '\ f'Maximum is {combinations}. You might need to try'\ 'different settings.' barcodes = _checker(args, barcodes, initial=initial) return None
[docs]def main() -> None: parser = argparse.ArgumentParser(description=''' Generate random DNA barcodes conforming to contraints, or check sets of barcodes for their conformance. ''') subcommands = parser.add_subparsers(title='Sub-commands', dest='subcommand', help='Use these commands to specify the action you want.') barcode = subcommands.add_parser('barcode', help='Generate random barcodes.') barcode.set_defaults(func=generate) check = subcommands.add_parser('check', help='Check barcode list.') check.set_defaults(func=check_barcodes) sort = subcommands.add_parser('sort', help='Sort barcode list for optimal color balance.') sort.set_defaults(func=sort_barcodes) sample = subcommands.add_parser('sample', help='Generate barcode list by sampling nucleotides ' 'from an existing list of sequences.') sample.set_defaults(func=generate) barcode.add_argument('--length', '-l', type=int, default=12, help='Barcode length. Default: %(default)s') barcode.add_argument('--amino-acid', '-a', type=str, default=None, help='Generate barcodes encoding this amino acid sequence. Default: do not use.') for p in (barcode, sample): p.add_argument('--number', '-n', type=int, required=True, help='Number of barcodes to generate. Required.') p.add_argument('--rejection-rate', '-r', type=float, default=.85, help='Rate of rejection before aborting. Default: %(default)s') p.add_argument('--append', type=argparse.FileType('r'), default=None, help='File to take a list of barcodes to extend. Default: do not use') p.add_argument('--append_field', type=str, default='1', help='Column name or number to take barcodes from for appending. Default: %(default)s') for p in (barcode, check, sample): p.add_argument('--distance', '-d', type=int, default=1, help='Minimum distance between barcodes. Default: %(default)s') p.add_argument('--homopolymer', '-p', type=int, default=3, help='Maximum homopolymer length. Default: %(default)s') p.add_argument('--levenshtein', '-e', action='store_true', help='Use Levenshtein distance. Otherwise ' 'using Hamming diatnce. Default: %(default)s') p.add_argument('--color', '-c', action='store_true', help='Check optimal Illumina color balance. Default: %(default)s') p.add_argument('--gc_min', '-g', type=float, default=.4, help='Minimum GC content. Default: %(default)s') p.add_argument('--gc_max', '-j', type=float, default=.6, help='Maximum GC content. Default: %(default)s') for p in (check, sort, sample): p.add_argument('input', type=argparse.FileType('r'), nargs='?', default=sys.stdin, help='Input file. Default: STDIN.') p.add_argument('--field', '-f', type=str, default='1', help='Column name or number for barcode sequences. Default: %(default)s') for p in (barcode, check, sort, sample): p.add_argument('--output', '-o', type=argparse.FileType('w'), default=sys.stdout, help='Output file. Default: STDOUT') args = parser.parse_args() args.func(args) return None
if __name__ == "__main__": main()