750 lines
30 KiB
Python
750 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Main entry point for DICOM data retrieval and routing automation.
|
|
"""
|
|
import os
|
|
import sys
|
|
import json
|
|
import argparse
|
|
import requests
|
|
from datetime import datetime
|
|
from config import settings
|
|
from utils.logger import main_logger as logger
|
|
from utils.dicom_utils import save_json_data, create_directory_if_not_exists
|
|
from utils.cleanup import register_exit_handlers, register_cleanup_dir
|
|
from services.dicom_finder import DicomFinder
|
|
from services.dicom_retriever import DicomRetriever
|
|
from services.dicom_sender import DicomSender
|
|
import signal
|
|
|
|
def parse_args():
|
|
"""Parse command line arguments."""
|
|
parser = argparse.ArgumentParser(description='DICOM Data Retrieval and Routing Automation')
|
|
|
|
# Command options
|
|
subparsers = parser.add_subparsers(dest='command', help='Command to execute')
|
|
|
|
# Find studies command
|
|
find_parser = subparsers.add_parser('find-studies', help='Find studies by date range')
|
|
find_parser.add_argument('--start-date', required=True, help='Start date in YYYYMMDD format')
|
|
find_parser.add_argument('--end-date', required=True, help='End date in YYYYMMDD format')
|
|
find_parser.add_argument('--output-file', help='JSON output file (default: studies_YYYYMMDD.json)')
|
|
|
|
# Find series command
|
|
series_parser = subparsers.add_parser('find-series', help='Find series for a study')
|
|
series_parser.add_argument('--study-uid', required=True, help='Study Instance UID')
|
|
series_parser.add_argument('--output-file', help='JSON output file (default: series_STUDYUID.json)')
|
|
|
|
# Find first instance command
|
|
instance_parser = subparsers.add_parser('find-instance', help='Find first instance for a series')
|
|
instance_parser.add_argument('--study-uid', required=True, help='Study Instance UID')
|
|
instance_parser.add_argument('--series-uid', required=True, help='Series Instance UID')
|
|
instance_parser.add_argument('--output-file', help='JSON output file (default: instance_SERIESUID.json)')
|
|
|
|
# Get study command
|
|
get_study_parser = subparsers.add_parser('get-study', help='Retrieve a complete study')
|
|
get_study_parser.add_argument('--study-uid', required=True, help='Study Instance UID to retrieve')
|
|
|
|
# Get series command
|
|
get_series_parser = subparsers.add_parser('get-series', help='Retrieve a specific series')
|
|
get_series_parser.add_argument('--study-uid', required=True, help='Study Instance UID')
|
|
get_series_parser.add_argument('--series-uid', required=True, help='Series Instance UID to retrieve')
|
|
|
|
# Send study command
|
|
send_study_parser = subparsers.add_parser('send-study', help='Send a complete study to destination PACS')
|
|
send_study_parser.add_argument('--study-uid', required=True, help='Study Instance UID to send')
|
|
send_study_parser.add_argument('--study-path', help='Path to study directory (default: DICOM_STORE_DIR/study_uid)')
|
|
|
|
# Send series command
|
|
send_series_parser = subparsers.add_parser('send-series', help='Send a specific series to destination PACS')
|
|
send_series_parser.add_argument('--study-uid', required=True, help='Study Instance UID')
|
|
send_series_parser.add_argument('--series-uid', required=True, help='Series Instance UID to send')
|
|
|
|
# Send file command
|
|
send_file_parser = subparsers.add_parser('send-file', help='Send a single DICOM file to destination PACS')
|
|
send_file_parser.add_argument('--file-path', required=True, help='Path to DICOM file to send')
|
|
|
|
# Add to the parse_args() function after the 'process' command
|
|
process_study_parser = subparsers.add_parser('process-study', help='Process full workflow for a specific study by StudyInstanceUID')
|
|
process_study_parser.add_argument('--study-uid', required=True, help='StudyInstanceUID to process')
|
|
process_study_parser.add_argument('--log-dir', help='Directory to save JSON logs (default: JSON_OUTPUT_DIR)')
|
|
|
|
# Process command - full workflow: find, get, and send DICOM data for a date range
|
|
process_parser = subparsers.add_parser('process', help='Process full workflow (find, get, send) for a date range')
|
|
process_parser.add_argument('--start-date', required=True, help='Start date in YYYYMMDD format')
|
|
process_parser.add_argument('--end-date', required=True, help='End date in YYYYMMDD format')
|
|
process_parser.add_argument('--log-dir', help='Directory to save JSON logs (default: JSON_OUTPUT_DIR)')
|
|
process_parser.add_argument('--skip-existing', action='store_true', help='Skip studies that already have JSON logs')
|
|
|
|
return parser.parse_args()
|
|
|
|
def find_studies(args):
|
|
"""
|
|
Find studies by date range and save results to JSON.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
start_date = args.start_date
|
|
end_date = args.end_date
|
|
|
|
# Validate date format
|
|
try:
|
|
datetime.strptime(start_date, '%Y%m%d')
|
|
datetime.strptime(end_date, '%Y%m%d')
|
|
except ValueError:
|
|
logger.error("Invalid date format. Use YYYYMMDD format.")
|
|
sys.exit(1)
|
|
|
|
finder = DicomFinder()
|
|
|
|
try:
|
|
# Find studies
|
|
studies = finder.find_studies_by_date_range(start_date, end_date)
|
|
|
|
# Convert to list of dictionaries for JSON serialization
|
|
study_data = []
|
|
for study in studies:
|
|
study_dict = {
|
|
'StudyInstanceUID': study.StudyInstanceUID,
|
|
'StudyDate': study.StudyDate,
|
|
'StudyTime': getattr(study, 'StudyTime', ''),
|
|
'AccessionNumber': getattr(study, 'AccessionNumber', ''),
|
|
'PatientID': getattr(study, 'PatientID', ''), # MedrecID
|
|
'StudyDescription': getattr(study, 'StudyDescription', ''),
|
|
'NumberOfStudyRelatedSeries': getattr(study, 'NumberOfStudyRelatedSeries', '')
|
|
}
|
|
study_data.append(study_dict)
|
|
|
|
# Save to JSON
|
|
output_file = args.output_file or f"studies_{start_date}_to_{end_date}.json"
|
|
save_json_data(study_data, output_file, settings.JSON_OUTPUT_DIR)
|
|
|
|
logger.info(f"Found {len(study_data)} studies from {start_date} to {end_date}")
|
|
logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error finding studies: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def find_series(args):
|
|
"""
|
|
Find series for a study and save results to JSON.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
study_uid = args.study_uid
|
|
|
|
finder = DicomFinder()
|
|
|
|
try:
|
|
# Find series
|
|
series_list = finder.find_series_for_study(study_uid)
|
|
|
|
# Convert to list of dictionaries for JSON serialization
|
|
series_data = []
|
|
for series in series_list:
|
|
series_dict = {
|
|
'StudyInstanceUID': series.StudyInstanceUID,
|
|
'SeriesInstanceUID': series.SeriesInstanceUID,
|
|
'SeriesNumber': getattr(series, 'SeriesNumber', ''),
|
|
'SeriesDescription': getattr(series, 'SeriesDescription', ''),
|
|
'Modality': getattr(series, 'Modality', ''),
|
|
'NumberOfSeriesRelatedInstances': getattr(series, 'NumberOfSeriesRelatedInstances', '')
|
|
}
|
|
series_data.append(series_dict)
|
|
|
|
# Save to JSON
|
|
output_file = args.output_file or f"series_{study_uid}.json"
|
|
save_json_data(series_data, output_file, settings.JSON_OUTPUT_DIR)
|
|
|
|
logger.info(f"Found {len(series_data)} series for study {study_uid}")
|
|
logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error finding series: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def find_instance(args):
|
|
"""
|
|
Find first instance for a series and save results to JSON.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
study_uid = args.study_uid
|
|
series_uid = args.series_uid
|
|
|
|
finder = DicomFinder()
|
|
|
|
try:
|
|
# Find first instance
|
|
instance = finder.find_first_instance_for_series(study_uid, series_uid)
|
|
|
|
if instance:
|
|
# Convert to dictionary for JSON serialization
|
|
instance_dict = {
|
|
'StudyInstanceUID': instance.StudyInstanceUID,
|
|
'SeriesInstanceUID': instance.SeriesInstanceUID,
|
|
'SOPInstanceUID': instance.SOPInstanceUID,
|
|
'SOPClassUID': getattr(instance, 'SOPClassUID', ''),
|
|
'InstanceNumber': getattr(instance, 'InstanceNumber', '')
|
|
}
|
|
|
|
# Save to JSON
|
|
output_file = args.output_file or f"instance_{series_uid}.json"
|
|
save_json_data(instance_dict, output_file, settings.JSON_OUTPUT_DIR)
|
|
|
|
logger.info(f"Found first instance for series {series_uid}")
|
|
logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}")
|
|
else:
|
|
logger.warning(f"No instances found for series {series_uid}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error finding instance: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def get_study(args):
|
|
"""
|
|
Retrieve a complete study using C-GET.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
study_uid = args.study_uid
|
|
|
|
logger.info(f"Starting retrieval of study: {study_uid}")
|
|
|
|
retriever = DicomRetriever()
|
|
|
|
try:
|
|
# Retrieve the study
|
|
result = retriever.retrieve_study(study_uid)
|
|
|
|
if result['success']:
|
|
logger.info(f"Successfully retrieved study {study_uid}")
|
|
logger.info(f"Retrieved {result['successful_instances']} of {result['total_instances']} instances")
|
|
logger.info(f"DICOM files stored in: {result['study_dir']}")
|
|
else:
|
|
logger.error(f"Failed to retrieve study {study_uid}: {result['status']}")
|
|
if result['error']:
|
|
logger.error(f"Error details: {result['error']}")
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving study: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def get_series(args):
|
|
"""
|
|
Retrieve a specific series using C-GET.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
study_uid = args.study_uid
|
|
series_uid = args.series_uid
|
|
|
|
logger.info(f"Starting retrieval of series: {series_uid} from study: {study_uid}")
|
|
|
|
retriever = DicomRetriever()
|
|
|
|
try:
|
|
# Retrieve the series
|
|
result = retriever.retrieve_series(study_uid, series_uid)
|
|
|
|
if result['success']:
|
|
logger.info(f"Successfully retrieved series {series_uid}")
|
|
logger.info(f"Retrieved {result['successful_instances']} of {result['total_instances']} instances")
|
|
logger.info(f"DICOM files stored in: {result['series_dir']}")
|
|
else:
|
|
logger.error(f"Failed to retrieve series {series_uid}: {result['status']}")
|
|
if result['error']:
|
|
logger.error(f"Error details: {result['error']}")
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error retrieving series: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def send_study(args):
|
|
"""
|
|
Send a complete study to destination PACS using C-STORE.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
study_uid = args.study_uid
|
|
study_path = args.study_path or os.path.join(settings.DICOM_STORE_DIR, study_uid)
|
|
|
|
logger.info(f"Starting sending of study: {study_uid} from {study_path}")
|
|
|
|
sender = DicomSender()
|
|
|
|
try:
|
|
# Send the study
|
|
result = sender.send_study(study_path)
|
|
|
|
if result['success']:
|
|
logger.info(f"Successfully sent study {study_uid}")
|
|
logger.info(f"Sent {result['successful_sends']} of {result['total_files']} files")
|
|
if result['failed_sends'] > 0:
|
|
logger.warning(f"Failed to send {result['failed_sends']} files")
|
|
else:
|
|
logger.error(f"Failed to send study {study_uid}")
|
|
if result.get('error'):
|
|
logger.error(f"Error details: {result['error']}")
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending study: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def send_series(args):
|
|
"""
|
|
Send a specific series to destination PACS using C-STORE.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
study_uid = args.study_uid
|
|
series_uid = args.series_uid
|
|
|
|
logger.info(f"Starting sending of series: {series_uid} from study: {study_uid}")
|
|
|
|
sender = DicomSender()
|
|
|
|
try:
|
|
# Send the series
|
|
result = sender.send_series(study_uid, series_uid)
|
|
|
|
if result['success']:
|
|
logger.info(f"Successfully sent series {series_uid}")
|
|
logger.info(f"Sent {result['successful_sends']} of {result['total_files']} files")
|
|
if result['failed_sends'] > 0:
|
|
logger.warning(f"Failed to send {result['failed_sends']} files")
|
|
else:
|
|
logger.error(f"Failed to send series {series_uid}")
|
|
if result.get('error'):
|
|
logger.error(f"Error details: {result['error']}")
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending series: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def send_file(args):
|
|
"""
|
|
Send a single DICOM file to destination PACS using C-STORE.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
file_path = args.file_path
|
|
|
|
logger.info(f"Starting sending of DICOM file: {file_path}")
|
|
|
|
sender = DicomSender()
|
|
|
|
try:
|
|
# Send the file
|
|
result = sender.send_file(file_path)
|
|
|
|
if result['success']:
|
|
logger.info(f"Successfully sent file {file_path}")
|
|
else:
|
|
logger.error(f"Failed to send file {file_path}: {result['status']}")
|
|
if result['error']:
|
|
logger.error(f"Error details: {result['error']}")
|
|
sys.exit(1)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error sending file: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def process_single_study(study_uid, accession_number=None, log_dir=None):
|
|
"""
|
|
Process a single study: retrieve it, send to destination PACS,
|
|
and create detailed JSON logs.
|
|
|
|
Args:
|
|
study_uid: Study Instance UID
|
|
accession_number: Optional accession number
|
|
log_dir: Directory to save logs
|
|
|
|
Returns:
|
|
dict: Result information
|
|
"""
|
|
log_dir = log_dir or settings.JSON_OUTPUT_DIR
|
|
|
|
# Create services
|
|
finder = DicomFinder()
|
|
retriever = DicomRetriever()
|
|
sender = DicomSender()
|
|
|
|
# First, get the study metadata with a query if not provided
|
|
if not accession_number:
|
|
study_metadata = finder.find_study_by_uid(study_uid)
|
|
if not study_metadata:
|
|
logger.error(f"Study not found: {study_uid}")
|
|
return {'success': False, 'error': 'Study not found'}
|
|
accession_number = getattr(study_metadata, 'AccessionNumber', '')
|
|
else:
|
|
study_metadata = None
|
|
|
|
logger.info(f"Processing study: {accession_number} with Study_IUID ({study_uid})")
|
|
|
|
# STEP 1: Retrieve the study
|
|
retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)
|
|
|
|
if not retrieve_result['success']:
|
|
logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
|
|
return {'success': False, 'error': f"Failed to retrieve study: {retrieve_result['status']}"}
|
|
|
|
logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")
|
|
|
|
# STEP 2: Send the study to destination PACS
|
|
send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))
|
|
|
|
if not send_result['success']:
|
|
logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
|
|
|
|
logger.info(f"Sent {send_result['successful_sends']} of {send_result['total_files']} files to destination PACS")
|
|
|
|
# STEP 3: Find all series and create detailed logs
|
|
series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)
|
|
|
|
# Get metadata from study_metadata if available, otherwise from the first element of study list
|
|
if study_metadata:
|
|
study_date = getattr(study_metadata, 'StudyDate', '')
|
|
study_time = getattr(study_metadata, 'StudyTime', '000000')
|
|
patient_id = getattr(study_metadata, 'PatientID', '')
|
|
study_description = getattr(study_metadata, 'StudyDescription', '')
|
|
else:
|
|
# We need to query for it again
|
|
study_details = finder.find_study_by_uid(study_uid)
|
|
study_date = getattr(study_details, 'StudyDate', '') if study_details else ''
|
|
study_time = getattr(study_details, 'StudyTime', '000000') if study_details else '000000'
|
|
patient_id = getattr(study_details, 'PatientID', '') if study_details else ''
|
|
study_description = getattr(study_details, 'StudyDescription', '') if study_details else ''
|
|
|
|
study_datetime = f"{study_date}{study_time}"
|
|
|
|
study_log = {
|
|
'Study_IUID': study_uid,
|
|
'AccessionNumber': accession_number,
|
|
'PatientID': patient_id,
|
|
'StudyDescription': study_description,
|
|
'StudyDateTime': study_datetime,
|
|
'CstoreSuccess': send_result['success'],
|
|
'Series': []
|
|
}
|
|
|
|
for series in series_list:
|
|
series_uid = series.SeriesInstanceUID
|
|
|
|
# Get first instance for the series
|
|
instance = finder.find_first_instance_for_series(study_uid, series_uid)
|
|
|
|
# Kalau null atau None, forced ke ''
|
|
def safe_get_attr(obj, attr_name, default=''):
|
|
"""Get attribute value, ensuring None is converted to default."""
|
|
value = getattr(obj, attr_name, default)
|
|
return default if value is None else str(value)
|
|
|
|
series_info = {
|
|
'Series_IUID': safe_get_attr(series, 'SeriesInstanceUID'),
|
|
'SeriesNumber': safe_get_attr(series, 'SeriesNumber'),
|
|
'SeriesDescription': safe_get_attr(series, 'SeriesDescription'),
|
|
'NumberOfInstances': safe_get_attr(series, 'NumberOfSeriesRelatedInstances'),
|
|
'SOP_IUID': safe_get_attr(instance, 'SOPInstanceUID') if instance else ''
|
|
}
|
|
|
|
study_log['Series'].append(series_info)
|
|
|
|
# STEP 4: Send study_log to HIS API
|
|
his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"
|
|
try:
|
|
headers = {
|
|
'id': 'Vmtaa2MySnRUblJTYWtKb1ZucHNNVlZVU2pSaFIwNTBZa1JDYkZaV1NtOWFSV1JIVmxkSmQxSnJUbFpTVlZwRlZsaGpPVkJSUFQwPQ==',
|
|
'Content-Type': 'application/json'
|
|
}
|
|
response = requests.post(his_url, json=study_log, headers=headers)
|
|
|
|
# Add response data to the log
|
|
study_log['HisApiResponse'] = {
|
|
'StatusCode': response.status_code,
|
|
'ResponseText': response.text,
|
|
'Success': False,
|
|
'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
}
|
|
|
|
# Safer JSON parsing
|
|
try:
|
|
response_data = response.json() if response.content else {}
|
|
study_log['HisApiResponse']['ResponseData'] = response_data
|
|
|
|
if response.status_code == 200 and response_data.get('OK') == "1":
|
|
study_log['HisApiResponse']['Success'] = True
|
|
logger.info(f"Successfully sent JSON {accession_number} to HIS API")
|
|
return {'success': True, 'study_log': study_log, 'his_success': True}
|
|
else:
|
|
error_msg = response_data.get('message', 'Unknown error')
|
|
logger.error(f"Failed to send JSON for {accession_number} to HIS API: {error_msg}. Study_IUID: {study_uid}")
|
|
return {'success': True, 'study_log': study_log, 'his_success': False}
|
|
except ValueError:
|
|
study_log['HisApiResponse']['ParseError'] = "Invalid JSON response"
|
|
logger.error(f"Failed to parse response for {accession_number}, invalid JSON response. Status code: {response.status_code}")
|
|
return {'success': True, 'study_log': study_log, 'his_success': False}
|
|
|
|
except Exception as e:
|
|
study_log['HisApiResponse'] = {
|
|
'Success': False,
|
|
'Error': str(e),
|
|
'Timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S')
|
|
}
|
|
logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
|
|
return {'success': True, 'study_log': study_log, 'his_success': False, 'error': str(e)}
|
|
|
|
def process_workflow(args):
|
|
"""
|
|
Process the full workflow: find studies by date range, retrieve them, and send to destination PACS.
|
|
Also creates detailed JSON logs for each series.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
start_date = args.start_date
|
|
end_date = args.end_date
|
|
log_dir = args.log_dir or settings.JSON_OUTPUT_DIR
|
|
skip_existing = args.skip_existing
|
|
|
|
# Validate date format
|
|
try:
|
|
datetime.strptime(start_date, '%Y%m%d')
|
|
datetime.strptime(end_date, '%Y%m%d')
|
|
except ValueError:
|
|
logger.error("Invalid date format. Use YYYYMMDD format.")
|
|
sys.exit(1)
|
|
|
|
logger.info(f"Starting full workflow process for studies from {start_date} to {end_date}")
|
|
|
|
# Create services
|
|
finder = DicomFinder()
|
|
|
|
# Create directory for logs
|
|
create_directory_if_not_exists(log_dir)
|
|
create_directory_if_not_exists("logs")
|
|
|
|
# STEP 1: Find studies in the date range
|
|
try:
|
|
studies = finder.find_studies_by_date_range(start_date, end_date)
|
|
logger.info(f"Found {len(studies)} studies between {start_date} and {end_date}")
|
|
|
|
# Save studies summary
|
|
studies_summary = [{
|
|
'StudyInstanceUID': study.StudyInstanceUID,
|
|
'StudyDate': getattr(study, 'StudyDate', ''),
|
|
'StudyTime': getattr(study, 'StudyTime', ''),
|
|
'AccessionNumber': getattr(study, 'AccessionNumber', ''),
|
|
'PatientID': getattr(study, 'PatientID', ''),
|
|
'StudyDescription': getattr(study, 'StudyDescription', '')
|
|
} for study in studies]
|
|
|
|
save_json_data(studies_summary, f"studies_{start_date}_to_{end_date}.json", log_dir)
|
|
|
|
# STEP 2, 3, 4: For each study, retrieve, send, and log details
|
|
his_log = []
|
|
his_fail_log = []
|
|
his_log_filename = f"sendtohis_{start_date}_{end_date}.json"
|
|
his_fail_log_filename = f"fail_sendtohis_{start_date}_{end_date}.json"
|
|
|
|
# Register signal handler for SIGINT
|
|
original_sigint_handler = signal.getsignal(signal.SIGINT)
|
|
|
|
def sigint_handler(sig, frame):
|
|
logger.warning("Process interrupted by user. Saving logs before exiting...")
|
|
# Save HIS logs
|
|
if his_log:
|
|
save_json_data(his_log, his_log_filename, "logs")
|
|
logger.info(f"Saved {len(his_log)} successful logs to {his_log_filename}")
|
|
if his_fail_log:
|
|
save_json_data(his_fail_log, his_fail_log_filename, "logs")
|
|
logger.info(f"Saved {len(his_fail_log)} failed logs to {his_fail_log_filename}")
|
|
# Restore original signal handler and raise KeyboardInterrupt
|
|
signal.signal(signal.SIGINT, original_sigint_handler)
|
|
sys.exit(1)
|
|
|
|
# Set up our custom signal handler
|
|
signal.signal(signal.SIGINT, sigint_handler)
|
|
|
|
|
|
for i, study in enumerate(studies):
|
|
accession_number = study.AccessionNumber
|
|
study_uid = study.StudyInstanceUID
|
|
logger.info(f"Processing study {i+1}/{len(studies)}: {accession_number} with Study_IUID ({study_uid})")
|
|
|
|
# Check if we should skip this study
|
|
log_file_path = os.path.join(log_dir, f"{study_uid}.json")
|
|
if skip_existing and os.path.exists(log_file_path):
|
|
logger.info(f"Skipping study {study_uid} - log file already exists")
|
|
continue
|
|
|
|
try:
|
|
result = process_single_study(study_uid, accession_number, log_dir)
|
|
|
|
if not result['success']:
|
|
logger.error(f"Failed to process study {study_uid}: {result.get('error', 'Unknown error')}")
|
|
continue
|
|
|
|
# Record results for HIS logs
|
|
if result.get('his_success', False):
|
|
his_log.append(result['study_log'])
|
|
else:
|
|
his_fail_log.append(result['study_log'])
|
|
|
|
# Save progress incrementally every 5 studies
|
|
if (i + 1) % 5 == 0:
|
|
if his_log:
|
|
save_json_data(his_log, his_log_filename, "logs")
|
|
if his_fail_log:
|
|
save_json_data(his_fail_log, his_fail_log_filename, "logs")
|
|
logger.info(f"Saved progress after processing {i+1} studies")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing study {accession_number}: {str(e)}")
|
|
continue
|
|
|
|
# Restore original signal handler
|
|
signal.signal(signal.SIGINT, original_sigint_handler)
|
|
|
|
# Final Save HIS logs
|
|
if his_log:
|
|
save_json_data(his_log, his_log_filename, "logs")
|
|
if his_fail_log:
|
|
save_json_data(his_fail_log, his_fail_log_filename, "logs")
|
|
|
|
logger.info(f"Completed processing {len(studies)} studies from {start_date} to {end_date}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during workflow processing: {str(e)}")
|
|
sys.exit(1)
|
|
|
|
def process_workflow_by_study(args):
|
|
"""
|
|
Process the full workflow for a specific study.
|
|
|
|
Args:
|
|
args: Command line arguments
|
|
"""
|
|
study_uid = args.study_uid
|
|
log_dir = args.log_dir or settings.JSON_OUTPUT_DIR
|
|
|
|
logger.info(f"Starting full workflow process for study: {study_uid}")
|
|
|
|
# Create directory for logs
|
|
create_directory_if_not_exists(log_dir)
|
|
create_directory_if_not_exists("logs")
|
|
|
|
# Register signal handler for SIGINT
|
|
original_sigint_handler = signal.getsignal(signal.SIGINT)
|
|
|
|
# Create empty containers for logs
|
|
his_log = []
|
|
his_fail_log = []
|
|
his_log_filename = f"sendtohis_study_{study_uid}.json"
|
|
his_fail_log_filename = f"fail_sendtohis_study_{study_uid}.json"
|
|
|
|
def sigint_handler(sig, frame):
|
|
logger.warning("Process interrupted by user. Saving logs before exiting...")
|
|
# Save individual study log
|
|
if 'result' in locals() and result.get('success', False):
|
|
save_json_data(result['study_log'], f"{study_uid}.json", log_dir)
|
|
|
|
# Save HIS logs
|
|
if result.get('his_success', False):
|
|
save_json_data([result['study_log']], his_log_filename, "logs")
|
|
logger.info(f"Saved successful log to {his_log_filename}")
|
|
else:
|
|
save_json_data([result['study_log']], his_fail_log_filename, "logs")
|
|
logger.info(f"Saved failed log to {his_fail_log_filename}")
|
|
|
|
# Restore original signal handler and exit
|
|
signal.signal(signal.SIGINT, original_sigint_handler)
|
|
sys.exit(1)
|
|
|
|
# Set up our custom signal handler
|
|
signal.signal(signal.SIGINT, sigint_handler)
|
|
|
|
try:
|
|
result = process_single_study(study_uid, log_dir=log_dir)
|
|
|
|
if not result['success']:
|
|
logger.error(f"Failed to process study {study_uid}: {result.get('error', 'Unknown error')}")
|
|
sys.exit(1)
|
|
|
|
# Save individual study log
|
|
save_json_data(result['study_log'], f"{study_uid}.json", log_dir)
|
|
|
|
# Save logs
|
|
if result.get('his_success', False):
|
|
save_json_data([result['study_log']], his_log_filename, "logs")
|
|
else:
|
|
save_json_data([result['study_log']], his_fail_log_filename, "logs")
|
|
|
|
logger.info(f"Completed processing study: {study_uid}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error during study processing: {str(e)}")
|
|
sys.exit(1)
|
|
finally:
|
|
# Restore original signal handler
|
|
signal.signal(signal.SIGINT, original_sigint_handler)
|
|
|
|
def main():
|
|
"""Main function."""
|
|
# Register cleanup handlers for graceful exit
|
|
register_exit_handlers()
|
|
|
|
# Create necessary directories
|
|
create_directory_if_not_exists(settings.JSON_OUTPUT_DIR)
|
|
create_directory_if_not_exists(settings.DICOM_STORE_DIR)
|
|
|
|
# Register the main DICOM store directory for cleanup
|
|
if os.path.exists(settings.DICOM_STORE_DIR):
|
|
# Only register top-level directories in the DICOM store directory
|
|
for item in os.listdir(settings.DICOM_STORE_DIR):
|
|
item_path = os.path.join(settings.DICOM_STORE_DIR, item)
|
|
if os.path.isdir(item_path):
|
|
register_cleanup_dir(item_path)
|
|
|
|
# Parse arguments
|
|
args = parse_args()
|
|
|
|
if args.command == 'find-studies':
|
|
find_studies(args)
|
|
elif args.command == 'find-series':
|
|
find_series(args)
|
|
elif args.command == 'find-instance':
|
|
find_instance(args)
|
|
elif args.command == 'get-study':
|
|
get_study(args)
|
|
elif args.command == 'get-series':
|
|
get_series(args)
|
|
elif args.command == 'send-study':
|
|
send_study(args)
|
|
elif args.command == 'send-series':
|
|
send_series(args)
|
|
elif args.command == 'send-file':
|
|
send_file(args)
|
|
elif args.command == 'process':
|
|
process_workflow(args)
|
|
elif args.command == 'process-study':
|
|
process_workflow_by_study(args)
|
|
else:
|
|
logger.error("No command specified. Use --help for options.")
|
|
sys.exit(1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |