Files
pydicom-migrasi-clarity/main.py
2025-05-30 12:54:51 +07:00

661 lines
27 KiB
Python

#!/usr/bin/env python3
"""
Main entry point for DICOM data retrieval and routing automation.
"""
import os
import sys
import json
import argparse
import requests
from datetime import datetime
from config import settings
from utils.logger import main_logger as logger
from utils.dicom_utils import save_json_data, create_directory_if_not_exists
from utils.cleanup import register_exit_handlers, register_cleanup_dir
from services.dicom_finder import DicomFinder
from services.dicom_retriever import DicomRetriever
from services.dicom_sender import DicomSender
def parse_args():
    """
    Build the command line interface and parse the process arguments.

    One sub-command is registered per operation (find-*, get-*, send-*,
    process-study, process); the chosen one is stored in ``command``.

    Returns:
        argparse.Namespace with ``command`` and that command's options
    """
    cli = argparse.ArgumentParser(description='DICOM Data Retrieval and Routing Automation')
    commands = cli.add_subparsers(dest='command', help='Command to execute')

    def mandatory(flag, help_text):
        """Describe an option that must be supplied."""
        return flag, {'required': True, 'help': help_text}

    def optional(flag, help_text):
        """Describe an option that may be omitted."""
        return flag, {'help': help_text}

    def add_command(name, help_text, *options):
        """Register a sub-command together with its options."""
        sub = commands.add_parser(name, help=help_text)
        for flag, keywords in options:
            sub.add_argument(flag, **keywords)
        return sub

    # Query commands
    add_command(
        'find-studies', 'Find studies by date range',
        mandatory('--start-date', 'Start date in YYYYMMDD format'),
        mandatory('--end-date', 'End date in YYYYMMDD format'),
        optional('--output-file', 'JSON output file (default: studies_YYYYMMDD.json)'),
    )
    add_command(
        'find-series', 'Find series for a study',
        mandatory('--study-uid', 'Study Instance UID'),
        optional('--output-file', 'JSON output file (default: series_STUDYUID.json)'),
    )
    add_command(
        'find-instance', 'Find first instance for a series',
        mandatory('--study-uid', 'Study Instance UID'),
        mandatory('--series-uid', 'Series Instance UID'),
        optional('--output-file', 'JSON output file (default: instance_SERIESUID.json)'),
    )

    # Retrieval commands
    add_command(
        'get-study', 'Retrieve a complete study',
        mandatory('--study-uid', 'Study Instance UID to retrieve'),
    )
    add_command(
        'get-series', 'Retrieve a specific series',
        mandatory('--study-uid', 'Study Instance UID'),
        mandatory('--series-uid', 'Series Instance UID to retrieve'),
    )

    # Sending commands
    add_command(
        'send-study', 'Send a complete study to destination PACS',
        mandatory('--study-uid', 'Study Instance UID to send'),
        optional('--study-path', 'Path to study directory (default: DICOM_STORE_DIR/study_uid)'),
    )
    add_command(
        'send-series', 'Send a specific series to destination PACS',
        mandatory('--study-uid', 'Study Instance UID'),
        mandatory('--series-uid', 'Series Instance UID to send'),
    )
    add_command(
        'send-file', 'Send a single DICOM file to destination PACS',
        mandatory('--file-path', 'Path to DICOM file to send'),
    )

    # Full workflow for one study
    add_command(
        'process-study', 'Process full workflow for a specific study by StudyInstanceUID',
        mandatory('--study-uid', 'StudyInstanceUID to process'),
        optional('--log-dir', 'Directory to save JSON logs (default: JSON_OUTPUT_DIR)'),
    )

    # Full workflow (find, get, send) for a date range
    add_command(
        'process', 'Process full workflow (find, get, send) for a date range',
        mandatory('--start-date', 'Start date in YYYYMMDD format'),
        mandatory('--end-date', 'End date in YYYYMMDD format'),
        optional('--log-dir', 'Directory to save JSON logs (default: JSON_OUTPUT_DIR)'),
        ('--skip-existing', {'action': 'store_true', 'help': 'Skip studies that already have JSON logs'}),
    )

    return cli.parse_args()
def find_studies(args):
    """
    Query the studies inside a date range and write them to a JSON file.

    The process exits with status 1 when a date is not in YYYYMMDD format or
    when the query / save step raises.

    Args:
        args: Command line arguments (start_date, end_date, output_file)
    """
    start_date, end_date = args.start_date, args.end_date

    # Both bounds must parse as YYYYMMDD before anything is queried
    for bound in (start_date, end_date):
        try:
            datetime.strptime(bound, '%Y%m%d')
        except ValueError:
            logger.error("Invalid date format. Use YYYYMMDD format.")
            sys.exit(1)

    # Attributes copied with an empty-string fallback when the result lacks them
    optional_attributes = (
        'StudyTime',
        'AccessionNumber',
        'PatientID',  # MedrecID
        'StudyDescription',
        'NumberOfStudyRelatedSeries',
    )

    finder = DicomFinder()
    try:
        records = []
        for match in finder.find_studies_by_date_range(start_date, end_date):
            # Plain dictionaries so the result can be serialized to JSON
            record = {
                'StudyInstanceUID': match.StudyInstanceUID,
                'StudyDate': match.StudyDate,
            }
            for attribute in optional_attributes:
                record[attribute] = getattr(match, attribute, '')
            records.append(record)

        output_file = args.output_file or f"studies_{start_date}_to_{end_date}.json"
        save_json_data(records, output_file, settings.JSON_OUTPUT_DIR)

        logger.info(f"Found {len(records)} studies from {start_date} to {end_date}")
        logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}")
    except Exception as exc:
        logger.error(f"Error finding studies: {str(exc)}")
        sys.exit(1)
def find_series(args):
    """
    Query the series of one study and write them to a JSON file.

    The process exits with status 1 when the query or the save step raises.

    Args:
        args: Command line arguments (study_uid, output_file)
    """
    study_uid = args.study_uid

    # Attributes copied with an empty-string fallback when the result lacks them
    optional_attributes = (
        'SeriesNumber',
        'SeriesDescription',
        'Modality',
        'NumberOfSeriesRelatedInstances',
    )

    finder = DicomFinder()
    try:
        records = []
        for match in finder.find_series_for_study(study_uid):
            # Plain dictionaries so the result can be serialized to JSON
            record = {
                'StudyInstanceUID': match.StudyInstanceUID,
                'SeriesInstanceUID': match.SeriesInstanceUID,
            }
            for attribute in optional_attributes:
                record[attribute] = getattr(match, attribute, '')
            records.append(record)

        output_file = args.output_file or f"series_{study_uid}.json"
        save_json_data(records, output_file, settings.JSON_OUTPUT_DIR)

        logger.info(f"Found {len(records)} series for study {study_uid}")
        logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}")
    except Exception as exc:
        logger.error(f"Error finding series: {str(exc)}")
        sys.exit(1)
def find_instance(args):
    """
    Look up the first instance of a series and write it to a JSON file.

    A warning is logged (and nothing is written) when the series has no
    instance. The process exits with status 1 when the lookup or the save
    step raises.

    Args:
        args: Command line arguments (study_uid, series_uid, output_file)
    """
    study_uid, series_uid = args.study_uid, args.series_uid
    finder = DicomFinder()
    try:
        first = finder.find_first_instance_for_series(study_uid, series_uid)
        if not first:
            logger.warning(f"No instances found for series {series_uid}")
            return

        # Plain dictionary so the result can be serialized to JSON
        record = {
            'StudyInstanceUID': first.StudyInstanceUID,
            'SeriesInstanceUID': first.SeriesInstanceUID,
            'SOPInstanceUID': first.SOPInstanceUID,
        }
        for attribute in ('SOPClassUID', 'InstanceNumber'):
            record[attribute] = getattr(first, attribute, '')

        output_file = args.output_file or f"instance_{series_uid}.json"
        save_json_data(record, output_file, settings.JSON_OUTPUT_DIR)

        logger.info(f"Found first instance for series {series_uid}")
        logger.info(f"Results saved to {os.path.join(settings.JSON_OUTPUT_DIR, output_file)}")
    except Exception as exc:
        logger.error(f"Error finding instance: {str(exc)}")
        sys.exit(1)
def get_study(args):
    """
    Retrieve a complete study using C-GET.

    The process exits with status 1 when the retriever reports a failure or
    raises.

    Args:
        args: Command line arguments (study_uid)
    """
    study_uid = args.study_uid
    logger.info(f"Starting retrieval of study: {study_uid}")

    retriever = DicomRetriever()
    try:
        outcome = retriever.retrieve_study(study_uid)

        # Failure path first: report what the retriever returned, then stop
        if not outcome['success']:
            logger.error(f"Failed to retrieve study {study_uid}: {outcome['status']}")
            details = outcome['error']
            if details:
                logger.error(f"Error details: {details}")
            sys.exit(1)

        logger.info(f"Successfully retrieved study {study_uid}")
        logger.info(f"Retrieved {outcome['successful_instances']} of {outcome['total_instances']} instances")
        logger.info(f"DICOM files stored in: {outcome['study_dir']}")
    except Exception as exc:
        logger.error(f"Error retrieving study: {str(exc)}")
        sys.exit(1)
def get_series(args):
    """
    Retrieve a specific series using C-GET.

    The process exits with status 1 when the retriever reports a failure or
    raises.

    Args:
        args: Command line arguments (study_uid, series_uid)
    """
    study_uid, series_uid = args.study_uid, args.series_uid
    logger.info(f"Starting retrieval of series: {series_uid} from study: {study_uid}")

    retriever = DicomRetriever()
    try:
        outcome = retriever.retrieve_series(study_uid, series_uid)

        # Failure path first: report what the retriever returned, then stop
        if not outcome['success']:
            logger.error(f"Failed to retrieve series {series_uid}: {outcome['status']}")
            details = outcome['error']
            if details:
                logger.error(f"Error details: {details}")
            sys.exit(1)

        logger.info(f"Successfully retrieved series {series_uid}")
        logger.info(f"Retrieved {outcome['successful_instances']} of {outcome['total_instances']} instances")
        logger.info(f"DICOM files stored in: {outcome['series_dir']}")
    except Exception as exc:
        logger.error(f"Error retrieving series: {str(exc)}")
        sys.exit(1)
def send_study(args):
    """
    Send a complete study to destination PACS using C-STORE.

    The study is read from ``--study-path`` when given, otherwise from the
    study's directory under DICOM_STORE_DIR. The process exits with status 1
    when the sender reports a failure or raises.

    Args:
        args: Command line arguments (study_uid, study_path)
    """
    study_uid = args.study_uid
    study_path = args.study_path
    if not study_path:
        study_path = os.path.join(settings.DICOM_STORE_DIR, study_uid)
    logger.info(f"Starting sending of study: {study_uid} from {study_path}")

    sender = DicomSender()
    try:
        outcome = sender.send_study(study_path)

        # Failure path first: report what the sender returned, then stop
        if not outcome['success']:
            logger.error(f"Failed to send study {study_uid}")
            details = outcome.get('error')
            if details:
                logger.error(f"Error details: {details}")
            sys.exit(1)

        logger.info(f"Successfully sent study {study_uid}")
        logger.info(f"Sent {outcome['successful_sends']} of {outcome['total_files']} files")
        failed_count = outcome['failed_sends']
        if failed_count > 0:
            # The send is reported as successful but some files did not go through
            logger.warning(f"Failed to send {failed_count} files")
    except Exception as exc:
        logger.error(f"Error sending study: {str(exc)}")
        sys.exit(1)
def send_series(args):
    """
    Send a specific series to destination PACS using C-STORE.

    The process exits with status 1 when the sender reports a failure or
    raises.

    Args:
        args: Command line arguments (study_uid, series_uid)
    """
    study_uid, series_uid = args.study_uid, args.series_uid
    logger.info(f"Starting sending of series: {series_uid} from study: {study_uid}")

    sender = DicomSender()
    try:
        outcome = sender.send_series(study_uid, series_uid)

        # Failure path first: report what the sender returned, then stop
        if not outcome['success']:
            logger.error(f"Failed to send series {series_uid}")
            details = outcome.get('error')
            if details:
                logger.error(f"Error details: {details}")
            sys.exit(1)

        logger.info(f"Successfully sent series {series_uid}")
        logger.info(f"Sent {outcome['successful_sends']} of {outcome['total_files']} files")
        failed_count = outcome['failed_sends']
        if failed_count > 0:
            # The send is reported as successful but some files did not go through
            logger.warning(f"Failed to send {failed_count} files")
    except Exception as exc:
        logger.error(f"Error sending series: {str(exc)}")
        sys.exit(1)
def send_file(args):
    """
    Send a single DICOM file to destination PACS using C-STORE.

    The process exits with status 1 when the sender reports a failure or
    raises.

    Args:
        args: Command line arguments (file_path)
    """
    file_path = args.file_path
    logger.info(f"Starting sending of DICOM file: {file_path}")

    sender = DicomSender()
    try:
        outcome = sender.send_file(file_path)

        # Failure path first: report what the sender returned, then stop
        if not outcome['success']:
            logger.error(f"Failed to send file {file_path}: {outcome['status']}")
            details = outcome['error']
            if details:
                logger.error(f"Error details: {details}")
            sys.exit(1)

        logger.info(f"Successfully sent file {file_path}")
    except Exception as exc:
        logger.error(f"Error sending file: {str(exc)}")
        sys.exit(1)
