Source code for flowcraft.templates.pipeline_status

#!/usr/bin/env python3

"""
Purpose
-------

This module collects pipeline run statistics (time, CPU and RAM usage
for each task) into a report JSON.

Expected input
--------------

- ``sample_id`` : *Sample identifier*
- ``trace_file`` : *Trace file generated by nextflow*
- ``workdir`` : *Directory where the trace file and the stats JSON are stored*


Code documentation
------------------

"""

__version__ = "1.0.0"
__build__ = "16012018"
__template__ = "pipeline_status-nf"


import os
import json
import traceback

from os.path import join

from flowcraft_utils.flowcraft_base import get_logger, MainWrapper

logger = get_logger(__file__)


LOG_STATS = ".pipeline_status.json"

if __file__.endswith(".command.sh"):
    fastq_id = 'sample_id'
    TRACE_FILE = 'pipeline_stats.txt'
    WORKDIR = '${workflow.projectDir}'
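
# Note: when nextflow renders this template into a .command.sh script, the
# quoted values above act as placeholders that are substituted with the run's
# concrete values. A hedged illustration (values hypothetical):
#
#     fastq_id = 'sampleA'
#     TRACE_FILE = 'pipeline_stats.txt'
#     WORKDIR = '/home/user/my_pipeline'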


def get_json_info(fields, header):
    """Builds a dictionary for one trace line by pairing the trace header
    with that line's fields.

    Parameters
    ----------
    fields : list
        Fields of a single line in the trace file.
    header : list
        Column names parsed from the trace file header.

    Returns
    -------
    json_dic : dict
        Mapping of each header column name to the line's field value.
    """

    json_dic = dict((x, y) for x, y in zip(header, fields))

    return json_dic
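
# A minimal doctest-style sketch of get_json_info (header and field values
# are hypothetical):
#
#     >>> get_json_info(["1", "my_process", "sampleA getStats"],
#     ...               ["task_id", "process", "tag"])
#     {'task_id': '1', 'process': 'my_process', 'tag': 'sampleA getStats'}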
def get_previous_stats(stats_path):
    """Loads pipeline status data stored by a previous run, if any.

    Parameters
    ----------
    stats_path : str
        Path to the stored pipeline status JSON file.

    Returns
    -------
    stats_json : dict
        Previously stored pipeline status data, or an empty dictionary when
        no stored data is found.
    """

    logger.debug("Path to pipeline status data set to: {}".format(stats_path))

    if os.path.exists(stats_path):
        logger.debug("Existing pipeline status data found. Loading JSON.")
        with open(stats_path) as fh:
            stats_json = json.load(fh)
    else:
        logger.debug("No pipeline status data found.")
        stats_json = {}

    return stats_json
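
# A minimal sketch of the fallback behaviour: a path that does not exist
# (hypothetical below) yields a fresh, empty stats object:
#
#     >>> get_previous_stats("/tmp/does_not_exist/sampleA.stats.json")
#     {}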
@MainWrapper
def main(sample_id, trace_file, workdir):
    """Parses a nextflow trace file, searches for processes with a specific
    tag and writes a JSON report with the relevant information.

    The expected fields for the trace file are::

        0. task_id
        1. process
        2. tag
        3. status
        4. exit code
        5. start timestamp
        6. container
        7. cpus
        8. duration
        9. realtime
        10. queue
        11. cpu percentage
        12. memory percentage
        13. real memory size of the process
        14. virtual memory size of the process

    Parameters
    ----------
    sample_id : str
        Sample identifier, used to name the stored stats JSON file.
    trace_file : str
        Path to the nextflow trace file.
    workdir : str
        Directory where the trace file and the stats JSON are stored.
    """

    # Determine the path of the stored JSON for the sample_id
    stats_suffix = ".stats.json"
    stats_path = join(workdir, sample_id + stats_suffix)
    trace_path = join(workdir, trace_file)

    logger.info("Starting pipeline status routine")

    logger.debug("Checking for previous pipeline status data")
    stats_array = get_previous_stats(stats_path)
    logger.info("Stats JSON object set to: {}".format(stats_array))

    # Search for this substring in the tag field. Only lines with this
    # tag will be processed for the reports
    tag = " getStats"
    logger.debug("Tag variable set to: {}".format(tag))

    logger.info("Starting parsing of trace file: {}".format(trace_path))
    with open(trace_path) as fh:

        header = next(fh).strip().split()
        logger.debug("Header set to: {}".format(header))

        for line in fh:
            fields = line.strip().split("\t")

            # Check if the tag substring is in the tag field of the
            # nextflow trace line
            if tag in fields[2] and fields[3] == "COMPLETED":
                logger.debug(
                    "Parsing trace line with COMPLETED status: {}".format(
                        line))
                current_json = get_json_info(fields, header)
                stats_array[fields[0]] = current_json
            else:
                logger.debug(
                    "Ignoring trace line without COMPLETED status"
                    " or stats specific tag: {}".format(line))

    with open(stats_path, "w") as fh, open(".report.json", "w") as rfh:
        fh.write(json.dumps(stats_array, separators=(",", ":")))
        rfh.write(json.dumps(stats_array, separators=(",", ":")))


if __name__ == "__main__":
    main(fastq_id, TRACE_FILE, WORKDIR)
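
# A hedged standalone usage sketch (sample name and paths hypothetical):
# assuming a tab-separated trace file "pipeline_stats.txt" in the current
# directory with the header fields listed in main()'s docstring, this call
# would write "sampleA.stats.json" and ".report.json" next to it:
#
#     main("sampleA", "pipeline_stats.txt", os.getcwd())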