Source code for flowcraft.templates.fastqc_report

#!/usr/bin/env python3

"""
Purpose
-------

This module is intended to parse the results of FastQC for paired end FastQ \
samples. It parses two reports:

    - Categorical report
    - Nucleotide level report.

Expected input
--------------

The following variables are expected whether using NextFlow or the
:py:func:`main` executor.

- ``sample_id`` : Sample identification string
    - e.g.: ``'SampleA'``

- ``result_p1`` : Path to both FastQC result files for pair 1
    - e.g.: ``'SampleA_1_data SampleA_1_summary'``

- ``result_p2`` : Path to both FastQC result files for pair 2
    - e.g.: ``'SampleA_2_data SampleA_2_summary'``

- ``opts`` : *Specify additional arguments for executing fastqc_report. \
    The arguments should be a string of command line arguments,\
    The accepted arguments are:*
    - ``'--ignore-tests'`` : Ignores test results from FastQC categorical\
    summary. This is used in the first run of FastQC.

Generated output
----------------

The generated output are output files that contain an object, usually a string.

- ``fastqc_health`` : Stores the health check for the current sample. If it
    passes all checks, it contains only the string 'pass'. Otherwise, contains
    the summary categories and their respective results
    - e.g.: ``'pass'``
- ``optimal_trim`` : Stores a tuple with the optimal trimming positions for 5'
    and 3' ends of the reads.
    - e.g.: ``'15 151'``

Code documentation
------------------

"""

# Template metadata.
# NOTE(review): presumably read by the flowcraft tooling to report the
# version/build of this template -- confirm against flowcraft_base.
__version__ = "1.0.2"
# Build identifier; looks like a DDMMYYYY date -- TODO confirm
__build__ = "12052018"
# Name of the template implemented by this module
__template__ = "fastqc_report-nf"

import os
import json

from collections import OrderedDict

from flowcraft_utils.flowcraft_base import get_logger, MainWrapper

# Module-level logger, obtained from the flowcraft helper using this file's
# path.
logger = get_logger(__file__)

# The constants below are only defined when this file is executed under a
# name ending in ".command.sh".
# NOTE(review): presumably that is the script name Nextflow gives to a
# rendered template, and the quoted placeholders are replaced with the
# process input values before Python runs -- confirm. Do not reformat the
# placeholder literals.
if __file__.endswith(".command.sh"):
    # Whitespace separated values are split into lists; per the module
    # docstring each result holds the data file and the summary file.
    RESULT_P1 = '$result_p1'.split()
    RESULT_P2 = '$result_p2'.split()
    SAMPLE_ID = '$sample_id'
    OPTS = '$opts'.split()
    # Log the received parameters to help debugging a pipeline run
    logger.debug("Running {} with parameters:".format(
        os.path.basename(__file__)))
    logger.debug("RESULT_P1: {}".format(RESULT_P1))
    logger.debug("RESULT_P2: {}".format(RESULT_P2))
    logger.debug("SAMPLE_ID: {}".format(SAMPLE_ID))
    logger.debug("OPTS: {}".format(OPTS))


def _get_quality_stats(d, start_str, field_start=1, field_end=2):
    """Extract the values and status of one module of a FastQC data file.

    The file is scanned for a line starting with ``start_str`` (e.g.
    ``'>>Per base sequence quality'``). The last whitespace separated token
    of that header line is taken as the module status, and the line right
    after it (the column header) is skipped. Every following row, up to the
    ``>>END_MODULE`` marker, contributes one entry to the report.

    Parameters
    ----------
    d : str
        Path to the FastQC data file.
    start_str : str
        Prefix of the header line that opens the module to parse.
    field_start : int
        Index of the first whitespace separated field to report.
    field_end : int
        Index one past the last whitespace separated field to report.

    Returns
    -------
    report : list
        One string per parsed row with the selected fields rounded to two
        decimals and joined by ``';'``. When the first field of the first
        row is an integer other than 1, the list is left-padded with the
        integer ``0`` (first field minus one times) so that entries stay
        aligned with a series starting at 1. Empty when the module is
        not found.
    status : str or None
        Status token of the module header, or ``None`` when the module is
        not present in the file.
    """

    end_str = ">>END_MODULE"

    report = []
    status = None
    parse = False
    min_parsed = False

    with open(d) as fh:

        for line in fh:

            if line.startswith(start_str):
                status = line.strip().split()[-1]
                # Skip the column header line. The default prevents a
                # StopIteration when the module header is the last line.
                next(fh, None)
                parse = True

            elif not parse:
                continue

            # Exit parser when end string is found
            elif line.startswith(end_str):
                break

            else:
                fields = line.strip().split()

                # Tolerate blank lines inside the module
                if not fields:
                    continue

                # This is triggered when the first value of a line series is
                # not 1. If the starting point of the series is a number
                # different from 1, fill the report with 0 until that point.
                # Non-integer labels (e.g. ranges) get no padding.
                if not min_parsed:
                    if fields[0] != "1":
                        try:
                            report.extend([0] * (int(fields[0]) - 1))
                        except ValueError:
                            pass
                    min_parsed = True

                report.append(";".join(
                    str(round(float(x), 2))
                    for x in fields[field_start:field_end]
                ))

    # Always hand back a 2-tuple so callers can unpack the result, even when
    # the module is missing or the end marker was never reached.
    return report, status