def process_workflow(args):
    """
    Process the full workflow: find studies by date range, retrieve them, send
    them to the destination PACS, and post a per-study JSON log to the HIS API.

    Study logs accepted by the HIS API are collected in
    logs/sendtohis_<start>_<end>.json; logs that were rejected or could not be
    delivered are collected in logs/fail_sendtohis_<start>_<end>.json.

    A failure while processing one study is logged and the next study is
    processed. The process exits with status 1 when a date is invalid or when
    the study search or the saving of the summary / HIS logs raises.

    Args:
        args: Command line arguments (start_date, end_date, log_dir,
            skip_existing)
    """
    start_date = args.start_date
    end_date = args.end_date
    log_dir = args.log_dir or settings.JSON_OUTPUT_DIR
    skip_existing = args.skip_existing

    # Validate date format
    try:
        datetime.strptime(start_date, '%Y%m%d')
        datetime.strptime(end_date, '%Y%m%d')
    except ValueError:
        logger.error("Invalid date format. Use YYYYMMDD format.")
        sys.exit(1)

    logger.info(f"Starting full workflow process for studies from {start_date} to {end_date}")

    # Create services
    finder = DicomFinder()
    retriever = DicomRetriever()
    sender = DicomSender()

    # Create directory for logs
    create_directory_if_not_exists(log_dir)

    # Without a timeout requests waits indefinitely on an unresponsive server,
    # which would stall the whole batch. settings.HIS_TIMEOUT (seconds)
    # overrides the default when it is defined.
    his_timeout = getattr(settings, 'HIS_TIMEOUT', 30)

    # STEP 1: Find studies in the date range
    try:
        studies = finder.find_studies_by_date_range(start_date, end_date)
        logger.info(f"Found {len(studies)} studies between {start_date} and {end_date}")

        # Save studies summary
        studies_summary = [{
            'StudyInstanceUID': study.StudyInstanceUID,
            'StudyDate': getattr(study, 'StudyDate', ''),
            'StudyTime': getattr(study, 'StudyTime', ''),
            'AccessionNumber': getattr(study, 'AccessionNumber', ''),
            'PatientID': getattr(study, 'PatientID', ''),
            'StudyDescription': getattr(study, 'StudyDescription', '')
        } for study in studies]
        save_json_data(studies_summary, f"studies_{start_date}_to_{end_date}.json", log_dir)

        # STEP 2, 3, 4: For each study, retrieve, send, and log details
        his_log_filename = f"sendtohis_{start_date}_{end_date}.json"
        his_log = []
        his_fail_log_filename = f"fail_sendtohis_{start_date}_{end_date}.json"
        his_fail_log = []

        for i, study in enumerate(studies):
            # getattr: a study without AccessionNumber must not abort the
            # whole run (this line is outside the per-study try block)
            accession_number = getattr(study, 'AccessionNumber', '')
            study_uid = study.StudyInstanceUID
            logger.info(f"Processing study {i+1}/{len(studies)}: {accession_number} with Study_IUID ({study_uid})")

            # Check if we should skip this study
            # NOTE(review): this function no longer writes <study_uid>.json, so
            # the check only matches files created by other means - confirm
            # whether the per-study log should be saved again.
            log_file_path = os.path.join(log_dir, f"{study_uid}.json")
            if skip_existing and os.path.exists(log_file_path):
                logger.info(f"Skipping study {study_uid} - log file already exists")
                continue

            try:
                # STEP 2: Retrieve the study
                retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)
                if not retrieve_result['success']:
                    logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
                    continue
                logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")

                # STEP 3: Send the study to destination PACS
                send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))
                if not send_result['success']:
                    logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
                # A failed send is still reported to HIS (CstoreSuccess below),
                # so the counters are read defensively in case a failure
                # result does not carry them.
                logger.info(
                    f"Sent {send_result.get('successful_sends', 0)} of "
                    f"{send_result.get('total_files', 0)} files to destination PACS"
                )

                # STEP 4: Find all series and create detailed logs
                series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)
                study_date = getattr(study, 'StudyDate', '')
                study_time = getattr(study, 'StudyTime', '000000')  # Default to midnight if StudyTime is not available
                study_log = {
                    'Study_IUID': study_uid,
                    'AccessionNumber': accession_number,
                    'PatientID': getattr(study, 'PatientID', ''),  # MedrecID
                    'StudyDescription': getattr(study, 'StudyDescription', ''),
                    'StudyDateTime': f"{study_date}{study_time}",
                    'CstoreSuccess': send_result['success'],
                    'Series': []
                }
                for series in series_list:
                    series_uid = series.SeriesInstanceUID
                    # Get first instance for the series
                    instance = finder.find_first_instance_for_series(study_uid, series_uid)
                    study_log['Series'].append({
                        'Series_IUID': getattr(series, 'SeriesInstanceUID', ''),
                        'SeriesNumber': getattr(series, 'SeriesNumber', ''),
                        'SeriesDescription': getattr(series, 'SeriesDescription', ''),
                        'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''),
                        'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else ''
                    })

                # STEP 5: Send study_log to HIS API
                his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"
                try:
                    response = requests.post(his_url, json=study_log, timeout=his_timeout)
                    accepted = response.status_code == 200 and response.json().get('OK') == "1"
                except Exception as e:
                    # Network errors and non-JSON replies count as failed
                    # deliveries and are kept in the failure log.
                    his_fail_log.append(study_log)
                    logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
                else:
                    if accepted:
                        his_log.append(study_log)
                        logger.info(f"Successfully sent JSON {accession_number} to HIS API")
                    else:
                        his_fail_log.append(study_log)
                        # requests.Response has no `msg` attribute; report the
                        # status code and a bounded excerpt of the body.
                        logger.error(
                            f"Failed to send JSON for {accession_number} to HIS API: "
                            f"HTTP {response.status_code} {response.text[:200]}. Study_IUID: {study_uid}"
                        )
            except Exception as e:
                logger.error(f"Error processing study {accession_number}: {str(e)}")
                continue

        # Save HIS log
        if his_log:
            save_json_data(his_log, his_log_filename, "logs")
        if his_fail_log:
            save_json_data(his_fail_log, his_fail_log_filename, "logs")

        logger.info(f"Completed processing {len(studies)} studies from {start_date} to {end_date}")
    except Exception as e:
        logger.error(f"Error during workflow processing: {str(e)}")
        sys.exit(1)
