#!/usr/bin/env python3 """ Main entry point for DICOM data retrieval and routing automation. """ import os import sys import json import argparse import requests from datetime import datetime from config import settings from utils.logger import main_logger as logger from utils.dicom_utils import save_json_data, create_directory_if_not_exists from utils.cleanup import register_exit_handlers, register_cleanup_dir from services.dicom_finder import DicomFinder from services.dicom_retriever import DicomRetriever from services.dicom_sender import DicomSender import signal def parse_args(): """Parse command line arguments.""" parser = argparse.ArgumentParser(description='DICOM Data Retrieval and Routing Automation') # Command options subparsers = parser.add_subparsers(dest='command', help='Command to execute') # Find studies command find_parser = subparsers.add_parser('find-studies', help='Find studies by date range') find_parser.add_argument('--start-date', required=True, help='Start date in YYYYMMDD format') find_parser.add_argument('--end-date', required=True, help='End date in YYYYMMDD format') find_parser.add_argument('--output-file', help='JSON output file (default: studies_YYYYMMDD.json)') # Find series command series_parser = subparsers.add_parser('find-series', help='Find series for a study') series_parser.add_argument('--study-uid', required=True, help='Study Instance UID') series_parser.add_argument('--output-file', help='JSON output file (default: series_STUDYUID.json)') # Find first instance command instance_parser = subparsers.add_parser('find-instance', help='Find first instance for a series') instance_parser.add_argument('--study-uid', required=True, help='Study Instance UID') instance_parser.add_argument('--series-uid', required=True, help='Series Instance UID') instance_parser.add_argument('--output-file', help='JSON output file (default: instance_SERIESUID.json)') # Get study command get_study_parser = subparsers.add_parser('get-study', help='Retrieve a complete study') get_study_parser.add_argument('--study-uid', required=True, help='Study Instance UID to retrieve') # Get series command get_series_parser = subparsers.add_parser('get-series', help='Retrieve a specific series') get_series_parser.add_argument('--study-uid', required=True, help='Study Instance UID') get_series_parser.add_argument('--series-uid', required=True, help='Series Instance UID to retrieve') # Send study command send_study_parser = subparsers.add_parser('send-study', help='Send a complete study to destination PACS') send_study_parser.add_argument('--study-uid', required=True, help='Study Instance UID to send') send_study_parser.add_argument('--study-path', help='Path to study directory (default: DICOM_STORE_DIR/study_uid)') # Send series command send_series_parser = subparsers.add_parser('send-series', help='Send a specific series to destination PACS') send_series_parser.add_argument('--study-uid', required=True, help='Study Instance UID') send_series_parser.add_argument('--series-uid', required=True, help='Series Instance UID to send') # Send file command send_file_parser = subparsers.add_parser('send-file', help='Send a single DICOM file to destination PACS') send_file_parser.add_argument('--file-path', required=True, help='Path to DICOM file to send') # Add to the parse_args() function after the 'process' command process_study_parser = subparsers.add_parser('process-study', help='Process full workflow for a specific study by StudyInstanceUID') process_study_parser.add_argument('--study-uid', required=True, help='StudyInstanceUID to process') process_study_parser.add_argument('--log-dir', help='Directory to save JSON logs (default: JSON_OUTPUT_DIR)') # Process command - full workflow: find, get, and send DICOM data for a date range process_parser = subparsers.add_parser('process', help='Process full workflow (find, get, send) for a date range') process_parser.add_argument('--start-date', required=True, help='Start date in YYYYMMDD format') process_parser.add_argument('--end-date', required=True, help='End date in YYYYMMDD format') process_parser.add_argument('--log-dir', help='Directory to save JSON logs (default: JSON_OUTPUT_DIR)') process_parser.add_argument('--skip-existing', action='store_true', help='Skip studies that already have JSON logs') return parser.parse_args() def find_studies(args): """ Find studies by date range and save results to JSON. Args: args: Command line arguments """ start_date = args.start_date end_date = args.end_date # Validate date format try: datetime.strptime(start_date, '%Y%m%d') datetime.strptime(end_date, '%Y%m%d') except ValueError: logger.error("Invalid date format. Use YYYYMMDD format.") sys.exit(1) finder = DicomFinder() try: # Find studies studies = finder.find_studies_by_date_range(start_date, end_date) # Convert to list of dictionaries for JSON serialization study_data = [] for study in studies: study_dict = { 'StudyInstanceUID': study.StudyInstanceUID, 'StudyDate': study.StudyDate, 'StudyTime': getattr(study, 'StudyTime', ''), 'AccessionNumber': getattr(study, 'AccessionNumber', ''), 'PatientID': getattr(study, 'PatientID', ''), # MedrecID 'StudyDescription': getattr(study, 'StudyDescription', ''), 'NumberOfStudyRelatedSeries': getattr(study, 'NumberOfStudyRelatedSeries', '') } study_data.append(study_dict) # Save to JSON output_file = args.output_file or f"studies_{start_date}_to_{end_date}.json" save_json_data(study_data, output_file, settings.JSON_OUTPUT_DIR) logger.info(f"Found {len(study_data)} studies from {start_date} to {end_date}") logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}") except Exception as e: logger.error(f"Error finding studies: {str(e)}") sys.exit(1) def find_series(args): """ Find series for a study and save results to JSON. Args: args: Command line arguments """ study_uid = args.study_uid finder = DicomFinder() try: # Find series series_list = finder.find_series_for_study(study_uid) # Convert to list of dictionaries for JSON serialization series_data = [] for series in series_list: series_dict = { 'StudyInstanceUID': series.StudyInstanceUID, 'SeriesInstanceUID': series.SeriesInstanceUID, 'SeriesNumber': getattr(series, 'SeriesNumber', ''), 'SeriesDescription': getattr(series, 'SeriesDescription', ''), 'Modality': getattr(series, 'Modality', ''), 'NumberOfSeriesRelatedInstances': getattr(series, 'NumberOfSeriesRelatedInstances', '') } series_data.append(series_dict) # Save to JSON output_file = args.output_file or f"series_{study_uid}.json" save_json_data(series_data, output_file, settings.JSON_OUTPUT_DIR) logger.info(f"Found {len(series_data)} series for study {study_uid}") logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}") except Exception as e: logger.error(f"Error finding series: {str(e)}") sys.exit(1) def find_instance(args): """ Find first instance for a series and save results to JSON. Args: args: Command line arguments """ study_uid = args.study_uid series_uid = args.series_uid finder = DicomFinder() try: # Find first instance instance = finder.find_first_instance_for_series(study_uid, series_uid) if instance: # Convert to dictionary for JSON serialization instance_dict = { 'StudyInstanceUID': instance.StudyInstanceUID, 'SeriesInstanceUID': instance.SeriesInstanceUID, 'SOPInstanceUID': instance.SOPInstanceUID, 'SOPClassUID': getattr(instance, 'SOPClassUID', ''), 'InstanceNumber': getattr(instance, 'InstanceNumber', '') } # Save to JSON output_file = args.output_file or f"instance_{series_uid}.json" save_json_data(instance_dict, output_file, settings.JSON_OUTPUT_DIR) logger.info(f"Found first instance for series {series_uid}") logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}") else: logger.warning(f"No instances found for series {series_uid}") except Exception as e: logger.error(f"Error finding instance: {str(e)}") sys.exit(1) def get_study(args): """ Retrieve a complete study using C-GET. Args: args: Command line arguments """ study_uid = args.study_uid logger.info(f"Starting retrieval of study: {study_uid}") retriever = DicomRetriever() try: # Retrieve the study result = retriever.retrieve_study(study_uid) if result['success']: logger.info(f"Successfully retrieved study {study_uid}") logger.info(f"Retrieved {result['successful_instances']} of {result['total_instances']} instances") logger.info(f"DICOM files stored in: {result['study_dir']}") else: logger.error(f"Failed to retrieve study {study_uid}: {result['status']}") if result['error']: logger.error(f"Error details: {result['error']}") sys.exit(1) except Exception as e: logger.error(f"Error retrieving study: {str(e)}") sys.exit(1) def get_series(args): """ Retrieve a specific series using C-GET. Args: args: Command line arguments """ study_uid = args.study_uid series_uid = args.series_uid logger.info(f"Starting retrieval of series: {series_uid} from study: {study_uid}") retriever = DicomRetriever() try: # Retrieve the series result = retriever.retrieve_series(study_uid, series_uid) if result['success']: logger.info(f"Successfully retrieved series {series_uid}") logger.info(f"Retrieved {result['successful_instances']} of {result['total_instances']} instances") logger.info(f"DICOM files stored in: {result['series_dir']}") else: logger.error(f"Failed to retrieve series {series_uid}: {result['status']}") if result['error']: logger.error(f"Error details: {result['error']}") sys.exit(1) except Exception as e: logger.error(f"Error retrieving series: {str(e)}") sys.exit(1) def send_study(args): """ Send a complete study to destination PACS using C-STORE. Args: args: Command line arguments """ study_uid = args.study_uid study_path = args.study_path or os.path.join(settings.DICOM_STORE_DIR, study_uid) logger.info(f"Starting sending of study: {study_uid} from {study_path}") sender = DicomSender() try: # Send the study result = sender.send_study(study_path) if result['success']: logger.info(f"Successfully sent study {study_uid}") logger.info(f"Sent {result['successful_sends']} of {result['total_files']} files") if result['failed_sends'] > 0: logger.warning(f"Failed to send {result['failed_sends']} files") else: logger.error(f"Failed to send study {study_uid}") if result.get('error'): logger.error(f"Error details: {result['error']}") sys.exit(1) except Exception as e: logger.error(f"Error sending study: {str(e)}") sys.exit(1) def send_series(args): """ Send a specific series to destination PACS using C-STORE. Args: args: Command line arguments """ study_uid = args.study_uid series_uid = args.series_uid logger.info(f"Starting sending of series: {series_uid} from study: {study_uid}") sender = DicomSender() try: # Send the series result = sender.send_series(study_uid, series_uid) if result['success']: logger.info(f"Successfully sent series {series_uid}") logger.info(f"Sent {result['successful_sends']} of {result['total_files']} files") if result['failed_sends'] > 0: logger.warning(f"Failed to send {result['failed_sends']} files") else: logger.error(f"Failed to send series {series_uid}") if result.get('error'): logger.error(f"Error details: {result['error']}") sys.exit(1) except Exception as e: logger.error(f"Error sending series: {str(e)}") sys.exit(1) def send_file(args): """ Send a single DICOM file to destination PACS using C-STORE. Args: args: Command line arguments """ file_path = args.file_path logger.info(f"Starting sending of DICOM file: {file_path}") sender = DicomSender() try: # Send the file result = sender.send_file(file_path) if result['success']: logger.info(f"Successfully sent file {file_path}") else: logger.error(f"Failed to send file {file_path}: {result['status']}") if result['error']: logger.error(f"Error details: {result['error']}") sys.exit(1) except Exception as e: logger.error(f"Error sending file: {str(e)}") sys.exit(1) def process_single_study(study_uid, accession_number=None, log_dir=None): """ Process a single study: retrieve it, send to destination PACS, and create detailed JSON logs. Args: study_uid: Study Instance UID accession_number: Optional accession number log_dir: Directory to save logs Returns: dict: Result information """ log_dir = log_dir or settings.JSON_OUTPUT_DIR # Create services finder = DicomFinder() retriever = DicomRetriever() sender = DicomSender() # First, get the study metadata with a query if not provided if not accession_number: study_metadata = finder.find_study_by_uid(study_uid) if not study_metadata: logger.error(f"Study not found: {study_uid}") return {'success': False, 'error': 'Study not found'} accession_number = getattr(study_metadata, 'AccessionNumber', '') else: study_metadata = None logger.info(f"Processing study: {accession_number} with Study_IUID ({study_uid})") # STEP 1: Retrieve the study retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number) if not retrieve_result['success']: logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}") return {'success': False, 'error': f"Failed to retrieve study: {retrieve_result['status']}"} logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}") # STEP 2: Send the study to destination PACS send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid)) if not send_result['success']: logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}") logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS") # STEP 3: Find all series and create detailed logs series_list = finder.find_series_for_study(study_uid, accession_number=accession_number) # Get metadata from study_metadata if available, otherwise from the first element of study list if study_metadata: study_date = getattr(study_metadata, 'StudyDate', '') study_time = getattr(study_metadata, 'StudyTime', '000000') patient_id = getattr(study_metadata, 'PatientID', '') study_description = getattr(study_metadata, 'StudyDescription', '') else: # We need to query for it again study_details = finder.find_study_by_uid(study_uid) study_date = getattr(study_details, 'StudyDate', '') if study_details else '' study_time = getattr(study_details, 'StudyTime', '000000') if study_details else '000000' patient_id = getattr(study_details, 'PatientID', '') if study_details else '' study_description = getattr(study_details, 'StudyDescription', '') if study_details else '' study_datetime = f"{study_date}{study_time}" study_log = { 'Study_IUID': study_uid, 'AccessionNumber': accession_number, 'PatientID': patient_id, 'StudyDescription': study_description, 'StudyDateTime': study_datetime, 'CstoreSuccess': send_result['success'], 'Series': [] } for series in series_list: series_uid = series.SeriesInstanceUID # Get first instance for the series instance = finder.find_first_instance_for_series(study_uid, series_uid) # Kalau null atau None, forced ke '' def safe_get_attr(obj, attr_name, default=''): """Get attribute value, ensuring None is converted to default.""" value = getattr(obj, attr_name, default) return default if value is None else str(value) series_info = { 'Series_IUID': safe_get_attr(series, 'SeriesInstanceUID'), 'SeriesNumber': safe_get_attr(series, 'SeriesNumber'), 'SeriesDescription': safe_get_attr(series, 'SeriesDescription'), 'NumberOfInstances': safe_get_attr(series, 'NumberOfSeriesRelatedInstances'), 'SOP_IUID': safe_get_attr(instance, 'SOPInstanceUID') if instance else '' } study_log['Series'].append(series_info) # STEP 4: Send study_log to HIS API his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}" try: headers = { 'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==', 'Content-Type': 'application/json' } response = requests.post(his_url, json=study_log, headers=headers) # Add response data to the log study_log['HisApiResponse'] = { 'StatusCode': response.status_code, 'ResponseText': response.text, 'Success': False, 'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } # Safer JSON parsing try: response_data = response.json() if response.content else {} study_log['HisApiResponse']['ResponseData'] = response_data if response.status_code == 200 and response_data.get('OK') == "1": study_log['HisApiResponse']['Success'] = True logger.info(f"Successfully sent JSON {accession_number} to HIS API") return {'success': True, 'study_log': study_log, 'his_success': True} else: error_msg = response_data.get('message', 'Unknown error') logger.error(f"Failed to send JSON for {accession_number} to HIS API: {error_msg}. Study_IUID: {study_uid}") return {'success': True, 'study_log': study_log, 'his_success': False} except ValueError: study_log['HisApiResponse']['ParseError'] = "Invalid JSON response" logger.error(f"Failed to parse response for {accession_number}, invalid JSON response. Status code: {response.status_code}") return {'success': True, 'study_log': study_log, 'his_success': False} except Exception as e: study_log['HisApiResponse'] = { 'Success': False, 'Error': str(e), 'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S') } logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}") return {'success': True, 'study_log': study_log, 'his_success': False, 'error': str(e)} def process_workflow(args): """ Process the full workflow: find studies by date range, retrieve them, and send to destination PACS. Also creates detailed JSON logs for each series. Args: args: Command line arguments """ start_date = args.start_date end_date = args.end_date log_dir = args.log_dir or settings.JSON_OUTPUT_DIR skip_existing = args.skip_existing # Validate date format try: datetime.strptime(start_date, '%Y%m%d') datetime.strptime(end_date, '%Y%m%d') except ValueError: logger.error("Invalid date format. Use YYYYMMDD format.") sys.exit(1) logger.info(f"Starting full workflow process for studies from {start_date} to {end_date}") # Create services finder = DicomFinder() retriever = DicomRetriever() sender = DicomSender() # Create directory for logs create_directory_if_not_exists(log_dir) create_directory_if_not_exists("logs") # STEP 1: Find studies in the date range try: studies = finder.find_studies_by_date_range(start_date, end_date) logger.info(f"Found {len(studies)} studies between {start_date} and {end_date}") # Save studies summary studies_summary = [{ 'StudyInstanceUID': study.StudyInstanceUID, 'StudyDate': getattr(study, 'StudyDate', ''), 'StudyTime': getattr(study, 'StudyTime', ''), 'AccessionNumber': getattr(study, 'AccessionNumber', ''), 'PatientID': getattr(study, 'PatientID', ''), 'StudyDescription': getattr(study, 'StudyDescription', '') } for study in studies] save_json_data(studies_summary, f"studies_{start_date}_to_{end_date}.json", log_dir) # STEP 2, 3, 4: For each study, retrieve, send, and log details his_log = [] his_fail_log = [] his_log_filename = f"sendtohis_{start_date}_{end_date}.json" his_fail_log_filename = f"fail_sendtohis_{start_date}_{end_date}.json" # Register signal handler for SIGINT original_sigint_handler = signal.getsignal(signal.SIGINT) def sigint_handler(sig, frame): logger.warning("Process interrupted by user. Saving logs before exiting...") # Save HIS logs if his_log: save_json_data(his_log, his_log_filename, "logs") logger.info(f"Saved {len(his_log)} successful logs to {his_log_filename}") if his_fail_log: save_json_data(his_fail_log, his_fail_log_filename, "logs") logger.info(f"Saved {len(his_fail_log)} failed logs to {his_fail_log_filename}") # Restore original signal handler and raise KeyboardInterrupt signal.signal(signal.SIGINT, original_sigint_handler) sys.exit(1) # Set up our custom signal handler signal.signal(signal.SIGINT, sigint_handler) for i, study in enumerate(studies): accession_number = study.AccessionNumber study_uid = study.StudyInstanceUID logger.info(f"Processing study {i+1}/{len(studies)}: {accession_number} with Study_IUID ({study_uid})") # Check if we should skip this study log_file_path = os.path.join(log_dir, f"{study_uid}.json") if skip_existing and os.path.exists(log_file_path): logger.info(f"Skipping study {study_uid} - log file already exists") continue try: result = process_single_study(study_uid, accession_number, log_dir) if not result['success']: logger.error(f"Failed to process study {study_uid}: {result.get('error', 'Unknown error')}") continue # Record results for HIS logs if result.get('his_success', False): his_log.append(result['study_log']) else: his_fail_log.append(result['study_log']) # Save progress incrementally every 5 studies if (i + 1) % 5 == 0: if his_log: save_json_data(his_log, his_log_filename, "logs") if his_fail_log: save_json_data(his_fail_log, his_fail_log_filename, "logs") logger.info(f"Saved progress after processing {i+1} studies") except Exception as e: logger.error(f"Error processing study {accession_number}: {str(e)}") continue # Restore original signal handler signal.signal(signal.SIGINT, original_sigint_handler) # Final Save HIS logs if his_log: save_json_data(his_log, his_log_filename, "logs") if his_fail_log: save_json_data(his_fail_log, his_fail_log_filename, "logs") logger.info(f"Completed processing {len(studies)} studies from {start_date} to {end_date}") except Exception as e: logger.error(f"Error during workflow processing: {str(e)}") sys.exit(1) def process_workflow_by_study(args): """ Process the full workflow for a specific study. Args: args: Command line arguments """ study_uid = args.study_uid log_dir = args.log_dir or settings.JSON_OUTPUT_DIR logger.info(f"Starting full workflow process for study: {study_uid}") # Create directory for logs create_directory_if_not_exists(log_dir) create_directory_if_not_exists("logs") # Register signal handler for SIGINT original_sigint_handler = signal.getsignal(signal.SIGINT) # Create empty containers for logs his_log = [] his_fail_log = [] his_log_filename = f"sendtohis_study_{study_uid}.json" his_fail_log_filename = f"fail_sendtohis_study_{study_uid}.json" def sigint_handler(sig, frame): logger.warning("Process interrupted by user. Saving logs before exiting...") # Save individual study log if 'result' in locals() and result.get('success', False): save_json_data(result['study_log'], f"{study_uid}.json", log_dir) # Save HIS logs if result.get('his_success', False): save_json_data([result['study_log']], his_log_filename, "logs") logger.info(f"Saved successful log to {his_log_filename}") else: save_json_data([result['study_log']], his_fail_log_filename, "logs") logger.info(f"Saved failed log to {his_fail_log_filename}") # Restore original signal handler and exit signal.signal(signal.SIGINT, original_sigint_handler) sys.exit(1) # Set up our custom signal handler signal.signal(signal.SIGINT, sigint_handler) try: result = process_single_study(study_uid, log_dir=log_dir) if not result['success']: logger.error(f"Failed to process study {study_uid}: {result.get('error', 'Unknown error')}") sys.exit(1) # Save individual study log save_json_data(result['study_log'], f"{study_uid}.json", log_dir) # Save logs if result.get('his_success', False): save_json_data([result['study_log']], his_log_filename, "logs") else: save_json_data([result['study_log']], his_fail_log_filename, "logs") logger.info(f"Completed processing study: {study_uid}") except Exception as e: logger.error(f"Error during study processing: {str(e)}") sys.exit(1) finally: # Restore original signal handler signal.signal(signal.SIGINT, original_sigint_handler) def main(): """Main function.""" # Register cleanup handlers for graceful exit register_exit_handlers() # Create necessary directories create_directory_if_not_exists(settings.JSON_OUTPUT_DIR) create_directory_if_not_exists(settings.DICOM_STORE_DIR) # Register the main DICOM store directory for cleanup if os.path.exists(settings.DICOM_STORE_DIR): # Only register top-level directories in the DICOM store directory for item in os.listdir(settings.DICOM_STORE_DIR): item_path = os.path.join(settings.DICOM_STORE_DIR, item) if os.path.isdir(item_path): register_cleanup_dir(item_path) # Parse arguments args = parse_args() if args.command == 'find-studies': find_studies(args) elif args.command == 'find-series': find_series(args) elif args.command == 'find-instance': find_instance(args) elif args.command == 'get-study': get_study(args) elif args.command == 'get-series': get_series(args) elif args.command == 'send-study': send_study(args) elif args.command == 'send-series': send_series(args) elif args.command == 'send-file': send_file(args) elif args.command == 'process': process_workflow(args) elif args.command == 'process-study': process_workflow_by_study(args) else: logger.error("No command specified. Use --help for options.") sys.exit(1) if __name__ == "__main__": main()