diff --git a/bin/Xnat_tools/XnatRepushScan b/bin/Xnat_tools/XnatRepushScan new file mode 100644 index 00000000..7ddc6cef --- /dev/null +++ b/bin/Xnat_tools/XnatRepushScan @@ -0,0 +1,294 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +''' +Repush project scans from PACS that are missing on XNAT + +Created April 26th, 2023 + +@author: William Duett, VUIIS CCI, Vanderbilt University +''' + +# Libraries +import ast +import csv +import logging +import os +import pandas as pd +import pyxnat +import re +import requests +import shutil +import sys +import time + +from dax import XnatUtils + +DEFAULT_ARGUMENTS = {'host': None, 'username': None, 'project': None, 'subject': None, 'session': None, 'scan': None} +DESCRIPTION = """What is the script doing : + *Repush project scans from PACS that are missing on XNAT + +Example: + *Push all missing examcards from given project + XnatRepushScan --proj PID --scan unknown + *Push all missing T1s from given project + XnatRepushScan --proj PID --scan T1 +""" + + +def setup_info_logger(name): + """ + Using logger for the executables output. + Setting the information for the logger. + + :param name: Name of the logger + :return: logging object + """ + handler = logging.StreamHandler() + logger = logging.getLogger(name) + logger.setLevel(logging.INFO) + logger.addHandler(handler) + return logger + + +def parse_args(): + """ + Method to parse arguments base on ArgumentParser + + :return: parser object + """ + from argparse import ArgumentParser, RawDescriptionHelpFormatter + argp = ArgumentParser(prog='XnatRepushScan', description=DESCRIPTION, + formatter_class=RawDescriptionHelpFormatter) + argp.add_argument('--host', dest='host', default=None, + help='Host for XNAT. Default: using $XNAT_HOST.') + argp.add_argument('-u', '--username', dest='username', default=None, + help='Username for XNAT. Default: using $XNAT_USER.') + argp.add_argument('--proj', dest='project', default=None, + help='Project to repush scans on') + argp.add_argument('--subj', dest='subject', default=None, + help='Subject to repush scans on') + argp.add_argument('--sess', dest='session', default=None, + help='Session to repush scans on') + argp.add_argument('--scan', dest='scan', default=None, + help='Scan type to repush') + return argp + + +def check_options(): + """ + Method to check the options specified by the user + + :return: True if OPTIONS are fine, False otherwise + """ + if not OPTIONS.project: + LOGGER.warn('ERROR: Project not specified. Use option --proj') + return False + if not OPTIONS.project: + LOGGER.warn('ERROR: Scan type not specified. Use option --scan') + return False + return True + + +def check_project(project): + """ + Method to check if the user has access to the project on XNAT + + :param project: project to send scans to + :return: True if project exists on XNAT and is accessable + """ + LOGGER.info('Checking ' + project.label() + ' project exists on XNAT') + if not project.exists(): + raise Exception('ERROR: Project ' + str(project) + ' NOT found in XNAT') + else: + LOGGER.info('Project ' + project.label() + ' found!') + + +def check_subject(subject): + """ + Method to check if the user has access to the subject on XNAT + + :param subject: subject to send scans to + :return: True if subject exists on XNAT and is accessable + """ + + LOGGER.info('Checking ' + OPTIONS.subject + ' subject exists on XNAT') + if not subject.exists(): + raise Exception('ERROR: Subject ' + OPTIONS.subject + ' NOT found in XNAT') + else: + LOGGER.info('Subject ' + OPTIONS.subject + ' found!') + + +def check_session(session): + """ + Method to check if the user has access to the session on XNAT + + :param session: session to send scans to + :return: True if session exists on XNAT and is accessable + """ + + LOGGER.info('Checking ' + OPTIONS.session + ' session exists on XNAT') + if not session.exists(): + raise Exception('ERROR: Session ' + OPTIONS.session + ' NOT found in XNAT') + else: + LOGGER.info('Session ' + OPTIONS.session + ' found!') + + +def repush(scans): + df = pd.DataFrame(scans) + df = df[['subject_label','session_label','scan_type','scan_label']] + LOGGER.info('*******************************************') + + if OPTIONS.session: + SESSION = OPTIONS.session + else: + SESSION = session['session_label'] + LOGGER.info(SESSION) + + patient_id = SESSION.split('_') + request = "http://10.109.20.19:8080/dcm4chee-arc/aets/DCM4CHEE57/rs/studies/?PatientID=*{}".format(patient_id[1]) + + res = requests.get(request) + response = str(res.content).split('},') + url = response[8].split('["') + study_url = url[1].split('"]') + study_url = study_url[0] + '/series' + + res = requests.get(study_url) + + scan_id = 9876543210 + flag = 0 + url_save = '' + + for x in res: + if '00081190' in str(x): + x = str(x).split('["') + url = x[1].replace('"]','') + elif '00200011' in str(x): + x = str(x).split('["') + scan_id = str(x).split(':[') + scan_id = scan_id[1].replace("]']",'') + scan_id = scan_id.split(']}') + print(scan_id[0]) + print('----------------------') + + print(str(url)) + print(scan_id) + print(flag) + + if scan_id == '0' and flag == 0: + url_save = str(url) + '/{}/instances?includefield=all' + print('000000000000000000000000000') + print(url_save) + scan_id_save = scan_id + flag = 1 + +# print('THIS IS WHERE WE REPUSH WHATEVER SCAN') + + +def main_display(): + """ + Main display of the executables before any process + + :return: None + """ + print('################################################################') + print('# XnatRepushScan #') + print('# #') + print('# Developed by the MASI Lab Vanderbilt University, TN, USA. #') + print('# If issues, please start a thread here: #') + print('# https://groups.google.com/forum/#!forum/vuiis-cci #') + print('# Usage: #') + print('# Repush project scans from PACS that are missing on XNAT #') + print('# #') + print('# Parameters : #') + if vars(OPTIONS) == DEFAULT_ARGUMENTS: + print('# No Arguments given #') + print('# See the help below or Use "XnatRepushScan -h" #') + print('################################################################\n') + PARSER.print_help() + sys.exit() + else: + if OPTIONS.host: + print('# %*s -> %*s#' % ( + -20, 'XNAT Host', -33, get_proper_str(OPTIONS.host))) + if OPTIONS.username: + print('# %*s -> %*s#' % ( + -20, 'XNAT User', -33, get_proper_str(OPTIONS.username))) + if OPTIONS.project: + print('# %*s -> %*s#' % ( + -20, 'Project ', -33, get_proper_str(OPTIONS.project))) + if OPTIONS.subject: + print('# %*s -> %*s#' % ( + -20, 'Subject Type ', -33, get_proper_str(OPTIONS.subject))) + if OPTIONS.session: + print('# %*s -> %*s#' % ( + -20, 'Session Type ', -33, get_proper_str(OPTIONS.session))) + if OPTIONS.scan: + print('# %*s -> %*s#' % ( + -20, 'Scan Type ', -33, get_proper_str(OPTIONS.scan))) + print('################################################################') + + +def get_proper_str(str_option, end=False): + """ + Method to shorten a string into the proper size for display + + :param str_option: string to shorten + :param end: keep the end of the string visible (default beginning) + :return: shortened string + """ + if len(str_option) > 32: + if end: + return '...' + str_option[-29:] + else: + return str_option[:29] + '...' + else: + return str_option + + +if __name__ == '__main__': + LOGGER = setup_info_logger('XnatRepushScan') + PARSER = parse_args() + OPTIONS = PARSER.parse_args() + main_display() + SHOULD_RUN = check_options() + + if SHOULD_RUN: + if OPTIONS.host: + host = OPTIONS.host + else: + host = os.environ['XNAT_HOST'] + user = OPTIONS.username + + with XnatUtils.get_interface(host=host, user=user) as XNAT: + repush_proj = XNAT.select.project(OPTIONS.project) + check_project(repush_proj) + + if OPTIONS.subject: + repush_subj = XNAT.select_subject(OPTIONS.project,OPTIONS.subject) + check_subject(repush_subj) + if OPTIONS.session: + repush_sess = XNAT.select_session(OPTIONS.project,OPTIONS.subject,OPTIONS.session) + check_session(repush_sess) + else: + repush_sess = None + else: + repush_subj = None + + if repush_subj: + if repush_sess: + scans = XNAT.get_scans(OPTIONS.project, OPTIONS.subject, OPTIONS.session) + repush(scans) + else: + sessions = XNAT.get_sessions(OPTIONS.project, OPTIONS.subject) + for session in sessions: + scans = XNAT.get_scans(OPTIONS.project, OPTIONS.subject, session['label']) + repush(scans) + else: + subjects = XNAT.get_subjects(OPTIONS.project) + for subject in subjects: + sessions = XNAT.get_sessions(OPTIONS.project, subject['label']) + for session in sessions: + scans = XNAT.get_scans(OPTIONS.project, subject['label'], session['label']) + repush(scans)