Source code for flowcraft.templates.skesa

#!/usr/bin/env python3

"""
Purpose
-------

This module is intended to execute Skesa on paired-end FastQ files.

Expected input
--------------

The following variables are expected whether using NextFlow or the
:py:func:`main` executor.

- ``sample_id`` : Sample Identification string.
    - e.g.: ``'SampleA'``
- ``fastq_pair`` : Pair of FastQ file paths.
    - e.g.: ``'SampleA_1.fastq.gz SampleA_2.fastq.gz'``
- ``clear`` : If 'true', remove the input fastq files at the end of the
    component run, IF THE FILES ARE IN THE WORK DIRECTORY

Generated output
----------------

- ``${sample_id}_*.assembly.fasta`` : Main output of skesa with the assembly
    - e.g.: ``sample_1_skesa.fasta``
- ``clear`` : If 'true', remove the input fastq files at the end of the
    component run, IF THE FILES ARE IN THE WORK DIRECTORY

Code documentation
------------------

"""

# Metadata describing this template.
# NOTE(review): presumably read by the flowcraft tooling to report the
# template name and version -- confirm against flowcraft_utils.
__version__ = "1.0.2"
# NOTE(review): looks like a build date in DDMMYYYY form -- confirm.
__build__ = "29062018"
__template__ = "skesa-nf"

import os
import re
import subprocess

from subprocess import PIPE

from flowcraft_utils.flowcraft_base import get_logger, MainWrapper

logger = get_logger(__file__)


def __get_version_skesa():
    """Report the skesa version found on the current system.

    Runs ``skesa --version`` and searches the text written to STDERR for
    the version pattern. This is a best-effort lookup: any failure while
    running or parsing is logged at debug level and does not propagate.

    Returns
    -------
    dict
        Dictionary with the ``program`` key set to ``"skesa"`` and the
        ``version`` key set to the captured version string, or to
        ``"undefined"`` when the version could not be determined.
    """

    # Fallback used when skesa cannot be run or its output does not match
    version = "undefined"

    try:
        proc = subprocess.Popen(
            ["skesa", "--version"], stdout=PIPE, stderr=PIPE)
        # Only the STDERR stream is inspected for the version
        stderr_bytes = proc.communicate()[1]

        # NOTE(review): the backslashes in the pattern appear doubled,
        # presumably because this file is rendered as a template before
        # it is executed -- confirm before changing the escaping.
        match = re.search("v((\\..*))-", stderr_bytes.decode("utf8"))
        if match is not None:
            version = match.group(1)

    except Exception as exc:
        logger.debug(exc)

    return dict(program="skesa", version=version)


# Bind the run parameters only when this file is being executed as a
# ".command.sh" script.
# NOTE(review): the quoted placeholders below are presumably substituted
# with the real values when the template is rendered by Nextflow -- confirm.
if __file__.endswith(".command.sh"):
    # Sample identification string
    SAMPLE_ID = '$sample_id'
    # Whitespace-separated FastQ paths, split into a list
    FASTQ_PAIR = '$fastq_pair'.split()
    # 'true' requests removal of the input FastQ files after the run
    CLEAR = '$clear'
    # Record the received parameters in the debug log
    logger.debug("Running {} with parameters:".format(
        os.path.basename(__file__)))
    logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID))
    logger.debug("FASTQ_PAIR: {}".format(FASTQ_PAIR))
    logger.debug("CLEAR: {}".format(CLEAR))


def clean_up(fastq):
    """Delete input fastq files that are located inside a work directory.

    Each path is resolved through any symlinks first, so it is the link
    target that gets deleted. A resolved path is only deleted when it
    matches the work directory layout checked below; every resolved path
    is written to the debug log regardless of whether it is deleted.

    Parameters
    ----------
    fastq : list
        List of fastq files.
    """

    # Layout check: a "work" directory, then a 2-character directory, then
    # a 30-character directory, then the file itself
    work_dir_pattern = re.compile(".*/work/.{2}/.{30}/.*")

    # map() is lazy, so each path is resolved right before it is handled
    for target in map(os.path.realpath, fastq):
        logger.debug(
            "Removing temporary fastq file path: {}".format(target))

        # Leave anything outside of the work directory untouched
        if work_dir_pattern.match(target) is None:
            continue

        os.remove(target)


def _decode_output(raw):
    """Return the captured output of a subprocess stream as text.

    Bytes are decoded as UTF-8. Anything that cannot be decoded that way is
    coerced with ``str`` instead. This includes ``None``, which is what
    ``communicate()`` returns for a stream that was not piped.
    """
    try:
        return raw.decode("utf8")
    except (UnicodeDecodeError, AttributeError):
        return str(raw)


@MainWrapper
def main(sample_id, fastq_pair, clear):
    """Main executor of the skesa template.

    Runs skesa on the paired FastQ files, writes the assembly (skesa's
    STDOUT) to the output fasta file and records the outcome in the
    ``.status`` file as either ``pass`` or ``error``.

    Parameters
    ----------
    sample_id : str
        Sample Identification string.
    fastq_pair : list
        Two element list containing the paired FastQ files.
    clear : str
        Can be either 'true' or 'false'. If 'true', the input fastq files
        will be removed at the end of the run, IF they are in the working
        directory and skesa finished with a return code of 0.

    Raises
    ------
    SystemExit
        With the return code of skesa, when that code is not 0.
    """

    logger.info("Starting skesa")

    # Determine output file
    if "_trim." in fastq_pair[0]:
        sample_id += "_trim"
    version = __get_version_skesa()["version"]
    output_file = "{}_skesa{}.fasta".format(sample_id,
                                            version.replace(".", ""))

    # The value given to "--cores" is a template placeholder
    cli = [
        "skesa",
        "--fastq",
        "{},{}".format(fastq_pair[0], fastq_pair[1]),
        "--gz",
        "--use_paired_ends",
        "--cores",
        "${task.cpus}"
    ]

    logger.debug("Running Skesa subprocess with command: {}".format(cli))

    # The assembly is whatever skesa writes to STDOUT, so that stream goes
    # straight into the output file and only STDERR is captured
    with open(output_file, "w") as fh:
        p = subprocess.Popen(cli, stdout=fh, stderr=PIPE)
        stdout, stderr = p.communicate()

    # Attempt to decode the captured output from bytes. If unsuccessful,
    # coerce to string. STDOUT was redirected to the output file, so
    # communicate() returned None for it and it is logged as "None".
    stderr = _decode_output(stderr)
    stdout = _decode_output(stdout)

    logger.info("Finished Skesa subprocess with STDOUT:\\n"
                "======================================\\n{}".format(stdout))
    logger.info("Finished Skesa subprocess with STDERR:\\n"
                "======================================\\n{}".format(stderr))
    logger.info("Finished Skesa with return code: {}".format(
        p.returncode))

    # Remove input fastq files when clear option is specified.
    # The output file is created by open() above even when skesa fails, so
    # its existence alone does not show that the assembly was produced.
    # Require a return code of 0 as well, otherwise a failed run would
    # destroy the input that is needed to run the assembly again.
    succeeded = p.returncode == 0
    if clear == "true" and succeeded and os.path.exists(output_file):
        clean_up(fastq_pair)

    with open(".status", "w") as fh:
        if p.returncode != 0:
            fh.write("error")
            raise SystemExit(p.returncode)
        else:
            fh.write("pass")


if __name__ == '__main__':

    main(SAMPLE_ID, FASTQ_PAIR, CLEAR)