def write_json_report(sample_id, data1, data2):
    """Build the plot data report for a pair of FastQC data files.

    For each supported FastQC module, the values of both data files are
    parsed with :func:`_get_quality_stats` and stored together with a
    single status for the pair.

    Parameters
    ----------
    sample_id : str
        Sample identification string, stored under the ``"sample"`` key.
    data1 : str
        Path to the FastQC data file of the first pair.
    data2 : str
        Path to the FastQC data file of the second pair.

    Returns
    -------
    json_dic : dict
        Dictionary of the form
        ``{"plotData": [{"sample": sample_id, "data": {...}}]}`` where each
        category maps to ``{"status": ..., "data": [report1, report2]}``.
        ``status`` is the worst status reported by either file
        (``"fail"`` over ``"warn"`` over ``"pass"``), or ``None`` when
        neither file reports one of those values.
    """

    # Report category -> header prefix of the FastQC module to parse
    parser_map = {
        "base_sequence_quality": ">>Per base sequence quality",
        "sequence_quality": ">>Per sequence quality scores",
        "base_gc_content": ">>Per sequence GC content",
        "base_n_content": ">>Per base N content",
        "sequence_length_dist": ">>Sequence Length Distribution",
        "per_base_sequence_content": ">>Per base sequence content"
    }

    # Statuses ordered from worst to best
    severity = ["fail", "warn", "pass"]

    plot_data = {}

    for cat, start_str in parser_map.items():

        # The per base sequence content module is reported with fields
        # 1 to 4 of each row; every other module only with field 1.
        field_end = 5 if cat == "per_base_sequence_content" else 2

        report1, status1 = _get_quality_stats(
            data1, start_str, field_start=1, field_end=field_end)
        report2, status2 = _get_quality_stats(
            data2, start_str, field_start=1, field_end=field_end)

        # Walk the severities from worst to best and stop at the first
        # match, so that a failing mate is never masked by a passing one.
        pair_status = (status1, status2)
        status = next((s for s in severity if s in pair_status), None)

        plot_data[cat] = {"status": status, "data": [report1, report2]}

    return {
        "plotData": [{
            "sample": sample_id,
            "data": plot_data
        }]
    }
def get_trim_index(biased_list):
    """Return how many leading positions of a ``bool`` list should be trimmed.

    The list flags biased positions with ``True``. The function looks for
    the first biased position that is directly followed by unbiased
    positions only (in the next two elements) and returns the index right
    after it. For example, in the following list::

        [True, True, False, True, True, False, False, False, False, ...]

    the element at index 4 is the first ``True`` followed by two ``False``
    elements, so 5 is returned.

    Parameters
    ----------
    biased_list: list
        List of ``bool`` elements, where ``True`` means a biased site.

    Returns
    -------
    x : int
        ``0`` when the list has no biased positions or when its first five
        elements are all unbiased; otherwise the index following the first
        biased position that precedes an unbiased stretch. When no such
        position exists, the length of the list.
    """

    unbiased = {False}

    # Nothing to trim: either the whole list or its first five elements
    # contain nothing but unbiased positions
    if set(biased_list) == unbiased or set(biased_list[:5]) == unbiased:
        return 0

    # First biased position whose following (up to two) positions are all
    # unbiased. If there is none, the list is treated as entirely biased
    # and its length is returned.
    return next(
        (pos + 1 for pos, flag in enumerate(biased_list)
         if flag and set(biased_list[pos + 1:pos + 3]) == unbiased),
        len(biased_list)
    )
def trim_range(data_file):
    """Assess the optimal trim range for a given FastQC data file.

    Parses the *'Per base sequence content'* module of a single FastQC data
    file. For every row of that module the G/C and A/T ratios are computed
    (with 0.1 added to each value). A row is marked as biased when either
    ratio falls outside the 0.8 - 1.2 interval. The resulting list is split
    in half and each half is evaluated with :func:`get_trim_index`.

    Parameters
    ----------
    data_file: str
        Path to FastQC data file.

    Returns
    -------
    trim_nt: list
        Two element list with the best trimming positions for the
        corresponding FastQ file. The first element is the 5' end trim
        index and the second element is the 3' end trim index.
    """

    logger.debug("Starting trim range assessment")

    # Header of the module used for the nucleotide bias assessment
    target_nuc_bias = ">>Per base sequence content"
    logger.debug("Target string to start nucleotide bias assessment set to "
                 "{}".format(target_nuc_bias))

    # True while reading rows of the target module
    in_module = False
    # One bool per parsed row: True for biased, False for unbiased
    biased = []

    with open(data_file) as fh:

        for line in fh:

            if line.startswith(target_nuc_bias):
                logger.debug("Found target string at line: {}".format(line))
                # Skip the column header line
                next(fh)
                in_module = True
                continue

            if not in_module:
                continue

            if line.startswith(">>END_MODULE"):
                logger.debug("Stopping parsing at line: {}".format(line))
                break

            # Proportions of each nucleotide, in file column order
            g, a, t, c = [float(x) for x in line.strip().split()[1:]]
            gc_ratio = (g + 0.1) / (c + 0.1)
            at_ratio = (a + 0.1) / (t + 0.1)
            balanced = 0.8 <= gc_ratio <= 1.2 and 0.8 <= at_ratio <= 1.2
            biased.append(not balanced)

    logger.debug("Finished bias assessment with result: {}".format(biased))

    # First half is the 5' end. The second half is reversed so that the
    # 3' end is also scanned from the read extremity inwards.
    half = len(biased) // 2
    biased_5end = biased[:half]
    biased_3end = biased[half:][::-1]

    logger.debug("Getting optimal trim range from biased list")

    # Number of positions to clip at the 5' end
    trim_5end = get_trim_index(biased_5end)
    logger.debug("Optimal trim range at 5' end set to: {}".format(trim_5end))

    # Last position to keep, counted from the start of the list
    trim_3end = len(biased) - get_trim_index(biased_3end)
    logger.debug("Optimal trim range at 3' end set to: {}".format(trim_3end))

    return [trim_5end, trim_3end]
def get_sample_trim(p1_data, p2_data):
    """Get the optimal read trim range from data files of paired FastQ reads.

    Each FastQC data file is assessed independently with
    :func:`trim_range`, and the most conservative values of the pair are
    kept: the largest 5' trim index and the smallest 3' trim index.

    Parameters
    ----------
    p1_data: str
        Path to FastQC data report file from pair 1
    p2_data: str
        Path to FastQC data report file from pair 2

    Returns
    -------
    optimal_5trim: int
        Optimal trim index for the 5' end of the reads
    optimal_3trim: int
        Optimal trim index for the 3' end of the reads

    See Also
    --------
    trim_range

    """

    range_p1 = trim_range(p1_data)
    range_p2 = trim_range(p2_data)

    # Trim as much as the worst mate requires on each end
    optimal_5trim = max(range_p1[0], range_p2[0])
    optimal_3trim = min(range_p1[1], range_p2[1])

    return optimal_5trim, optimal_3trim
def get_summary(summary_file):
    """Parse a FastQC summary report file into an ordered dictionary.

    Only the first two tab separated columns of each non-empty line are
    used. For instance, for the line::

        'PASS    Basic Statistics    SH10762A_1.fastq.gz'

    the category in the second column becomes the key and the QC result in
    the first column becomes the value::

        {"Basic Statistics": "PASS"}

    Parameters
    ----------
    summary_file: str
        Path to FastQC summary report.

    Returns
    -------
    summary_info: :py:data:`OrderedDict`
        Categories (keys) and their QC result (values), in file order.

    """

    logger.debug("Retrieving summary information from file: {}".format(
        summary_file))

    summary_info = OrderedDict()

    with open(summary_file) as fh:
        for raw_line in fh:
            # Only lines with content carry a category
            if raw_line.strip():
                columns = [col.strip() for col in raw_line.split("\t")]
                result, category = columns[0], columns[1]
                summary_info[category] = result

    logger.debug("Retrieved summary information from file: {}".format(
        summary_info))

    return summary_info