def process_workflow_by_study(args):
    """
    Process the full workflow for a specific study: retrieve it, send it to the
    destination PACS, and post a JSON log of the study to the HIS API.

    The study log is saved as logs/sendtohis_study_<uid>.json when the HIS API
    accepts it, and as logs/fail_sendtohis_study_<uid>.json when the API
    rejects it or the request itself fails.

    The process exits with status 1 when the study is not found, when the
    retrieval fails, or when any step outside the HIS request raises.

    Args:
        args: Command line arguments (study_uid, log_dir)
    """
    study_uid = args.study_uid
    log_dir = args.log_dir or settings.JSON_OUTPUT_DIR
    logger.info(f"Starting full workflow process for study: {study_uid}")

    # Create services
    finder = DicomFinder()
    retriever = DicomRetriever()
    sender = DicomSender()

    # Create directory for logs
    create_directory_if_not_exists(log_dir)

    # Without a timeout requests waits indefinitely on an unresponsive server.
    # settings.HIS_TIMEOUT (seconds) overrides the default when it is defined.
    his_timeout = getattr(settings, 'HIS_TIMEOUT', 30)

    # Process the study
    try:
        # First, get the study metadata with a query
        study_metadata = finder.find_study_by_uid(study_uid)
        if not study_metadata:
            logger.error(f"Study not found: {study_uid}")
            sys.exit(1)
        accession_number = getattr(study_metadata, 'AccessionNumber', '')
        logger.info(f"Processing study: {accession_number} with Study_IUID ({study_uid})")

        # STEP 1: Retrieve the study
        retrieve_result = retriever.retrieve_study(study_uid, accession_number=accession_number)
        if not retrieve_result['success']:
            logger.error(f"Failed to retrieve study {study_uid}: {retrieve_result['status']}")
            sys.exit(1)
        logger.info(f"Retrieved {retrieve_result['successful_instances']} instances for study {study_uid}")

        # STEP 2: Send the study to destination PACS
        send_result = sender.send_study(os.path.join(settings.DICOM_STORE_DIR, study_uid))
        if not send_result['success']:
            logger.error(f"Failed to send study {study_uid}: {send_result.get('error', 'Unknown error')}")
        # A failed send is still reported to HIS (CstoreSuccess below), so the
        # counters are read defensively in case a failure result lacks them.
        logger.info(
            f"Sent {send_result.get('successful_sends', 0)} of "
            f"{send_result.get('total_files', 0)} files to destination PACS"
        )

        # STEP 3: Find all series and create detailed logs
        series_list = finder.find_series_for_study(study_uid, accession_number=accession_number)
        study_date = getattr(study_metadata, 'StudyDate', '')
        study_time = getattr(study_metadata, 'StudyTime', '000000')  # Default to midnight if StudyTime is not available
        study_log = {
            'Study_IUID': study_uid,
            'AccessionNumber': accession_number,
            'PatientID': getattr(study_metadata, 'PatientID', ''),  # MedrecID
            'StudyDescription': getattr(study_metadata, 'StudyDescription', ''),
            'StudyDateTime': f"{study_date}{study_time}",
            'CstoreSuccess': send_result['success'],
            'Series': []
        }
        for series in series_list:
            series_uid = series.SeriesInstanceUID
            # Get first instance for the series
            instance = finder.find_first_instance_for_series(study_uid, series_uid)
            study_log['Series'].append({
                'Series_IUID': getattr(series, 'SeriesInstanceUID', ''),
                'SeriesNumber': getattr(series, 'SeriesNumber', ''),
                'SeriesDescription': getattr(series, 'SeriesDescription', ''),
                'NumberOfInstances': getattr(series, 'NumberOfSeriesRelatedInstances', ''),
                'SOP_IUID': getattr(instance, 'SOPInstanceUID', '') if instance else ''
            })

        # STEP 4: Send study_log to HIS API
        his_url = f"http://{settings.HIS_HOST}{settings.HIS_URL}"
        fail_log_filename = f"fail_sendtohis_study_{study_uid}.json"
        try:
            response = requests.post(his_url, json=study_log, timeout=his_timeout)
            accepted = response.status_code == 200 and response.json().get('OK') == "1"
        except Exception as e:
            # Network errors and non-JSON replies are failed deliveries too:
            # keep the payload so it can be re-sent later.
            logger.error(f"Error sending study for {accession_number} to HIS API: {str(e)}")
            save_json_data([study_log], fail_log_filename, "logs")
        else:
            if accepted:
                logger.info(f"Successfully sent JSON {accession_number} to HIS API")
                # Save successful log
                save_json_data([study_log], f"sendtohis_study_{study_uid}.json", "logs")
            else:
                logger.error(
                    f"Failed to send JSON for {accession_number} to HIS API: "
                    f"HTTP {response.status_code} {response.text[:200]}. Study_IUID: {study_uid}"
                )
                # Save failed log
                save_json_data([study_log], fail_log_filename, "logs")

        logger.info(f"Completed processing study: {study_uid}")
    except Exception as e:
        logger.error(f"Error during study processing: {str(e)}")
        sys.exit(1)
def main():
    """
    Entry point: prepare the working directories, parse the command line and
    run the handler of the selected command.

    The process exits with status 1 when no command is given.
    """
    # Cleanup handlers are registered first so they are active for the
    # whole run
    register_exit_handlers()

    # Working directories must exist before any command runs
    create_directory_if_not_exists(settings.JSON_OUTPUT_DIR)
    create_directory_if_not_exists(settings.DICOM_STORE_DIR)

    # Register every top-level directory of the DICOM store for cleanup
    # (plain files are left alone)
    if os.path.exists(settings.DICOM_STORE_DIR):
        for entry in os.listdir(settings.DICOM_STORE_DIR):
            candidate = os.path.join(settings.DICOM_STORE_DIR, entry)
            if not os.path.isdir(candidate):
                continue
            register_cleanup_dir(candidate)

    args = parse_args()

    # Command name -> handler; every handler takes the parsed arguments
    handlers = {
        'find-studies': find_studies,
        'find-series': find_series,
        'find-instance': find_instance,
        'get-study': get_study,
        'get-series': get_series,
        'send-study': send_study,
        'send-series': send_series,
        'send-file': send_file,
        'process': process_workflow,
        'process-study': process_workflow_by_study,
    }
    handler = handlers.get(args.command)
    if handler is None:
        logger.error("No command specified. Use --help for options.")
        sys.exit(1)
    handler(args)


# Run the CLI only when executed as a script, not when imported
if __name__ == "__main__":
    main()