Source code for cromwell_tools.cli

import argparse
import requests
from cromwell_tools.cromwell_api import CromwellAPI
from cromwell_tools.cromwell_auth import CromwellAuth
from cromwell_tools.diag import task_runtime
from cromwell_tools import __version__ as cromwell_tools_version
import sys


diagnostic_index = {'task_runtime': task_runtime.run}


[docs]class DefaultHelpParser(argparse.ArgumentParser):
[docs] def error(self, message): sys.stderr.write('error: %s\n' % message) self.print_help() sys.exit(2)
def parser(arguments=None): # TODO: dynamically walk through the commands and automatcally create parsers here main_parser = DefaultHelpParser() # Check the installed version of Cromwell-tools main_parser.add_argument( '-V', '--version', action='version', version='%(prog)s ' + cromwell_tools_version, ) subparsers = main_parser.add_subparsers(help='sub-command help', dest='command') # sub-commands of cromwell-tools submit = subparsers.add_parser( 'submit', help='submit help', description='Submit a WDL workflow on Cromwell.' ) wait = subparsers.add_parser( 'wait', help='wait help', description='Wait for one or more running workflow to finish.', ) status = subparsers.add_parser( 'status', help='status help', description='Get the status of one or more workflows.', ) abort = subparsers.add_parser( 'abort', help='abort help', description='Request Cromwell to abort a running workflow by UUID.', ) release_hold = subparsers.add_parser( 'release_hold', help='release_hold help', description='Request Cromwell to release the hold on a workflow.', ) metadata = subparsers.add_parser( 'metadata', help='metadata help', description='Retrieve the workflow and call-level metadata for a specified workflow by UUID.', ) query = subparsers.add_parser( 'query', help='query help', description='[NOT IMPLEMENTED IN CLI] Query for workflows.', ) health = subparsers.add_parser( 'health', help='health help', description='Check that cromwell is running and that provided authentication is valid.', ) task_runtime = subparsers.add_parser( 'task_runtime', help='task_runtime help', description='Output tsv breakdown of task runtimes by execution event categories', ) # cromwell url and authentication arguments apply to all sub-commands cromwell_sub_commands = ( submit, wait, status, abort, release_hold, metadata, query, health, task_runtime, ) auth_args = { 'url': 'The URL to the Cromwell server. e.g. "https://cromwell.server.org/"', 'username': 'Cromwell username for HTTPBasicAuth.', 'password': 'Cromwell password for HTTPBasicAuth.', 'secrets_file': 'Path to the JSON file containing username, password, and url fields.', 'service_account_key': 'Path to the JSON key file for authenticating with CaaS.', } def add_auth_args(subcommand_parser): for arg_dest, help_text in auth_args.items(): subcommand_parser.add_argument( '--{arg}'.format(arg=arg_dest.replace('_', '-')), dest=arg_dest, default=None, type=str, help=help_text, ) # TODO: this should be a group which is called authentication for p in cromwell_sub_commands: add_auth_args(p) # submit arguments submit.add_argument( '-w', '--wdl-file', dest='wdl_file', type=str, required=True, help='Path to the workflow source file to submit for execution.', ) submit.add_argument( '-i', '--inputs-files', dest='inputs_files', nargs='+', type=str, required=True, help='Path(s) to the input file(s) containing input data in JSON format, separated by space.', ) submit.add_argument( '-d', '--deps-file', dest='dependencies', nargs='+', type=str, help='Path to the Zip file containing dependencies, or a list of raw dependency files to ' 'be zipped together separated by space.', ) submit.add_argument( '-o', '--options-file', dest='options_file', type=str, help='Path to the Cromwell configs JSON file.', ) # TODO: add a mutually exclusive group to make it easy to add labels for users submit.add_argument( '-l', '--label-file', dest='label_file', type=str, default=None, help='Path to the JSON file containing a collection of key/value pairs for workflow labels.', ) submit.add_argument( '-c', '--collection-name', dest='collection_name', type=str, default=None, help='Collection in SAM that the workflow should belong to, if use CaaS.', ) submit.add_argument( '--on-hold', dest='on_hold', type=bool, default=False, help='Whether to submit the workflow in "On Hold" status.', ) submit.add_argument( '--validate-labels', dest='validate_labels', type=bool, default=False, help='Whether to validate cromwell labels.', ) # wait arguments wait.add_argument('workflow_ids', nargs='+') wait.add_argument( '--timeout-minutes', dest='timeout_minutes', type=int, default=120, help='number of minutes to wait before timeout.', ) wait.add_argument( '--poll-interval-seconds', dest='poll_interval_seconds', type=int, default=30, help='seconds between polling cromwell for workflow status.', ) wait.add_argument( '--silent', dest='verbose', action='store_false', help='whether to silently print verbose workflow information while polling cromwell.', ) # status arguments status.add_argument( '--uuid', required=True, help='A Cromwell workflow UUID, which is the workflow identifier.', ) # abort arguments abort.add_argument( '--uuid', required=True, help='A Cromwell workflow UUID, which is the workflow identifier.', ) # release_hold arguments release_hold.add_argument( '--uuid', required=True, help='A Cromwell workflow UUID, which is the workflow identifier.', ) # metadata arguments metadata.add_argument( '--uuid', required=True, help='A Cromwell workflow UUID, which is the workflow identifier.', ) # TODO: add a mutually exclusive group to make it fail early metadata.add_argument( '--includeKey', nargs='+', default=None, help='When specified key(s) to include from the metadata. Matches any key starting with the value. May not be used with excludeKey.', ) metadata.add_argument( '--excludeKey', nargs='+', default=None, help='When specified key(s) to exclude from the metadata. Matches any key starting with the value. May not be used with includeKey.', ) metadata.add_argument( '--expandSubWorkflows', default=False, help='When true, metadata for sub workflows will be fetched and inserted automatically in the metadata response.', ) either_runtime = task_runtime.add_mutually_exclusive_group(required=True) either_runtime.add_argument( '--metadata', dest='metadata', help='Metadata json file to calculate cost on', ) either_runtime.add_argument( '--uuid', dest='uuid', help='A Cromwell workflow UUID, which is the workflow identifier.', ) # query arguments # TODO: implement CLI entry for query API. # group all of the arguments args = vars(main_parser.parse_args(arguments)) # Return help messages if no arguments provided if not args['command']: main_parser.error("No commands/arguments provided!") # TODO: see if this can be moved or if the commands can be populated from above if args['command'] in ( 'submit', 'wait', 'status', 'abort', 'release_hold', 'health', 'metadata', 'task_runtime', ): auth_arg_dict = {k: args.get(k) for k in auth_args.keys()} auth = CromwellAuth.harmonize_credentials(**auth_arg_dict) args['auth'] = auth for k in auth_args: if k in args: del args[k] command = getattr(CromwellAPI, args['command'], False) if not command: try: command = diagnostic_index[args['command']] except KeyError: raise KeyError("%s is not a valid command." % args['command']) del args['command'] return command, args # this should just getattr from CromwellAPI and call the func with args. # TODO: refactor this module into class-based parsers def main(arguments=None): command, args = parser(arguments) result = command(**args) if isinstance(result, requests.Response): print(result.text) else: print(result)