def check_summary_health(summary_file, **kwargs):
    """Check the health of a sample from the FastQC summary file.

    The summary file is parsed with :func:`get_summary` and every category
    is tested against four lists. Two of them make the sample fail the
    check and the other two only produce warnings.

    Default categories that cannot have a ``FAIL`` result::

        fail_sensitive = [
            "Per base sequence quality",
            "Overrepresented sequences",
            "Sequence Length Distribution",
            "Per sequence GC content"
        ]

    Default categories that must have a ``PASS`` result::

        must_pass = [
            "Per base N content",
            "Adapter Content"
        ]

    Parameters
    ----------
    summary_file: str
        Path to FastQC summary file.
    fail_sensitive : list, optional
        Overrides the categories that cannot fail.
    must_pass : list, optional
        Overrides the categories that must pass.
    warning_fail_sensitive : list, optional
        Categories that raise a warning on a ``FAIL`` result.
    warning_must_pass : list, optional
        Categories that raise a warning on any result other than ``PASS``.

    Returns
    -------
    health : bool
        ``True`` if the sample passes all tests. ``False`` if not.
    failed : list
        ``"category:result"`` strings for the categories that failed the
        tests. Is empty if the sample passes all tests.
    warning : list
        Warning messages for the flagged categories.

    """

    # Categories that cannot fail. If they fail, do not proceed with this
    # sample
    fail_sensitive = kwargs.get("fail_sensitive", [
        "Per base sequence quality",
        "Overrepresented sequences",
        "Sequence Length Distribution",
        "Per sequence GC content"
    ])
    logger.debug("Fail sensitive categories: {}".format(fail_sensitive))

    # Categories that must pass. If they do not, do not proceed with that
    # sample
    must_pass = kwargs.get("must_pass", [
        "Per base N content",
        "Adapter Content"
    ])
    logger.debug("Must pass categories: {}".format(must_pass))

    # Categories that are only reported as warnings
    warning_fail_sensitive = kwargs.get("warning_fail_sensitive", [
        "Per base sequence quality",
        "Overrepresented sequences",
    ])
    warning_must_pass = kwargs.get("warning_must_pass", [
        "Per base sequence content"
    ])

    summary_info = get_summary(summary_file)

    # Failing categories and warning messages collected along the way
    failed = []
    warning = []

    for cat, test in summary_info.items():

        logger.debug("Assessing category {} with result {}".format(cat, test))

        is_fail = test == "FAIL"
        not_pass = test != "PASS"

        # FAILURES
        if is_fail and cat in fail_sensitive:
            failed.append("{}:{}".format(cat, test))
            logger.error("Category {} failed a fail sensitive "
                         "category".format(cat))

        if not_pass and cat in must_pass:
            failed.append("{}:{}".format(cat, test))
            logger.error("Category {} failed a must pass category".format(
                cat))

        # WARNINGS
        if is_fail and cat in warning_fail_sensitive:
            warning.append("Failed category: {}".format(cat))
            logger.warning("Category {} flagged at a fail sensitive "
                           "category".format(cat))

        if not_pass and cat in warning_must_pass:
            warning.append("Did not pass category: {}".format(cat))
            logger.warning("Category {} flagged at a must pass "
                           "category".format(cat))

    # The sample is healthy only when no category was recorded as failed
    health = not failed

    return health, failed, warning
@MainWrapper
def main(sample_id, result_p1, result_p2, opts):
    """Main executor of the fastqc_report template.

    If the "--ignore-tests" option is present in the ``opts`` argument,
    the health check of the sample will be bypassed, and it will pass the
    check. This option is used in the first run of FastQC. In the second
    run (after filtering with trimmomatic) this option is not provided and
    the samples are submitted to a health check before proceeding in the
    pipeline.

    The following files are always created in the working directory (some
    may stay empty): ``<sample_id>_trim_report``, ``optimal_trim``,
    ``<sample_id>_status_report``, ``.status``, ``.warning``, ``.fail`` and
    ``.report.json``. When the health check runs, each FastQC summary file
    is also renamed to ``<sample_id>_<pair index>_summary.txt``.

    Parameters
    ----------
    sample_id : str
        Sample Identification string.
    result_p1 : list
        Two element list containing the path to the FastQC report files to
        the first FastQ pair.
        The first must be the nucleotide level report and the second the
        categorical report.
    result_p2: list
        Two element list containing the path to the FastQC report files to
        the second FastQ pair.
        The first must be the nucleotide level report and the second the
        categorical report.
    opts : list
        List of arbitrary options. See `Expected input`_.

    """

    logger.info("Starting fastqc report")
    # Stays empty (and is not written) when the health check is skipped
    json_dic = {}

    # All output files are opened in write mode up front, so each of them
    # exists once this function returns, even if nothing is written to it.
    # NOTE(review): the newline escapes in the writes below use a doubled
    # backslash, presumably because this file is rendered as a template
    # before Python runs it -- confirm before changing them.
    with open("{}_trim_report".format(sample_id), "w") as trep_fh, \
            open("optimal_trim", "w") as trim_fh, \
            open("{}_status_report".format(sample_id), "w") as rep_fh, \
            open(".status", "w") as status_fh, \
            open(".warning", "w") as warn_fh, \
            open(".fail", "w") as fail_fh, \
            open(".report.json", "w") as report_fh:

        # Perform health check according to the FastQC summary report for
        # each pair. If both pairs pass the check, send the 'pass' information
        # to the 'fastqc_health' channel. If at least one fails, send the
        # summary report.
        if "--ignore-tests" not in opts:

            # Get reports for each category in json format
            json_dic = write_json_report(sample_id, result_p1[0],
                                         result_p2[0])

            logger.info("Performing FastQ health check")
            for p, fastqc_summary in enumerate([result_p1[1], result_p2[1]]):

                logger.debug("Checking files: {}".format(fastqc_summary))
                # Get the boolean health variable, a list of failed
                # categories, if any, and a list of warning messages
                health, f_cat, warnings = check_summary_health(fastqc_summary)
                logger.debug("Health checked: {}".format(health))
                logger.debug("Failed categories: {}".format(f_cat))

                # Write any warnings, both to the warning file and to the
                # JSON report.
                # NOTE(review): the "warnings" entry is re-created for each
                # pair, so in the JSON report the warnings of the first pair
                # are replaced when the second pair also has warnings --
                # confirm this is intended.
                if warnings:
                    json_dic["warnings"] = [{
                        "sample": sample_id,
                        "table": "qc",
                        "value": []
                    }]
                    for w in warnings:
                        warn_fh.write("{}\\n".format(w))
                        json_dic["warnings"][0]["value"].append(w)

                # Rename category summary file to the channel that will
                # publish the results. The suffix is the 0-based pair index.
                output_file = "{}_{}_summary.txt".format(sample_id, p)
                os.rename(fastqc_summary, output_file)
                logger.debug("Setting summary file name to {}".format(
                    output_file))

                # If one of the health flags returns False, send the summary
                # report through the status channel and stop here: the
                # remaining pair is not checked and no trim range is assessed
                if not health:
                    fail_msg = "Sample failed quality control checks:" \
                               " {}".format(",".join(f_cat))
                    logger.warning(fail_msg)
                    fail_fh.write(fail_msg)
                    json_dic["fail"] = [{
                        "sample": sample_id,
                        "table": "qc",
                        "value": [fail_msg]
                    }]
                    report_fh.write(
                        json.dumps(json_dic, separators=(",", ":")))
                    status_fh.write("fail")
                    trim_fh.write("fail")
                    rep_fh.write("{}, {}\\n".format(sample_id,
                                                    ",".join(f_cat)))
                    trep_fh.write("{},fail,fail\\n".format(sample_id))

                    return

            logger.info("Sample passed quality control checks")

        # Reached when the checks passed or were skipped
        status_fh.write("pass")
        rep_fh.write("{}, pass\\n".format(sample_id))

        logger.info("Assessing optimal trim range for sample")
        # Get optimal trimming range for sample, based on the per base sequence
        # content
        optimal_trim = get_sample_trim(result_p1[0], result_p2[0])
        logger.info("Optimal trim range set to: {}".format(optimal_trim))
        # Space separated 5' and 3' trim positions
        trim_fh.write("{}".format(" ".join([str(x) for x in optimal_trim])))

        trep_fh.write("{},{},{}\\n".format(sample_id, optimal_trim[0],
                                           optimal_trim[1]))

        # The json dict report is only populated when the FastQC quality
        # checks are performed, that is, when the --ignore-tests option
        # is not provided
        if json_dic:
            report_fh.write(json.dumps(json_dic, separators=(",", ":")))


# Run the template with the module-level parameter constants. Those are only
# defined when the file name ends in ".command.sh" (see the top of the file).
if __name__ == '__main__':

    main(SAMPLE_ID, RESULT_P1, RESULT_P2, OPTS)